You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/29 04:51:15 UTC

[incubator-inlong] branch master updated: [INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ead0b9e  [INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)
ead0b9e is described below

commit ead0b9e692c850ded4c8ba30bf9bbafcc0927fd9
Author: 卢春亮 <94...@qq.com>
AuthorDate: Wed Dec 29 12:51:06 2021 +0800

    [INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)
---
 .../inlong/sort/meta/MetaDataFlowInfoListener.java |  73 +++++
 .../org/apache/inlong/sort/meta/MetaManager.java   | 298 ++++++++++-----------
 .../org/apache/inlong/sort/meta/MetaWatcher.java   |  44 +++
 .../sort/meta/zookeeper/ZookeeperMetaWatcher.java  | 167 ++++++++++++
 .../zookeeper/ZookeeperWatcherUtils.java}          |  97 ++++---
 .../sort/formats/base}/TableFormatUtilsTest.java   |   0
 6 files changed, 486 insertions(+), 193 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java
new file mode 100644
index 0000000..7bfdac9
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+
+/**
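+ * MetaDataFlowInfoListener: a {@link DataFlowInfoListener} that forwards every add, update and remove
+ * notification to the {@link MetaManager} it was constructed with.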
+ */
+public class MetaDataFlowInfoListener implements DataFlowInfoListener {
+
+    private MetaManager manager;
+
+    /**
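+     * Creates a listener bound to the given manager.
+     * 
+     * @param manager the MetaManager that receives the forwarded data flow notifications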
+     */
+    public MetaDataFlowInfoListener(MetaManager manager) {
+        this.manager = manager;
+    }
+
+    /**
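+     * Forwards an added data flow to {@link MetaManager#addDataFlow(DataFlowInfo)}.
+     * 
+     * @param  dataFlowInfo the data flow that was added
+     * @throws Exception    any exception thrown by the manager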
+     */
+    @Override
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.addDataFlow(dataFlowInfo);
+    }
+
+    /**
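+     * Forwards an updated data flow to {@link MetaManager#updateDataFlow(DataFlowInfo)}.
+     * 
+     * @param  dataFlowInfo the data flow that was updated
+     * @throws Exception    any exception thrown by the manager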
+     */
+    @Override
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.updateDataFlow(dataFlowInfo);
+    }
+
+    /**
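+     * Forwards a removed data flow to {@link MetaManager#removeDataFlow(DataFlowInfo)}.
+     * 
+     * @param  dataFlowInfo the data flow that was removed
+     * @throws Exception    any exception thrown by the manager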
+     */
+    @Override
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.removeDataFlow(dataFlowInfo);
+    }
+
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
index 409daa6..0961ffe 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
@@ -17,51 +17,50 @@
 
 package org.apache.inlong.sort.meta;
 
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.configuration.Constants;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
-import org.apache.inlong.sort.util.ZooKeeperUtils;
-import org.apache.inlong.sort.util.ZookeeperWatcher;
-import org.apache.inlong.sort.util.ZookeeperWatcher.ChildrenWatcherListener;
+import static org.apache.inlong.sort.configuration.ConfigOptions.key;
+
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.inlong.sort.configuration.ConfigOption;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.zookeeper.ZookeeperMetaWatcher;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.objenesis.instantiator.util.ClassUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is the manager of meta. It's a singleton and shared by many components from different
- * threads. So it's thread-safe.
+ * This is the manager of meta. It's a singleton and shared by many components from different threads. So it's
+ * thread-safe.
  *
  */
 public class MetaManager implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(MetaManager.class);
 
-    private static MetaManager instance;
+    public static final ConfigOption<String> META_WATCHER_TYPE = key("meta.watcher.type")
+            .defaultValue(ZookeeperMetaWatcher.class.getName())
+            .withDescription("Defines the meta watcher class name.");
 
-    private static final Object lock = new Object();
-
-    private final CuratorFramework zookeeperClient;
+    private static MetaManager instance;
 
-    private final ZookeeperWatcher zookeeperWatcher;
+    public static final Object LOCK = new Object();
 
-    private final ObjectMapper objectMapper;
+    private MetaWatcher watcher;
 
     @GuardedBy("lock")
     private final Map<Long, DataFlowInfo> dataFlowInfoMap;
 
     @GuardedBy("lock")
-    private final List<DataFlowInfoListener>  dataFlowInfoListeners;
+    private final List<DataFlowInfoListener> dataFlowInfoListeners;
 
     public static MetaManager getInstance(Configuration config) throws Exception {
-        synchronized (lock) {
+        synchronized (LOCK) {
             if (instance == null) {
                 instance = new MetaManager(config);
                 instance.open(config);
@@ -71,7 +70,7 @@ public class MetaManager implements AutoCloseable {
     }
 
     public static void release() throws Exception {
-        synchronized (lock) {
+        synchronized (LOCK) {
             if (instance != null) {
                 instance.close();
                 instance = null;
@@ -80,45 +79,48 @@ public class MetaManager implements AutoCloseable {
     }
 
     /**
-     * If you want to change the path rule here, please change
-     * ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api too.
+     * Constructor
+     * 
+     * @param config process start parameters
      */
-    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {
-        return "/clusters/" + cluster + "/dataflows";
-    }
-
     private MetaManager(Configuration config) {
-        zookeeperClient = ZooKeeperUtils.startCuratorFramework(config);
-        zookeeperWatcher = new ZookeeperWatcher(zookeeperClient);
-        objectMapper = new ObjectMapper();
-        dataFlowInfoMap = new HashMap<>();
+        dataFlowInfoMap = new ConcurrentHashMap<>();
         dataFlowInfoListeners = new ArrayList<>();
     }
 
+    /**
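+     * Creates the MetaWatcher named by {@link #META_WATCHER_TYPE} and opens it with a listener for this manager.
+     * 
+     * @param  config    process start parameters
+     * @throws Exception if the watcher class cannot be loaded or the watcher fails to open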
+     */
     private void open(Configuration config) throws Exception {
-        synchronized (lock) {
-            final String dataFlowsWatchingPath
-                    = getWatchingPathOfDataFlowsInCluster(config.getString(Constants.CLUSTER_ID));
-            final DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener
-                    = new DataFlowsChildrenWatcherListener();
-            zookeeperWatcher.registerPathChildrenWatcher(
-                    dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, true);
-            final List<ChildData> childData = zookeeperWatcher.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
-            if (childData != null) {
-                dataFlowsChildrenWatcherListener.onInitialized(childData);
-            }
-        }
+        String watcherClass = config.getString(META_WATCHER_TYPE);
+        this.watcher = (MetaWatcher) ClassUtils.newInstance(Class.forName(watcherClass));
+        this.watcher.open(config, new MetaDataFlowInfoListener(this));
     }
 
+    /**
+     * close
+     * 
+     * @throws Exception
+     */
     public void close() throws Exception {
-        synchronized (lock) {
-            zookeeperWatcher.close();
-            zookeeperClient.close();
+        synchronized (LOCK) {
+            if (watcher != null) {
+                watcher.close();
+            }
         }
     }
 
+    /**
+     * registerDataFlowInfoListener
+     * 
+     * @param  dataFlowInfoListener
+     * @throws Exception
+     */
     public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListener) throws Exception {
-        synchronized (lock) {
+        synchronized (LOCK) {
             dataFlowInfoListeners.add(dataFlowInfoListener);
             for (DataFlowInfo dataFlowInfo : dataFlowInfoMap.values()) {
                 try {
@@ -132,149 +134,119 @@ public class MetaManager implements AutoCloseable {
         LOG.info("Register DataFlowInfoListener successfully");
     }
 
+    /**
+     * getDataFlowInfo
+     * 
+     * @param  id
+     * @return
+     */
     public DataFlowInfo getDataFlowInfo(long id) {
-        synchronized (lock) {
+        synchronized (LOCK) {
             return dataFlowInfoMap.get(id);
         }
     }
 
-    public interface DataFlowInfoListener {
-        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
-        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-    }
-
-    private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
-        @Override
-        public void onChildAdded(ChildData childData) throws Exception {
-            LOG.info("DataFlow Added event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.info("Try to add dataFlow {}", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow added", e);
-                        }
+    /**
+     * addDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        synchronized (LOCK) {
+            long dataFlowId = dataFlowInfo.getId();
+            DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+            if (oldDataFlowInfo == null) {
+                LOG.info("Try to add dataFlow {}", dataFlowId);
+                for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                    try {
+                        dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                    } catch (Exception e) {
+                        LOG.warn("Error happens when notifying listener data flow added", e);
                     }
-                } else {
-                    LOG.warn("DataFlow {} should not be exist", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
+                }
+            } else {
+                LOG.warn("DataFlow {} should not be exist", dataFlowId);
+                for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                    try {
+                        dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+                    } catch (Exception e) {
+                        LOG.warn("Error happens when notifying listener data flow updated", e);
                     }
                 }
             }
         }
+    }
 
-        @Override
-        public void onChildUpdated(ChildData childData) throws Exception {
-            LOG.info("DataFlow Updated event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
-
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
-
-                DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
-                if (oldDataFlowInfo == null) {
-                    LOG.warn("DataFlow {} should already be exist.", dataFlowId);
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.addDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
-                    }
-                } else {
-                    if (dataFlowInfo.equals(oldDataFlowInfo)) {
-                        LOG.info("DataFlowInfo has not been changed, ignore update.");
-                        return;
-                    }
-
-                    LOG.info("Try to update dataFlow {}.", dataFlowId);
+    /**
+     * updateDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        synchronized (LOCK) {
+            long dataFlowId = dataFlowInfo.getId();
 
-                    for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
-                        try {
-                            dataFlowInfoListener.updateDataFlow(dataFlowInfo);
-                        } catch (Exception e) {
-                            LOG.warn("Error happens when notifying listener data flow updated", e);
-                        }
+            DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+            if (oldDataFlowInfo == null) {
+                LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+                for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                    try {
+                        dataFlowInfoListener.addDataFlow(dataFlowInfo);
+                    } catch (Exception e) {
+                        LOG.warn("Error happens when notifying listener data flow updated", e);
                     }
                 }
-            }
-        }
-
-        @Override
-        public void onChildRemoved(ChildData childData) throws Exception {
-            LOG.info("DataFlow Removed event retrieved");
-
-            final byte[] data = childData.getData();
-            if (data == null) {
-                return;
-            }
+            } else {
+                if (dataFlowInfo.equals(oldDataFlowInfo)) {
+                    LOG.info("DataFlowInfo has not been changed, ignore update.");
+                    return;
+                }
 
-            synchronized (lock) {
-                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
-                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
-                long dataFlowId = dataFlowInfo.getId();
+                LOG.info("Try to update dataFlow {}.", dataFlowId);
 
-                dataFlowInfoMap.remove(dataFlowId);
                 for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
                     try {
-                        dataFlowInfoListener.removeDataFlow(dataFlowInfo);
+                        dataFlowInfoListener.updateDataFlow(dataFlowInfo);
                     } catch (Exception e) {
-                        LOG.warn("Error happens when notifying listener data flow deleted", e);
+                        LOG.warn("Error happens when notifying listener data flow updated", e);
                     }
                 }
             }
         }
+    }
 
-        @Override
-        public void onInitialized(List<ChildData> childData) throws Exception {
-            LOG.info("Initialized event retrieved");
+    /**
+     * removeDataFlow
+     * 
+     * @param  dataFlowInfo
+     * @throws Exception
+     */
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        synchronized (LOCK) {
+            long dataFlowId = dataFlowInfo.getId();
 
-            for (ChildData singleChildData : childData) {
-                onChildAdded(singleChildData);
+            dataFlowInfoMap.remove(dataFlowId);
+            for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+                try {
+                    dataFlowInfoListener.removeDataFlow(dataFlowInfo);
+                } catch (Exception e) {
+                    LOG.warn("Error happens when notifying listener data flow deleted", e);
+                }
             }
         }
     }
 
-    private DataFlowInfo getDataFlowInfo(DataFlowStorageInfo dataFlowStorageInfo) throws Exception {
-        switch (dataFlowStorageInfo.getStorageType()) {
-            case ZK:
-                String zkPath = dataFlowStorageInfo.getPath();
-                byte[] data = zookeeperClient.getData().forPath(zkPath);
-                return objectMapper.readValue(data, DataFlowInfo.class);
-            case HDFS:
-                throw new IllegalArgumentException("HDFS dataFlow storage type not supported yet!");
-            default:
-                throw new IllegalArgumentException("Unsupported dataFlow storage type "
-                        + dataFlowStorageInfo.getStorageType());
-        }
-    }
+    /**
+     * MetaManager DataFlowInfoListener
+     */
+    public interface DataFlowInfoListener {
+
+        void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
 
+        void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
+
+        void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
+    }
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
new file mode 100644
index 0000000..cc85902
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+
+/**
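+ * MetaWatcher: a pluggable loader of data flow configuration. MetaManager instantiates the implementation named
+ * by {@link MetaManager#META_WATCHER_TYPE} and opens it with a listener that receives the data flow changes.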
+ */
+public interface MetaWatcher {
+
+    /**
+     * open
+     * 
+     * @param  config       command parameters when process start.
+     * @param  metaListener a listener of DataFlowInfo
+     * @throws Exception    any exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception;
+
+    /**
+     * close
+     * 
+     * @throws Exception
+     */
+    public void close() throws Exception;
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java
new file mode 100644
index 0000000..2d08206
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta.zookeeper;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.meta.MetaWatcher;
+import org.apache.inlong.sort.meta.zookeeper.ZookeeperWatcherUtils.ChildrenWatcherListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
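+ * ZookeeperMetaWatcher: the default {@link MetaWatcher}. It watches the data flow nodes of the cluster in
+ * ZooKeeper and passes each resolved {@link DataFlowInfo} to the listener supplied to open.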
+ */
+public class ZookeeperMetaWatcher implements MetaWatcher {
+
+    private ZookeeperWatcherUtils watcherUtils;
+
+    /**
+     * Constructor
+     */
+    public ZookeeperMetaWatcher() {
+        this.watcherUtils = new ZookeeperWatcherUtils();
+    }
+
+    /**
+     * open
+     * 
+     * @param  config
+     * @param  metaListener
+     * @throws Exception
+     */
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener = new DataFlowsChildrenWatcherListener(
+                metaListener);
+        this.watcherUtils.open(config, dataFlowsChildrenWatcherListener);
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.watcherUtils.close();
+    }
+
+    private static class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
+
+        private static final Logger LOG = LoggerFactory.getLogger(DataFlowsChildrenWatcherListener.class);
+
+        private final ObjectMapper objectMapper;
+
+        /**
+         * Connection to the used ZooKeeper quorum.
+         */
+        private CuratorFramework zookeeperClient;
+
+        private final DataFlowInfoListener metaListener;
+
+        public DataFlowsChildrenWatcherListener(DataFlowInfoListener metaListener) {
+            this.objectMapper = new ObjectMapper();
+            this.metaListener = metaListener;
+        }
+
+        /**
+         * initCuratorFramework
+         * 
+         * @param zookeeperClient
+         */
+        @Override
+        public void initCuratorFramework(CuratorFramework zookeeperClient) {
+            this.zookeeperClient = zookeeperClient;
+        }
+
+        @Override
+        public void onChildAdded(ChildData childData) throws Exception {
+            LOG.info("DataFlow Added event retrieved");
+
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            synchronized (MetaManager.LOCK) {
+                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
+                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
+                metaListener.addDataFlow(dataFlowInfo);
+            }
+        }
+
+        @Override
+        public void onChildUpdated(ChildData childData) throws Exception {
+            LOG.info("DataFlow Updated event retrieved");
+
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            synchronized (MetaManager.LOCK) {
+                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
+                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
+                metaListener.updateDataFlow(dataFlowInfo);
+            }
+        }
+
+        @Override
+        public void onChildRemoved(ChildData childData) throws Exception {
+            LOG.info("DataFlow Removed event retrieved");
+
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            synchronized (MetaManager.LOCK) {
+                DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
+                DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
+                metaListener.removeDataFlow(dataFlowInfo);
+            }
+        }
+
+        @Override
+        public void onInitialized(List<ChildData> childData) throws Exception {
+            LOG.info("Initialized event retrieved");
+
+            for (ChildData singleChildData : childData) {
+                onChildAdded(singleChildData);
+            }
+        }
+
+        private DataFlowInfo getDataFlowInfo(DataFlowStorageInfo dataFlowStorageInfo) throws Exception {
+            switch (dataFlowStorageInfo.getStorageType()) {
+                case ZK :
+                    String zkPath = dataFlowStorageInfo.getPath();
+                    byte[] data = zookeeperClient.getData().forPath(zkPath);
+                    return objectMapper.readValue(data, DataFlowInfo.class);
+                case HDFS :
+                    throw new IllegalArgumentException("HDFS dataFlow storage type not supported yet!");
+                default :
+                    throw new IllegalArgumentException("Unsupported dataFlow storage type "
+                            + dataFlowStorageInfo.getStorageType());
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
similarity index 80%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
index bebd072..c26d1c6 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
@@ -15,18 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.util;
+package org.apache.inlong.sort.meta.zookeeper;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.framework.recipes.cache.ChildData;
@@ -38,38 +39,69 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
+
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
 
     private final Object lock = new Object();
 
     /**
      * Connection to the used ZooKeeper quorum.
      */
-    private final CuratorFramework client;
+    private CuratorFramework client;
 
     /**
      * Curator recipe to watch changes of a specific ZooKeeper node.
      */
     @GuardedBy("lock")
-    private final Map<String, NodeCache> caches;
+    private Map<String, NodeCache> caches;
 
     @GuardedBy("lock")
-    private final Map<String, PathChildrenCache> pathChildrenCaches;
+    private Map<String, PathChildrenCache> pathChildrenCaches;
 
-    private final ConnectionStateListener connectionStateListener =
-            (client, newState) -> handleStateChange(newState);
+    private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
 
-    public ZookeeperWatcher(CuratorFramework client) {
-        this.client = checkNotNull(client, "CuratorFramework client");
+    /**
+     * open
+     * 
+     * @param  config    command parameters when process start.
+     * @param  listener  a listener of ChildrenWatcherListener
+     * @throws Exception any exception
+     */
+    public void open(Configuration config, ChildrenWatcherListener listener) throws Exception {
+        this.client = ZooKeeperUtils.startCuratorFramework(config);
         this.caches = new HashMap<>();
         this.pathChildrenCaches = new HashMap<>();
         client.getUnhandledErrorListenable().addListener(this);
         client.getConnectionStateListenable().addListener(connectionStateListener);
+        synchronized (MetaManager.LOCK) {
+            final String dataFlowsWatchingPath = getWatchingPathOfDataFlowsInCluster(
+                    config.getString(Constants.CLUSTER_ID));
+            listener.initCuratorFramework(client);
+            this.registerPathChildrenWatcher(
+                    dataFlowsWatchingPath, listener, true);
+            final List<ChildData> childData = this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+            if (childData != null) {
+                listener.onInitialized(childData);
+            }
+        }
+    }
+
+    /**
+     * If you want to change the path rule here, please change ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+     * too.
+     */
+    public static String getWatchingPathOfDataFlowsInCluster(String cluster) {
+        return "/clusters/" + cluster + "/dataflows";
     }
 
     @Override
@@ -93,17 +125,18 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
                 throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
             }
         }
+        this.client.close();
     }
 
     /**
      * Register a watcher on path.
      *
-     * @param watchingPath the watching path
-     * @param listener the callback listener
-     * @param initial if initial is true, it retrieves the data of the watching path
-     *         synchronously, then invokes the listener immediately. Otherwise it returns
-     *         immediately and invoke the listener asynchronously if the node is changed
-     * @throws Exception exception
+     * @param  watchingPath the watching path
+     * @param  listener     the callback listener
+     * @param  initial      if initial is true, it retrieves the data of the watching path synchronously, then invokes
+     *                      the listener immediately. Otherwise it returns immediately and invokes the listener
+     *                      asynchronously if the node is changed
+     * @throws Exception    exception
      */
     public void registerWatcher(
             String watchingPath, WatcherListener listener, boolean initial) throws Exception {
@@ -135,6 +168,7 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
 
     /**
      * Register pathChildren watcher.
+     * 
      * @param buildInitial whether build the initial cache or not.
      */
     public void registerPathChildrenWatcher(
@@ -204,19 +238,19 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
 
     protected void handleStateChange(ConnectionState newState) {
         switch (newState) {
-            case CONNECTED:
+            case CONNECTED :
                 LOG.info("Connected to ZooKeeper quorum.");
                 break;
-            case SUSPENDED:
+            case SUSPENDED :
                 LOG.warn("Connection to ZooKeeper suspended.");
                 break;
-            case RECONNECTED:
+            case RECONNECTED :
                 LOG.info("Connection to ZooKeeper was reconnected.");
                 break;
-            case LOST:
+            case LOST :
                 LOG.warn("Connection to ZooKeeper lost.");
                 break;
-            default:
+            default :
                 LOG.info("Received Zookeeper new state {}", newState);
                 break;
         }
@@ -266,6 +300,9 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
     }
 
     public interface ChildrenWatcherListener {
+
+        void initCuratorFramework(CuratorFramework zookeeperClient);
+
         void onChildAdded(ChildData childData) throws Exception;
 
         void onChildUpdated(ChildData childData) throws Exception;
@@ -307,28 +344,28 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
 
             try {
                 switch (pathChildrenCacheEvent.getType()) {
-                    case INITIALIZED:
+                    case INITIALIZED :
                         childrenWatcherListener.onInitialized(pathChildrenCacheEvent.getInitialData());
                         break;
-                    case CHILD_ADDED:
+                    case CHILD_ADDED :
                         childrenWatcherListener.onChildAdded(pathChildrenCacheEvent.getData());
                         break;
-                    case CHILD_UPDATED:
+                    case CHILD_UPDATED :
                         childrenWatcherListener.onChildUpdated(pathChildrenCacheEvent.getData());
                         break;
-                    case CHILD_REMOVED:
+                    case CHILD_REMOVED :
                         childrenWatcherListener.onChildRemoved(pathChildrenCacheEvent.getData());
                         break;
-                    case CONNECTION_LOST:
+                    case CONNECTION_LOST :
                         childrenWatcherListener.onConnectionLost();
                         break;
-                    case CONNECTION_SUSPENDED:
+                    case CONNECTION_SUSPENDED :
                         childrenWatcherListener.onConnectionSuspended();
                         break;
-                    case CONNECTION_RECONNECTED:
+                    case CONNECTION_RECONNECTED :
                         childrenWatcherListener.onConnectionReconnected();
                         break;
-                    default:
+                    default :
                         LOG.error("Undefined pathChildrenCacheEvent");
                 }
             } catch (Throwable t) {
diff --git a/inlong-sort/sort-formats/format-base/src/test/java/org.apache.inlong.sort.formats.base/TableFormatUtilsTest.java b/inlong-sort/sort-formats/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
similarity index 100%
rename from inlong-sort/sort-formats/format-base/src/test/java/org.apache.inlong.sort.formats.base/TableFormatUtilsTest.java
rename to inlong-sort/sort-formats/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java