You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/12/17 08:54:02 UTC

[GitHub] [incubator-inlong] luchunliang opened a new pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

luchunliang opened a new pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022


   ### Title Name: [INLONG-1992][Inlong-sort-flink] sort-flink support configurable loader of getting configuration.
   
   Fixes #1992 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775383191



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/ZookeeperWatcher.java
##########
@@ -38,10 +39,16 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
+
+public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener, MetaWatcher {

Review comment:
       ZookeeperWatcher is a basic util class wrapping zk related operations. It should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/ZookeeperWatcher.java
##########
@@ -50,26 +57,52 @@
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**

Review comment:
       descriptions of param are empty.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-996559178


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2022](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (77ee515) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/59d9be6dbf4a7f348f67ebd3acf87640bb859432?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (59d9be6) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2022/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #2022   +/-   ##
   =========================================
     Coverage     12.27%   12.28%           
     Complexity     1157     1157           
   =========================================
     Files           413      413           
     Lines         35225    35215   -10     
     Branches       5542     5542           
   =========================================
     Hits           4325     4325           
   + Misses        30129    30119   -10     
     Partials        771      771           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../flink/connectors/tubemq/TubemqSourceFunction.java](https://codecov.io/gh/apache/incubator-inlong/pull/2022/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsaW5rL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbGluay9jb25uZWN0b3JzL3R1YmVtcS9UdWJlbXFTb3VyY2VGdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [59d9be6...77ee515](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775422281



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/ZookeeperWatcher.java
##########
@@ -38,10 +39,16 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
+
+public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener, MetaWatcher {

Review comment:
       ok, add another class ZookeeperMetaWatcher  which implements MetaWatcher, and change ZookeeperWatcher  to ZookeeperWatcherUtils.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695014



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       Move these codes into ZookeeperMetaWatcher.
   ZookeeperWatcherUtils should not be aware of anything about dataflow.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener = new DataFlowsChildrenWatcherListener(
+                    this.client, metaListener);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, true);
+            final List<ChildData> childData = this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                dataFlowsChildrenWatcherListener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-1002396390


   +1


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695412



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
-        }
-
-        @Override
-        public void onChildRemoved(ChildData childData) throws Exception {
-            LOG.info("DataFlow Removed event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
+        } else {
+            if (dataFlowInfo.equals(oldDataFlowInfo)) {
+                LOG.info("DataFlowInfo has not been changed, ignore update.");
                 return;
             }
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                dataFlowInfoMap.remove(dataFlowId);
-                for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                    try {
-                        dataFlowInfoListener.removeDataFlow(dataFlowInfo);
-                    } catch (Exception e) {
-                        LOG.warn("Error happens when notifying listener data flow deleted", e);
-                    }
+            LOG.info("Try to update dataFlow {}.", dataFlowId);
+
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onInitialized(List<ChildData> childData) throws Exception {
-            LOG.info("Initialized event retrieved");
+    }
 
-            for (ChildData singleChildData : childData) {
-                onChildAdded(singleChildData);
+    /**
+     * removeDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+
+        dataFlowInfoMap.remove(dataFlowId);

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang commented on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
dockerzhang commented on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-999534590


   @ifndef-SleePy PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-996559178


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2022](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6830d20) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/59d9be6dbf4a7f348f67ebd3acf87640bb859432?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (59d9be6) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2022/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2022      +/-   ##
   ============================================
   - Coverage     12.27%   12.25%   -0.02%     
     Complexity     1157     1157              
   ============================================
     Files           413      413              
     Lines         35225    35225              
     Branches       5542     5542              
   ============================================
   - Hits           4325     4318       -7     
   - Misses        30129    30135       +6     
   - Partials        771      772       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/2022/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [59d9be6...6830d20](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775423095



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+
+/**
+ * 
+ * MetaWatcher
+ */
+public interface MetaWatcher {
+
+    /**
+     * open
+     * 
+     * @param  config

Review comment:
       add param comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695298



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);

Review comment:
       modification of dataFlowInfoMap should be under `LOCK`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695059



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -337,4 +377,95 @@ public void childEvent(
             }
         }
     }
+
+    private static class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779625



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       ZookeeperMetaWatcher implements MetaWatcher.
   ZookeeperWatcherUtils is a basic util class.
   
   chantccc yesterday
   ZookeeperWatcher is a basic util class wrapping zk related operations. It should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener = new DataFlowsChildrenWatcherListener(
+                    this.client, metaListener);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, true);
+            final List<ChildData> childData = this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                dataFlowsChildrenWatcherListener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {

Review comment:
       ZookeeperMetaWatcher implements MetaWatcher.
   ZookeeperWatcherUtils is a basic util class.
   
   chantccc yesterday
   ZookeeperWatcher is a basic util class wrapping zk related operations. It should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r776132298



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       ok, move those codes of about DataFlowInfo to ZookeeperMetaWatcher.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] dockerzhang merged pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
dockerzhang merged pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775695332



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775383500



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/DataFlowsChildrenWatcherListener.java
##########
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.meta.ZookeeperWatcher.ChildrenWatcherListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * DataFlowsChildrenWatcherListener
+ */
+public class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {

Review comment:
       This class is only used in ZookeeperMetaWatcher, make it an inner class is better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775423220



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/DataFlowsChildrenWatcherListener.java
##########
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.meta.ZookeeperWatcher.ChildrenWatcherListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * DataFlowsChildrenWatcherListener
+ */
+public class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {

Review comment:
       ok, make it to an inner class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-996559178


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2022](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1d08f0f) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/59d9be6dbf4a7f348f67ebd3acf87640bb859432?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (59d9be6) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2022/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@            Coverage Diff            @@
   ##             master    #2022   +/-   ##
   =========================================
     Coverage     12.27%   12.28%           
     Complexity     1157     1157           
   =========================================
     Files           413      413           
     Lines         35225    35215   -10     
     Branches       5542     5542           
   =========================================
     Hits           4325     4325           
   + Misses        30129    30119   -10     
     Partials        771      771           
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../flink/connectors/tubemq/TubemqSourceFunction.java](https://codecov.io/gh/apache/incubator-inlong/pull/2022/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsaW5rL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbGluay9jb25uZWN0b3JzL3R1YmVtcS9UdWJlbXFTb3VyY2VGdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [59d9be6...1d08f0f](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779945



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);

Review comment:
       add LOCK operation.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);

Review comment:
       add LOCK operation.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
##########
@@ -132,149 +129,116 @@ public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListen
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.info("Try to add dataFlow {}", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow added", e);
+                }
             }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow added", e);
-                        }
-                    }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        } else {
+            LOG.warn("DataFlow {} should not be exist", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
+    }
 
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
-
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
+        DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+        if (oldDataFlowInfo == null) {
+            LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
-        }
-
-        @Override
-        public void onChildRemoved(ChildData childData) throws Exception {
-            LOG.info("DataFlow Removed event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
+        } else {
+            if (dataFlowInfo.equals(oldDataFlowInfo)) {
+                LOG.info("DataFlowInfo has not been changed, ignore update.");
                 return;
             }
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                dataFlowInfoMap.remove(dataFlowId);
-                for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                    try {
-                        dataFlowInfoListener.removeDataFlow(dataFlowInfo);
-                    } catch (Exception e) {
-                        LOG.warn("Error happens when notifying listener data flow deleted", e);
-                    }
+            LOG.info("Try to update dataFlow {}.", dataFlowId);
+
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow updated", e);
                 }
             }
         }
 
