You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/10/27 03:48:44 UTC
[rocketmq-exporter] branch master updated: chore(code-style): fix code style issue to avoid build error and add travis CI.
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git
The following commit(s) were added to refs/heads/master by this push:
new 475579b chore(code-style): fix code style issue to avoid build error and add travis CI.
new e834d3f Merge pull request #39 from RongtongJin/master
475579b is described below
commit 475579b7d49e5199db36dc2581d5ddb735bff5e9
Author: RongtongJin <79...@qq.com>
AuthorDate: Tue Oct 27 11:10:00 2020 +0800
chore(code-style): fix code style issue to avoid build error and add travis CI.
---
.travis.yaml | 20 +++
.../rocketmq/exporter/task/MetricsCollectTask.java | 143 ++++++++++-----------
2 files changed, 91 insertions(+), 72 deletions(-)
diff --git a/.travis.yaml b/.travis.yaml
new file mode 100644
index 0000000..686026c
--- /dev/null
+++ b/.travis.yaml
@@ -0,0 +1,20 @@
+dist: trusty
+
+language: java
+
+matrix:
+ include:
+ # On OSX, run with default JDK only.
+ # - os: osx
+ # On Linux, run with specific JDKs only.
+ - os: linux
+ env: CUSTOM_JDK="oraclejdk8"
+
+before_install:
+ - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+ - cat ~/.mavenrc
+ - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+ - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+script:
+ - travis_retry mvn clean install
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b36fdbe..b82f454 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -93,23 +93,24 @@ public class MetricsCollectTask {
private BlockingQueue<Runnable> collectClientTaskBlockQueue;
@Bean(name = "collectClientMetricExecutor")
- private ExecutorService collectClientMetricExecutor(CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
+ private ExecutorService collectClientMetricExecutor(
+ CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
collectClientTaskBlockQueue = new LinkedBlockingDeque<Runnable>(collectClientMetricExecutorConfig.getQueueSize());
ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor(
- collectClientMetricExecutorConfig.getCorePoolSize(),
- collectClientMetricExecutorConfig.getMaximumPoolSize(),
- collectClientMetricExecutorConfig.getKeepAliveTime(),
- TimeUnit.MILLISECONDS,
- this.collectClientTaskBlockQueue,
- new ThreadFactory() {
- private final AtomicLong threadIndex = new AtomicLong(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
- }
- },
- new ThreadPoolExecutor.DiscardOldestPolicy()
+ collectClientMetricExecutorConfig.getCorePoolSize(),
+ collectClientMetricExecutorConfig.getMaximumPoolSize(),
+ collectClientMetricExecutorConfig.getKeepAliveTime(),
+ TimeUnit.MILLISECONDS,
+ this.collectClientTaskBlockQueue,
+ new ThreadFactory() {
+ private final AtomicLong threadIndex = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
+ }
+ },
+ new ThreadPoolExecutor.DiscardOldestPolicy()
);
return executorService;
}
@@ -131,7 +132,7 @@ public class MetricsCollectTask {
}
log.info(infoOut.toString());
if (clusterName == null) {
- log.error("get cluster info error" );
+ log.error("get cluster info error");
}
log.info(String.format("MetricsCollectTask init finished....cost:%d", System.currentTimeMillis() - start));
}
@@ -148,13 +149,13 @@ public class MetricsCollectTask {
topicList = mqAdminExt.fetchAllTopicList();
} catch (Exception ex) {
log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
return;
}
Set<String> topicSet = topicList != null ? topicList.getTopicList() : null;
if (topicSet == null || topicSet.isEmpty()) {
log.error(String.format("collectTopicOffset-the topic list is empty. the namesrv address is %s",
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
return;
}
@@ -164,8 +165,8 @@ public class MetricsCollectTask {
topicStats = mqAdminExt.examineTopicStats(topic);
} catch (Exception ex) {
log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
- topic,
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+ topic,
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
continue;
}
@@ -194,7 +195,7 @@ public class MetricsCollectTask {
Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
- brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
+ brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
}
}
log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
@@ -212,11 +213,10 @@ public class MetricsCollectTask {
topicList = mqAdminExt.fetchAllTopicList();
} catch (Exception ex) {
log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
-
Set<String> topicSet = topicList.getTopicList();
for (String topic : topicSet) {
GroupList groupList = null;
@@ -237,7 +237,6 @@ public class MetricsCollectTask {
continue;
}
-
for (String group : groupList.getGroupList()) {
ConsumeStats consumeStats = null;
ConsumerConnection onlineConsumers = null;
@@ -273,12 +272,12 @@ public class MetricsCollectTask {
}
if (countOfOnlineConsumers > 0) {
collectClientMetricExecutor.submit(new ClientMetricTaskRunnable(
- group,
- onlineConsumers,
- false,
- this.mqAdminExt,
- log,
- this.metricsService
+ group,
+ onlineConsumers,
+ false,
+ this.mqAdminExt,
+ log,
+ this.metricsService
));
}
try {
@@ -298,11 +297,11 @@ public class MetricsCollectTask {
diff = consumeStats.computeTotalDiff();
consumeTPS = consumeStats.getConsumeTps();
metricsService.getCollector().addGroupDiffMetric(
- String.valueOf(countOfOnlineConsumers),
- group,
- topic,
- String.valueOf(messageModel.ordinal()),
- diff
+ String.valueOf(countOfOnlineConsumers),
+ group,
+ topic,
+ String.valueOf(messageModel.ordinal()),
+ diff
);
//metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
}
@@ -320,7 +319,7 @@ public class MetricsCollectTask {
}
for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetMap.entrySet()) {
metricsService.getCollector().addGroupBrokerTotalOffsetMetric(clusterName,
- consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
+ consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
}
} catch (Exception ex) {
log.warn("addGroupBrokerTotalOffsetMetric error", ex);
@@ -353,7 +352,7 @@ public class MetricsCollectTask {
}
for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
- consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
+ consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
}
} catch (Exception ex) {
@@ -377,7 +376,7 @@ public class MetricsCollectTask {
topicSet = topicList.getTopicList();
} catch (Exception ex) {
log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
if (topicSet == null || topicSet.isEmpty()) {
@@ -388,7 +387,7 @@ public class MetricsCollectTask {
clusterInfo = mqAdminExt.examineBrokerClusterInfo();
} catch (Exception ex) {
log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
- JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+ JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
return;
}
@@ -413,11 +412,11 @@ public class MetricsCollectTask {
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addTopicPutNumsMetric(
- bd.getCluster(),
- bd.getBrokerName(),
- brokerIP,
- topic,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ bd.getCluster(),
+ bd.getBrokerName(),
+ brokerIP,
+ topic,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -433,11 +432,11 @@ public class MetricsCollectTask {
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addTopicPutSizeMetric(
- bd.getCluster(),
- bd.getBrokerName(),
- brokerIP,
- topic,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+ bd.getCluster(),
+ bd.getBrokerName(),
+ brokerIP,
+ topic,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps())
);
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -472,11 +471,11 @@ public class MetricsCollectTask {
//how many messages the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
metricsService.getCollector().addGroupGetNumsMetric(
- bd.getCluster(),
- bd.getBrokerName(),
- topic,
- group,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ bd.getCluster(),
+ bd.getBrokerName(),
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -490,11 +489,11 @@ public class MetricsCollectTask {
//how many bytes the consumer has get for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
metricsService.getCollector().addGroupGetSizeMetric(
- bd.getCluster(),
- bd.getBrokerName(),
- topic,
- group,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ bd.getCluster(),
+ bd.getBrokerName(),
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -508,11 +507,11 @@ public class MetricsCollectTask {
////how many re-send times the consumer did for the topic
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
metricsService.getCollector().addSendBackNumsMetric(
- bd.getCluster(),
- bd.getBrokerName(),
- topic,
- group,
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ bd.getCluster(),
+ bd.getBrokerName(),
+ topic,
+ group,
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (MQClientException ex) {
if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -555,10 +554,10 @@ public class MetricsCollectTask {
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addBrokerPutNumsMetric(
- clusterEntry.getValue().getCluster(),
- brokerIP,
- clusterEntry.getValue().getBrokerName(),
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ clusterEntry.getValue().getCluster(),
+ brokerIP,
+ clusterEntry.getValue().getBrokerName(),
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
}
@@ -566,10 +565,10 @@ public class MetricsCollectTask {
bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
metricsService.getCollector().addBrokerGetNumsMetric(
- clusterEntry.getValue().getCluster(),
- brokerIP,
- clusterEntry.getValue().getBrokerName(),
- Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+ clusterEntry.getValue().getCluster(),
+ brokerIP,
+ clusterEntry.getValue().getBrokerName(),
+ Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
} catch (Exception ex) {
log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
}