You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/10/27 03:48:44 UTC

[rocketmq-exporter] branch master updated: chore(code-style): fix code style issue to avoid build error and add travis CI.

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-exporter.git


The following commit(s) were added to refs/heads/master by this push:
     new 475579b  chore(code-style): fix code style issue to avoid build error and add travis CI.
     new e834d3f  Merge pull request #39 from RongtongJin/master
475579b is described below

commit 475579b7d49e5199db36dc2581d5ddb735bff5e9
Author: RongtongJin <79...@qq.com>
AuthorDate: Tue Oct 27 11:10:00 2020 +0800

    chore(code-style): fix code style issue to avoid build error and add travis CI.
---
 .travis.yaml                                       |  20 +++
 .../rocketmq/exporter/task/MetricsCollectTask.java | 143 ++++++++++-----------
 2 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/.travis.yaml b/.travis.yaml
new file mode 100644
index 0000000..686026c
--- /dev/null
+++ b/.travis.yaml
@@ -0,0 +1,20 @@
+dist: trusty
+
+language: java
+
+matrix:
+  include:
+    # On OSX, run with default JDK only.
+    # - os: osx
+    # On Linux, run with specific JDKs only.
+    - os: linux
+      env: CUSTOM_JDK="oraclejdk8"
+
+before_install:
+  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+  - cat ~/.mavenrc
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+script:
+  - travis_retry mvn clean install
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
index b36fdbe..b82f454 100644
--- a/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
+++ b/src/main/java/org/apache/rocketmq/exporter/task/MetricsCollectTask.java
@@ -93,23 +93,24 @@ public class MetricsCollectTask {
     private BlockingQueue<Runnable> collectClientTaskBlockQueue;
 
     @Bean(name = "collectClientMetricExecutor")
-    private ExecutorService collectClientMetricExecutor(CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
+    private ExecutorService collectClientMetricExecutor(
+        CollectClientMetricExecutorConfig collectClientMetricExecutorConfig) {
         collectClientTaskBlockQueue = new LinkedBlockingDeque<Runnable>(collectClientMetricExecutorConfig.getQueueSize());
         ExecutorService executorService = new ClientMetricCollectorFixedThreadPoolExecutor(
-                collectClientMetricExecutorConfig.getCorePoolSize(),
-                collectClientMetricExecutorConfig.getMaximumPoolSize(),
-                collectClientMetricExecutorConfig.getKeepAliveTime(),
-                TimeUnit.MILLISECONDS,
-                this.collectClientTaskBlockQueue,
-                new ThreadFactory() {
-                    private final AtomicLong threadIndex = new AtomicLong(0);
-
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
-                    }
-                },
-                new ThreadPoolExecutor.DiscardOldestPolicy()
+            collectClientMetricExecutorConfig.getCorePoolSize(),
+            collectClientMetricExecutorConfig.getMaximumPoolSize(),
+            collectClientMetricExecutorConfig.getKeepAliveTime(),
+            TimeUnit.MILLISECONDS,
+            this.collectClientTaskBlockQueue,
+            new ThreadFactory() {
+                private final AtomicLong threadIndex = new AtomicLong(0);
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "collectClientMetricThread_" + this.threadIndex.incrementAndGet());
+                }
+            },
+            new ThreadPoolExecutor.DiscardOldestPolicy()
         );
         return executorService;
     }
@@ -131,7 +132,7 @@ public class MetricsCollectTask {
         }
         log.info(infoOut.toString());
         if (clusterName == null) {
-            log.error("get cluster info error" );
+            log.error("get cluster info error");
         }
         log.info(String.format("MetricsCollectTask init finished....cost:%d", System.currentTimeMillis() - start));
     }