-        @Override
-        public void onInitialized(List<ChildData> childData) throws Exception {
-            LOG.info("Initialized event retrieved");
+    }
 
-            for (ChildData singleChildData : childData) {
-                onChildAdded(singleChildData);
+    /**
+     * removeDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        long dataFlowId = dataFlowInfo.getId();
+
+        dataFlowInfoMap.remove(dataFlowId);

Review comment:
       add LOCK operation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775779767



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -337,4 +377,95 @@ public void childEvent(
             }
         }
     }
+
+    private static class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {

Review comment:
       ZookeeperMetaWatcher implements MetaWatcher.
   ZookeeperWatcherUtils is a basic util class.
   
   chantccc yesterday
   ZookeeperWatcher is a basic util class wrapping zk related operations. It should not implement MetaWatcher.
   Use another class ZookeeperMetaWatcher which implements MetaWatcher instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r775383437



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+
+/**
+ * 
+ * MetaWatcher
+ */
+public interface MetaWatcher {
+
+    /**
+     * open
+     * 
+     * @param  config

Review comment:
       descriptions of param are empty.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] luchunliang commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
luchunliang commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r776135712



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener = new DataFlowsChildrenWatcherListener(
+                    this.client, metaListener);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, true);
+            final List<ChildData> childData = this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                dataFlowsChildrenWatcherListener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {

Review comment:
       ok, move those codes of about DataFlowInfo to ZookeeperMetaWatcher.

##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -337,4 +377,95 @@ public void childEvent(
             }
         }
     }
+
+    private static class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {

Review comment:
       ok, move those codes of about DataFlowInfo to ZookeeperMetaWatcher.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#issuecomment-996559178


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#2022](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bf5561e) into [master](https://codecov.io/gh/apache/incubator-inlong/commit/59d9be6dbf4a7f348f67ebd3acf87640bb859432?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (59d9be6) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/2022/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2022      +/-   ##
   ============================================
   - Coverage     12.27%   12.26%   -0.02%     
     Complexity     1157     1157              
   ============================================
     Files           413      413              
     Lines         35225    35215      -10     
     Branches       5542     5542              
   ============================================
   - Hits           4325     4318       -7     
   + Misses        30129    30125       -4     
   - Partials        771      772       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/2022/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsdW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbHVtZS9zaW5rL3R1YmVtcS9UdWJlbXFTaW5rLmphdmE=) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
   | [.../flink/connectors/tubemq/TubemqSourceFunction.java](https://codecov.io/gh/apache/incubator-inlong/pull/2022/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-aW5sb25nLXR1YmVtcS90dWJlbXEtY29ubmVjdG9ycy90dWJlbXEtY29ubmVjdG9yLWZsaW5rL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9mbGluay9jb25uZWN0b3JzL3R1YmVtcS9UdWJlbXFTb3VyY2VGdW5jdGlvbi5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [59d9be6...bf5561e](https://codecov.io/gh/apache/incubator-inlong/pull/2022?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] chantccc commented on a change in pull request #2022: [INLONG-1992] sort-flink support configurable loader of getting configuration.

Posted by GitBox <gi...@apache.org>.
chantccc commented on a change in pull request #2022:
URL: https://github.com/apache/incubator-inlong/pull/2022#discussion_r776124907



##########
File path: inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
##########
@@ -38,38 +39,74 @@
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {

Review comment:
       Basic util class means  that ZookeeperWatcherUtils is just a wrapper of curator, all business codes should not be placed in it.
   The watch of dataflow should be placed in ZookeeperMetaWatcher(in our case,  data of dataflow is called meta)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org