You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/11/15 12:24:46 UTC

[GitHub] [incubator-inlong] luchunliang opened a new pull request #1797: [Feature]DataProxy support monitor indicator with JMX. #1796

luchunliang opened a new pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797


   ### Title Name: [INLONG-1796][component] DataProxy support monitor indicator with JMX. #1796
   
   [Feature]DataProxy support monitor indicator with JMX. #1796
   
   Fixes #<1796>
   
   ### Motivation
   
   [Feature]DataProxy provide monitor indicator based on JMX, user can implement the code that read the metrics and report to user-defined monitor system.
   
   DataProxy provide monitor indicator based on JMX, user can implement the code that read the metrics and report to user-defined monitor system.
   Source-module and Sink-module can add monitor metric class that is the subclass of org.apache.inlong.commons.config.metrics.MetricItemSet, and register it to MBeanServer.
   User-defined plugin can get module metric with JMX, and report metric data to different monitor system.
   
   User can describe the configuration in the file "common.properties ".
   For example:
   `
   metricDomains=DataProxy
   metricDomains.DataProxy.domainListeners=com.tencent.pcg.atta.dataproxy.metrics.m007.M007MetricListener org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
   metricDomains.DataProxy.snapshotInterval=60000
   `
   
   The JMX domain name of DataProxy is "DataProxy". 
   It is defined by the parameter "metricDomains".
   The listeners of JMX domain is defined by the parameter "metricDomains.$domainName.domainListeners".
   The class names of the listeners is separated by the space char. 
   The listener class need to implement the interface "org.apache.inlong.dataproxy.metrics.MetricListener".
   The snapshot interval of the listeners is defined by the parameter "metricDomains.$domainName.snapshotInterval", the parameter unit is "millisecond".
   
   The method proto of org.apache.inlong.dataproxy.metrics.MetricListener is:
   public void snapshot(String domain, List<MetricItemValue> itemValues);
   
   The field of MetricItemValue.dimensions has these key(The fields of DataProxyMetricItem defined by the Annotation "@Dimension"):
   
   - public String clusterId;
   - public String sourceId;
   - public String sourceDataId;
   - public String inlongGroupId;
   - public String inlongStreamId;
   - public String sinkId;
   - public String sinkDataId;
   
   The field of MetricItemValue.metrics has these key(The fields of DataProxyMetricItem defined by the Annotation "@CountMetric"):
   
   - readSuccessCount
   - readSuccessSize
   - readSuccessCount
   - readSuccessSize
   - readFailCount
   - readFailSize
   - sendCount
   - sendSize
   - sendSuccessCount
   - sendSuccessSize
   - sendFailCount
   - sendFailSize
   - sinkDuration, the unit is millisecond, the duration is between current timepoint and the timepoint in sending to sink destination.
   - nodeDuration, the unit is millisecond, the duration is between current timepoint and the timepoint in getting event from source.
   - wholeDuration, the unit is millisecond, the duration is between current timepoint and the timepoint in generating event.
   
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750303899



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * 
+ * ContextCacheClusterConfigLoader
+ */
+public class ContextCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+    private Context context;
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public List<CacheClusterConfig> load() {
+        String clusterNames = context.getString("cacheClusterConfig");

Review comment:
       fixed, extract to constant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang commented on pull request #1797: [INLONG-1796] DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#issuecomment-969734690


   @luchunliang please add some UTs to verify this feature.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] aloyszhang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750081208



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * PulsarSetSink
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
+
+    private PulsarFederationSinkContext context;
+    private List<PulsarFederationWorker> workers = new ArrayList<>();
+    private Map<String, String> dimensions;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        String sinkName = this.getName();
+        // create worker
+        for (int i = 0; i < context.getMaxThreads(); i++) {
+            PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
+            worker.start();
+            this.workers.add(worker);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        for (PulsarFederationWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.context.close();
+        super.stop();
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.context = new PulsarFederationSinkContext(context);
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            //
+            int eventSize = event.getBody().length;
+            if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
+                // record the failure of queue full for monitor
+                // metric
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailSize.addAndGet(eventSize);
+                //
+                tx.rollback();
+                return Status.BACKOFF;
+            }
+            this.context.getBufferQueue().offer(event);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+                // metric
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+//        Channel channel = getChannel();

Review comment:
       remove un-used code




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] inter12 commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
inter12 commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750056197



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    private List<CacheClusterConfig> configList;
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("cacheClusterConfig.type",
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * close
+     */
+    public void close() {
+        try {
+            this.reloadTimer.cancel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * setReloadTimer
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {

Review comment:
       TimerTask is not recommend to use . use ScheduledExecutorService instead 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] aloyszhang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750079569



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricListenerRunnable
+ */
+public class MetricListenerRunnable implements Runnable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+
+    private String domain;
+    private List<MetricListener> listenerList;
+
+    /**
+     * Constructor
+     * 
+     * @param domain
+     * @param listenerList
+     */
+    public MetricListenerRunnable(String domain, List<MetricListener> listenerList) {
+        this.domain = domain;
+        this.listenerList = listenerList;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        LOG.info("begin to snapshot metric:{}", domain);
+        try {
+            List<MetricItemValue> itemValues = this.getItemValues();
+            LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+//            LOG.info("snapshot metric:{},size:{},contents:{}", 

Review comment:
       remove un-used code




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] inter12 commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
inter12 commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750059647



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * 
+ * ContextCacheClusterConfigLoader
+ */
+public class ContextCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+    private Context context;
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public List<CacheClusterConfig> load() {
+        String clusterNames = context.getString("cacheClusterConfig");

Review comment:
       extract  to constant 

##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * 
+ * ContextIdTopicConfigLoader
+ */
+public class ContextIdTopicConfigLoader implements IdTopicConfigLoader {
+
+    private Context context;
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public List<IdTopicConfig> load() {
+        Map<String, String> idTopicMap = context.getSubProperties("idTopicConfig.");
+        List<IdTopicConfig> configList = new ArrayList<>(idTopicMap.size());

Review comment:
       NPE check?

##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, String> load() {
+        return this.loadProperties("common.properties");
+    }
+
+    /**
+     * loadProperties
+     * 
+     * @param  fileName
+     * @return
+     */
+    protected Map<String, String> loadProperties(String fileName) {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        InputStream inStream = null;

Review comment:
       recommend to use try-resource 

##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricObserver
+ */
+public class MetricObserver {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+    private static final AtomicBoolean isInited = new AtomicBoolean(false);
+    private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5);

Review comment:
       should init with thead name for online trouble shooting 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #1797: [Feature]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#issuecomment-968876147


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1797](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0ffa9dd) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/d0279f6cd179972bd9c091162e1a8280ea3408ea?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d0279f6) will **decrease** coverage by `0.19%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/1797/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1797      +/-   ##
   ============================================
   - Coverage     12.18%   11.99%   -0.20%     
   - Complexity     1049     1050       +1     
   ============================================
     Files           396      396              
     Lines         32833    33393     +560     
     Branches       5149     5274     +125     
   ============================================
   + Hits           4002     4005       +3     
   - Misses        28065    28621     +556     
   - Partials        766      767       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/inlong/tubemq/server/master/MasterConfig.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvTWFzdGVyQ29uZmlnLmphdmE=) | `32.63% <0.00%> (-0.50%)` | :arrow_down: |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.40%)` | :arrow_down: |
   | [...rg/apache/inlong/tubemq/server/master/TMaster.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvVE1hc3Rlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ster/nodemanage/nodeconsumer/ConsumeGroupInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvQ29uc3VtZUdyb3VwSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...aster/nodemanage/nodeconsumer/TopicConfigInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvVG9waWNDb25maWdJbmZvLmphdmE=) | | |
   | [...ache/inlong/tubemq/corebase/utils/OpsSyncInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS91dGlscy9PcHNTeW5jSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `34.51% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../inlong/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `40.00% <0.00%> (+1.11%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d0279f6...0ffa9dd](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750308891



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricObserver
+ */
+public class MetricObserver {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+    private static final AtomicBoolean isInited = new AtomicBoolean(false);
+    private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5);

Review comment:
       statExecutor is a thread pool. Runtime exception will print stack trace to log file.
   It is enough to online trouble shooting based on log file.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #1797: [Feature]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#issuecomment-968876147


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1797](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (26f6782) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/d0279f6cd179972bd9c091162e1a8280ea3408ea?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d0279f6) will **decrease** coverage by `0.20%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/1797/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1797      +/-   ##
   ============================================
   - Coverage     12.18%   11.98%   -0.21%     
   + Complexity     1049     1048       -1     
   ============================================
     Files           396      396              
     Lines         32833    33393     +560     
     Branches       5149     5274     +125     
   ============================================
     Hits           4002     4002              
   - Misses        28065    28623     +558     
   - Partials        766      768       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/inlong/tubemq/server/master/MasterConfig.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvTWFzdGVyQ29uZmlnLmphdmE=) | `32.63% <0.00%> (-0.50%)` | :arrow_down: |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.40%)` | :arrow_down: |
   | [...rg/apache/inlong/tubemq/server/master/TMaster.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvVE1hc3Rlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ster/nodemanage/nodeconsumer/ConsumeGroupInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvQ29uc3VtZUdyb3VwSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...aster/nodemanage/nodeconsumer/TopicConfigInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvVG9waWNDb25maWdJbmZvLmphdmE=) | | |
   | [...ache/inlong/tubemq/corebase/utils/OpsSyncInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvaW5sb25nL3R1YmVtcS9jb3JlYmFzZS91dGlscy9PcHNTeW5jSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d0279f6...26f6782](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] aloyszhang merged pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
aloyszhang merged pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] inter12 commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
inter12 commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750054359



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    private List<CacheClusterConfig> configList;
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("cacheClusterConfig.type",
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       should not print stacktrace direct




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r753652882



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    private List<CacheClusterConfig> configList;
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("cacheClusterConfig.type",
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       fixed, print exception with log4j.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] inter12 commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
inter12 commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750057618



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
##########
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * IdTopicConfigHolder
+ */
+public class IdTopicConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(IdTopicConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private IdTopicConfigLoader loader;
+
+    private List<IdTopicConfig> configList = new ArrayList<>();
+    private Map<String, IdTopicConfig> configMap = new ConcurrentHashMap<>();
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("idTopicConfig.type", ContextIdTopicConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof IdTopicConfigLoader) {
+                this.loader = (IdTopicConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextIdTopicConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       e.printstacktrace is not suitable




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r753652882



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    private List<CacheClusterConfig> configList;
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("cacheClusterConfig.type",
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       fixed, print exception with log4j.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750310322



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
##########
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * PulsarSetSink
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
+
+    private PulsarFederationSinkContext context;
+    private List<PulsarFederationWorker> workers = new ArrayList<>();
+    private Map<String, String> dimensions;
+
+    /**
+     * start
+     */
+    @Override
+    public void start() {
+        String sinkName = this.getName();
+        // create worker
+        for (int i = 0; i < context.getMaxThreads(); i++) {
+            PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
+            worker.start();
+            this.workers.add(worker);
+        }
+        super.start();
+    }
+
+    /**
+     * stop
+     */
+    @Override
+    public void stop() {
+        for (PulsarFederationWorker worker : workers) {
+            try {
+                worker.close();
+            } catch (Throwable e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        this.context.close();
+        super.stop();
+    }
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+        this.context = new PulsarFederationSinkContext(context);
+        this.dimensions = new HashMap<>();
+        this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+        this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+    }
+
+    /**
+     * process
+     * 
+     * @return                        Status
+     * @throws EventDeliveryException
+     */
+    @Override
+    public Status process() throws EventDeliveryException {
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event == null) {
+                tx.commit();
+                return Status.BACKOFF;
+            }
+            //
+            int eventSize = event.getBody().length;
+            if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
+                // record the failure of queue full for monitor
+                // metric
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+                metricItem.readFailSize.addAndGet(eventSize);
+                //
+                tx.rollback();
+                return Status.BACKOFF;
+            }
+            this.context.getBufferQueue().offer(event);
+            tx.commit();
+            return Status.READY;
+        } catch (Throwable t) {
+            LOG.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+                // metric
+                DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+                metricItem.readFailCount.incrementAndGet();
+            } catch (Throwable e) {
+                LOG.error("Channel take transaction rollback exception:" + getName(), e);
+            }
+            return Status.BACKOFF;
+        } finally {
+            tx.close();
+        }
+//        Channel channel = getChannel();

Review comment:
       fixed, remove un-used code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#issuecomment-968876147


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#1797](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ec31078) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/d0279f6cd179972bd9c091162e1a8280ea3408ea?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d0279f6) will **decrease** coverage by `0.24%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/1797/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1797      +/-   ##
   ============================================
   - Coverage     12.18%   11.94%   -0.25%     
     Complexity     1049     1049              
   ============================================
     Files           396      399       +3     
     Lines         32833    33466     +633     
     Branches       5149     5276     +127     
   ============================================
   - Hits           4002     3997       -5     
   - Misses        28065    28701     +636     
   - Partials        766      768       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
   | [...tubemq/client/factory/TubeMultiSessionFactory.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9mYWN0b3J5L1R1YmVNdWx0aVNlc3Npb25GYWN0b3J5LmphdmE=) | `42.30% <0.00%> (-1.70%)` | :arrow_down: |
   | [...ubemq/client/factory/TubeSingleSessionFactory.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9mYWN0b3J5L1R1YmVTaW5nbGVTZXNzaW9uRmFjdG9yeS5qYXZh) | `34.21% <0.00%> (-0.93%)` | :arrow_down: |
   | [...he/inlong/tubemq/client/config/ConsumerConfig.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9jb25maWcvQ29uc3VtZXJDb25maWcuamF2YQ==) | `22.12% <0.00%> (-0.74%)` | :arrow_down: |
   | [.../tubemq/client/factory/TubeBaseSessionFactory.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9mYWN0b3J5L1R1YmVCYXNlU2Vzc2lvbkZhY3RvcnkuamF2YQ==) | `58.13% <0.00%> (-0.69%)` | :arrow_down: |
   | [...ache/inlong/tubemq/server/master/MasterConfig.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvTWFzdGVyQ29uZmlnLmphdmE=) | `32.63% <0.00%> (-0.50%)` | :arrow_down: |
   | [...rg/apache/inlong/tubemq/server/master/TMaster.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvVE1hc3Rlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...g/apache/inlong/tubemq/client/common/PeerInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL2NsaWVudC9jb21tb24vUGVlckluZm8uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ster/nodemanage/nodeconsumer/ConsumeGroupInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvQ29uc3VtZUdyb3VwSW5mby5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...aster/nodemanage/nodeconsumer/TopicConfigInfo.java](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9pbmxvbmcvdHViZW1xL3NlcnZlci9tYXN0ZXIvbm9kZW1hbmFnZS9ub2RlY29uc3VtZXIvVG9waWNDb25maWdJbmZvLmphdmE=) | | |
   | ... and [4 more](https://codecov.io/gh/apache/incubator-inlong/pull/1797/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [d0279f6...ec31078](https://codecov.io/gh/apache/incubator-inlong/pull/1797?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750276112



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
##########
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * IdTopicConfigHolder
+ */
+public class IdTopicConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(IdTopicConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private IdTopicConfigLoader loader;
+
+    private List<IdTopicConfig> configList = new ArrayList<>();
+    private Map<String, IdTopicConfig> configMap = new ConcurrentHashMap<>();
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("idTopicConfig.type", ContextIdTopicConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof IdTopicConfigLoader) {
+                this.loader = (IdTopicConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextIdTopicConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       fixed, remove this line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750281884



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
##########
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * 
+ * ContextIdTopicConfigLoader
+ */
+public class ContextIdTopicConfigLoader implements IdTopicConfigLoader {
+
+    private Context context;
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public List<IdTopicConfig> load() {
+        Map<String, String> idTopicMap = context.getSubProperties("idTopicConfig.");
+        List<IdTopicConfig> configList = new ArrayList<>(idTopicMap.size());

Review comment:
       context.getSubProperties() should not return null.
     public ImmutableMap<String, String> getSubProperties(String prefix) {
       Preconditions.checkArgument(prefix.endsWith("."),
           "The given prefix does not end with a period (" + prefix + ")");
       Map<String, String> result = Maps.newHashMap();
       synchronized (parameters) {
         for (Entry<String, String> entry : parameters.entrySet()) {
           String key = entry.getKey();
           if (key.startsWith(prefix)) {
             String name = key.substring(prefix.length());
             result.put(name, entry.getValue());
           }
         }
       }
       return ImmutableMap.copyOf(result);
     }




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750279955



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
##########
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+
+    /**
+     * load
+     * 
+     * @return
+     */
+    @Override
+    public Map<String, String> load() {
+        return this.loadProperties("common.properties");
+    }
+
+    /**
+     * loadProperties
+     * 
+     * @param  fileName
+     * @return
+     */
+    protected Map<String, String> loadProperties(String fileName) {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        InputStream inStream = null;

Review comment:
       fixed, use try-with-resource mode.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750309388



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * MetricListenerRunnable
+ */
+public class MetricListenerRunnable implements Runnable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+
+    private String domain;
+    private List<MetricListener> listenerList;
+
+    /**
+     * Constructor
+     * 
+     * @param domain
+     * @param listenerList
+     */
+    public MetricListenerRunnable(String domain, List<MetricListener> listenerList) {
+        this.domain = domain;
+        this.listenerList = listenerList;
+    }
+
+    /**
+     * run
+     */
+    @Override
+    public void run() {
+        LOG.info("begin to snapshot metric:{}", domain);
+        try {
+            List<MetricItemValue> itemValues = this.getItemValues();
+            LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+//            LOG.info("snapshot metric:{},size:{},contents:{}", 

Review comment:
       fixed, remove un-used code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #1797: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #1797:
URL: https://github.com/apache/incubator-inlong/pull/1797#discussion_r750273949



##########
File path: inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private CacheClusterConfigLoader loader;
+
+    private List<CacheClusterConfig> configList;
+
+    /**
+     * configure
+     * 
+     * @param context
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong("reloadInterval", 60000L);
+        String loaderType = context.getString("cacheClusterConfig.type",
+                ContextCacheClusterConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof CacheClusterConfigLoader) {
+                this.loader = (CacheClusterConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextCacheClusterConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * start
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * close
+     */
+    public void close() {
+        try {
+            this.reloadTimer.cancel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * setReloadTimer
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {

Review comment:
       ScheduledExecutorService is used to ThreadPool case, it is better for a lot of same task.
   The reload task of CacheClusterConfigHolder is a single schedule task, single thread of Timer is better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org