@@ -148,13 +149,13 @@ public class MetricsCollectTask {
             topicList = mqAdminExt.fetchAllTopicList();
         } catch (Exception ex) {
             log.error(String.format("collectTopicOffset-exception comes getting topic list from namesrv, address is %s",
-                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
             return;
         }
         Set<String> topicSet = topicList != null ? topicList.getTopicList() : null;
         if (topicSet == null || topicSet.isEmpty()) {
             log.error(String.format("collectTopicOffset-the topic list is empty. the namesrv address is %s",
-                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
             return;
         }
 
@@ -164,8 +165,8 @@ public class MetricsCollectTask {
                 topicStats = mqAdminExt.examineTopicStats(topic);
             } catch (Exception ex) {
                 log.error(String.format("collectTopicOffset-getting topic(%s) stats error. the namesrv address is %s",
-                        topic,
-                        JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
+                    topic,
+                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())));
                 continue;
             }
 
@@ -194,7 +195,7 @@ public class MetricsCollectTask {
             Set<Map.Entry<String, Long>> brokerOffsetEntries = brokerOffsetMap.entrySet();
             for (Map.Entry<String, Long> brokerOffsetEntry : brokerOffsetEntries) {
                 metricsService.getCollector().addTopicOffsetMetric(clusterName, brokerOffsetEntry.getKey(), topic,
-                        brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
+                    brokerUpdateTimestampMap.get(brokerOffsetEntry.getKey()), brokerOffsetEntry.getValue());
             }
         }
         log.info("topic offset collection task finished...." + (System.currentTimeMillis() - start));
@@ -212,11 +213,10 @@ public class MetricsCollectTask {
             topicList = mqAdminExt.fetchAllTopicList();
         } catch (Exception ex) {
             log.error(String.format("collectConsumerOffset-fetch topic list from namesrv error, the address is %s",
-                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
             return;
         }
 
-
         Set<String> topicSet = topicList.getTopicList();
         for (String topic : topicSet) {
             GroupList groupList = null;
@@ -237,7 +237,6 @@ public class MetricsCollectTask {
                 continue;
             }
 
-
             for (String group : groupList.getGroupList()) {
                 ConsumeStats consumeStats = null;
                 ConsumerConnection onlineConsumers = null;
@@ -273,12 +272,12 @@ public class MetricsCollectTask {
                 }
                 if (countOfOnlineConsumers > 0) {
                     collectClientMetricExecutor.submit(new ClientMetricTaskRunnable(
-                            group,
-                            onlineConsumers,
-                            false,
-                            this.mqAdminExt,
-                            log,
-                            this.metricsService
+                        group,
+                        onlineConsumers,
+                        false,
+                        this.mqAdminExt,
+                        log,
+                        this.metricsService
                     ));
                 }
                 try {
@@ -298,11 +297,11 @@ public class MetricsCollectTask {
                     diff = consumeStats.computeTotalDiff();
                     consumeTPS = consumeStats.getConsumeTps();
                     metricsService.getCollector().addGroupDiffMetric(
-                            String.valueOf(countOfOnlineConsumers),
-                            group,
-                            topic,
-                            String.valueOf(messageModel.ordinal()),
-                            diff
+                        String.valueOf(countOfOnlineConsumers),
+                        group,
+                        topic,
+                        String.valueOf(messageModel.ordinal()),
+                        diff
                     );
                     //metricsService.getCollector().addGroupConsumeTPSMetric(topic, group, consumeTPS);
                 }
@@ -320,7 +319,7 @@ public class MetricsCollectTask {
                     }
                     for (Map.Entry<String, Long> consumeOffsetEntry : consumeOffsetMap.entrySet()) {
                         metricsService.getCollector().addGroupBrokerTotalOffsetMetric(clusterName,
-                                consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
+                            consumeOffsetEntry.getKey(), topic, group, consumeOffsetEntry.getValue());
                     }
                 } catch (Exception ex) {
                     log.warn("addGroupBrokerTotalOffsetMetric error", ex);
@@ -353,7 +352,7 @@ public class MetricsCollectTask {
                     }
                     for (Map.Entry<String, Long> consumeLatencyEntry : consumerLatencyMap.entrySet()) {
                         metricsService.getCollector().addGroupGetLatencyByStoreTimeMetric(clusterName,
-                                consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
+                            consumeLatencyEntry.getKey(), topic, group, consumeLatencyEntry.getValue());
                     }
 
                 } catch (Exception ex) {
@@ -377,7 +376,7 @@ public class MetricsCollectTask {
             topicSet = topicList.getTopicList();
         } catch (Exception ex) {
             log.error(String.format("collectBrokerStatsTopic-fetch topic list from namesrv error, the address is %s",
-                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
             return;
         }
         if (topicSet == null || topicSet.isEmpty()) {
@@ -388,7 +387,7 @@ public class MetricsCollectTask {
             clusterInfo = mqAdminExt.examineBrokerClusterInfo();
         } catch (Exception ex) {
             log.error(String.format("collectBrokerStatsTopic-fetch cluster info exception, the address is %s",
-                    JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
+                JSON.toJSONString(mqAdminExt.getNameServerAddressList())), ex);
             return;
         }
 
@@ -413,11 +412,11 @@ public class MetricsCollectTask {
                         bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_NUMS, topic);
                         String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
                         metricsService.getCollector().addTopicPutNumsMetric(
-                                bd.getCluster(),
-                                bd.getBrokerName(),
-                                brokerIP,
-                                topic,
-                                Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                            bd.getCluster(),
+                            bd.getBrokerName(),
+                            brokerIP,
+                            topic,
+                            Utils.getFixedDouble(bsd.getStatsMinute().getTps())
                         );
                     } catch (MQClientException ex) {
                         if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -433,11 +432,11 @@ public class MetricsCollectTask {
                         bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.TOPIC_PUT_SIZE, topic);
                         String brokerIP = clusterInfo.getBrokerAddrTable().get(bd.getBrokerName()).getBrokerAddrs().get(MixAll.MASTER_ID);
                         metricsService.getCollector().addTopicPutSizeMetric(
-                                bd.getCluster(),
-                                bd.getBrokerName(),
-                                brokerIP,
-                                topic,
-                                Utils.getFixedDouble(bsd.getStatsMinute().getTps())
+                            bd.getCluster(),
+                            bd.getBrokerName(),
+                            brokerIP,
+                            topic,
+                            Utils.getFixedDouble(bsd.getStatsMinute().getTps())
                         );
                     } catch (MQClientException ex) {
                         if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
@@ -472,11 +471,11 @@ public class MetricsCollectTask {
                             //how many messages the consumer has get for the topic
                             bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_NUMS, statsKey);
                             metricsService.getCollector().addGroupGetNumsMetric(
-                                    bd.getCluster(),
-                                    bd.getBrokerName(),
-                                    topic,
-                                    group,
-                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                                bd.getCluster(),
+                                bd.getBrokerName(),
+                                topic,
+                                group,
+                                Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
                         } catch (MQClientException ex) {
                             if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                                 log.error(String.format("GROUP_GET_NUMS-error, topic=%s, group=%s,master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -490,11 +489,11 @@ public class MetricsCollectTask {
                             //how many bytes the consumer has get for the topic
                             bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.GROUP_GET_SIZE, statsKey);
                             metricsService.getCollector().addGroupGetSizeMetric(
-                                    bd.getCluster(),
-                                    bd.getBrokerName(),
-                                    topic,
-                                    group,
-                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                                bd.getCluster(),
+                                bd.getBrokerName(),
+                                topic,
+                                group,
+                                Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
                         } catch (MQClientException ex) {
                             if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                                 log.error(String.format("GROUP_GET_SIZE-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -508,11 +507,11 @@ public class MetricsCollectTask {
                             ////how many re-send times the consumer did for the topic
                             bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.SNDBCK_PUT_NUMS, statsKey);
                             metricsService.getCollector().addSendBackNumsMetric(
-                                    bd.getCluster(),
-                                    bd.getBrokerName(),
-                                    topic,
-                                    group,
-                                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                                bd.getCluster(),
+                                bd.getBrokerName(),
+                                topic,
+                                group,
+                                Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
                         } catch (MQClientException ex) {
                             if (ex.getResponseCode() == ResponseCode.SYSTEM_ERROR) {
                                 log.error(String.format("SNDBCK_PUT_NUMS-error, topic=%s, group=%s, master broker=%s, %s", topic, group, masterAddr, ex.getErrorMessage()));
@@ -555,10 +554,10 @@ public class MetricsCollectTask {
                 bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_PUT_NUMS, clusterEntry.getValue().getCluster());
                 String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
                 metricsService.getCollector().addBrokerPutNumsMetric(
-                        clusterEntry.getValue().getCluster(),
-                        brokerIP,
-                        clusterEntry.getValue().getBrokerName(),
-                        Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                    clusterEntry.getValue().getCluster(),
+                    brokerIP,
+                    clusterEntry.getValue().getBrokerName(),
+                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
             } catch (Exception ex) {
                 log.error(String.format("BROKER_PUT_NUMS-error, master broker=%s", masterAddr), ex);
             }
@@ -566,10 +565,10 @@ public class MetricsCollectTask {
                 bsd = mqAdminExt.viewBrokerStatsData(masterAddr, BrokerStatsManager.BROKER_GET_NUMS, clusterEntry.getValue().getCluster());
                 String brokerIP = clusterEntry.getValue().getBrokerAddrs().get(MixAll.MASTER_ID);
                 metricsService.getCollector().addBrokerGetNumsMetric(
-                        clusterEntry.getValue().getCluster(),
-                        brokerIP,
-                        clusterEntry.getValue().getBrokerName(),
-                        Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
+                    clusterEntry.getValue().getCluster(),
+                    brokerIP,
+                    clusterEntry.getValue().getBrokerName(),
+                    Utils.getFixedDouble(bsd.getStatsMinute().getTps()));
             } catch (Exception ex) {
                 log.error(String.format("BROKER_GET_NUMS-error, master broker=%s", masterAddr), ex);
             }