You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/29 04:51:15 UTC
[incubator-inlong] branch master updated: [INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ead0b9e [INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)
ead0b9e is described below
commit ead0b9e692c850ded4c8ba30bf9bbafcc0927fd9
Author: 卢春亮 <94...@qq.com>
AuthorDate: Wed Dec 29 12:51:06 2021 +0800
[INLONG-1992] sort-flink support configurable loader of getting configuration. (#2022)
---
.../inlong/sort/meta/MetaDataFlowInfoListener.java | 73 +++++
.../org/apache/inlong/sort/meta/MetaManager.java | 298 ++++++++++-----------
.../org/apache/inlong/sort/meta/MetaWatcher.java | 44 +++
.../sort/meta/zookeeper/ZookeeperMetaWatcher.java | 167 ++++++++++++
.../zookeeper/ZookeeperWatcherUtils.java} | 97 ++++---
.../sort/formats/base}/TableFormatUtilsTest.java | 0
6 files changed, 486 insertions(+), 193 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java
new file mode 100644
index 0000000..7bfdac9
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaDataFlowInfoListener.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+
+/**
+ * Bridges a {@link MetaWatcher} to the {@link MetaManager}: every data-flow callback received here is
+ * delegated unchanged to the manager method of the same name.
+ */
+public class MetaDataFlowInfoListener implements DataFlowInfoListener {
+
+    private final MetaManager manager;
+
+    /**
+     * Creates a listener bound to the given manager.
+     *
+     * @param manager the manager that receives every forwarded event
+     */
+    public MetaDataFlowInfoListener(MetaManager manager) {
+        this.manager = manager;
+    }
+
+    /**
+     * Forwards an added data flow to {@link MetaManager#addDataFlow(DataFlowInfo)}.
+     *
+     * @param dataFlowInfo the data flow that was added
+     * @throws Exception whatever the manager throws, propagated as is
+     */
+    @Override
+    public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.addDataFlow(dataFlowInfo);
+    }
+
+    /**
+     * Forwards a changed data flow to {@link MetaManager#updateDataFlow(DataFlowInfo)}.
+     *
+     * @param dataFlowInfo the data flow in its new state
+     * @throws Exception whatever the manager throws, propagated as is
+     */
+    @Override
+    public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.updateDataFlow(dataFlowInfo);
+    }
+
+    /**
+     * Forwards a removed data flow to {@link MetaManager#removeDataFlow(DataFlowInfo)}.
+     *
+     * @param dataFlowInfo the data flow that was removed
+     * @throws Exception whatever the manager throws, propagated as is
+     */
+    @Override
+    public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+        this.manager.removeDataFlow(dataFlowInfo);
+    }
+
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
index 409daa6..0961ffe 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaManager.java
@@ -17,51 +17,50 @@
package org.apache.inlong.sort.meta;
-import org.apache.inlong.sort.configuration.Configuration;
-import org.apache.inlong.sort.configuration.Constants;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
-import org.apache.inlong.sort.util.ZooKeeperUtils;
-import org.apache.inlong.sort.util.ZookeeperWatcher;
-import org.apache.inlong.sort.util.ZookeeperWatcher.ChildrenWatcherListener;
+import static org.apache.inlong.sort.configuration.ConfigOptions.key;
+
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import javax.annotation.concurrent.GuardedBy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.inlong.sort.configuration.ConfigOption;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.zookeeper.ZookeeperMetaWatcher;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.objenesis.instantiator.util.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This is the manager of meta. It's a singleton and shared by many components from different
- * threads. So it's thread-safe.
+ * This is the manager of meta. It's a singleton and shared by many components from different threads. So it's
+ * thread-safe.
*
*/
public class MetaManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MetaManager.class);
- private static MetaManager instance;
+ public static final ConfigOption<String> META_WATCHER_TYPE = key("meta.watcher.type")
+ .defaultValue(ZookeeperMetaWatcher.class.getName())
+ .withDescription("Defines the meta watcher class name.");
- private static final Object lock = new Object();
-
- private final CuratorFramework zookeeperClient;
+ private static MetaManager instance;
- private final ZookeeperWatcher zookeeperWatcher;
+ public static final Object LOCK = new Object();
- private final ObjectMapper objectMapper;
+ private MetaWatcher watcher;
@GuardedBy("lock")
private final Map<Long, DataFlowInfo> dataFlowInfoMap;
@GuardedBy("lock")
- private final List<DataFlowInfoListener> dataFlowInfoListeners;
+ private final List<DataFlowInfoListener> dataFlowInfoListeners;
public static MetaManager getInstance(Configuration config) throws Exception {
- synchronized (lock) {
+ synchronized (LOCK) {
if (instance == null) {
instance = new MetaManager(config);
instance.open(config);
@@ -71,7 +70,7 @@ public class MetaManager implements AutoCloseable {
}
public static void release() throws Exception {
- synchronized (lock) {
+ synchronized (LOCK) {
if (instance != null) {
instance.close();
instance = null;
@@ -80,45 +79,48 @@ public class MetaManager implements AutoCloseable {
}
/**
- * If you want to change the path rule here, please change
- * ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api too.
+ * Constructor
+ *
+ * @param config process start parameters
*/
- public static String getWatchingPathOfDataFlowsInCluster(String cluster) {
- return "/clusters/" + cluster + "/dataflows";
- }
-
private MetaManager(Configuration config) {
- zookeeperClient = ZooKeeperUtils.startCuratorFramework(config);
- zookeeperWatcher = new ZookeeperWatcher(zookeeperClient);
- objectMapper = new ObjectMapper();
- dataFlowInfoMap = new HashMap<>();
+ dataFlowInfoMap = new ConcurrentHashMap<>();
dataFlowInfoListeners = new ArrayList<>();
}
+ /**
+ * open
+ *
+ * @param config
+ * @throws Exception
+ */
private void open(Configuration config) throws Exception {
- synchronized (lock) {
- final String dataFlowsWatchingPath
- = getWatchingPathOfDataFlowsInCluster(config.getString(Constants.CLUSTER_ID));
- final DataFlowsChildrenWatcherListener dataFlowsChildrenWatcherListener
- = new DataFlowsChildrenWatcherListener();
- zookeeperWatcher.registerPathChildrenWatcher(
- dataFlowsWatchingPath, dataFlowsChildrenWatcherListener, true);
- final List<ChildData> childData = zookeeperWatcher.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
- if (childData != null) {
- dataFlowsChildrenWatcherListener.onInitialized(childData);
- }
- }
+ String watcherClass = config.getString(META_WATCHER_TYPE);
+ this.watcher = (MetaWatcher) ClassUtils.newInstance(Class.forName(watcherClass));
+ this.watcher.open(config, new MetaDataFlowInfoListener(this));
}
+ /**
+ * close
+ *
+ * @throws Exception
+ */
public void close() throws Exception {
- synchronized (lock) {
- zookeeperWatcher.close();
- zookeeperClient.close();
+ synchronized (LOCK) {
+ if (watcher != null) {
+ watcher.close();
+ }
}
}
+ /**
+ * registerDataFlowInfoListener
+ *
+ * @param dataFlowInfoListener
+ * @throws Exception
+ */
public void registerDataFlowInfoListener(DataFlowInfoListener dataFlowInfoListener) throws Exception {
- synchronized (lock) {
+ synchronized (LOCK) {
dataFlowInfoListeners.add(dataFlowInfoListener);
for (DataFlowInfo dataFlowInfo : dataFlowInfoMap.values()) {
try {
@@ -132,149 +134,119 @@ public class MetaManager implements AutoCloseable {
LOG.info("Register DataFlowInfoListener successfully");
}
+ /**
+ * getDataFlowInfo
+ *
+ * @param id
+ * @return
+ */
public DataFlowInfo getDataFlowInfo(long id) {
- synchronized (lock) {
+ synchronized (LOCK) {
return dataFlowInfoMap.get(id);
}
}
- public interface DataFlowInfoListener {
- void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
-
- void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
- }
-
- private class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
-
- @Override
- public void onChildAdded(ChildData childData) throws Exception {
- LOG.info("DataFlow Added event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
- }
-
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.info("Try to add dataFlow {}", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener data flow added", e);
- }
+ /**
+ * addDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ synchronized (LOCK) {
+ long dataFlowId = dataFlowInfo.getId();
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+ if (oldDataFlowInfo == null) {
+ LOG.info("Try to add dataFlow {}", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.addDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow added", e);
}
- } else {
- LOG.warn("DataFlow {} should not be exist", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener data flow updated", e);
- }
+ }
+ } else {
+ LOG.warn("DataFlow {} should not be exist", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.updateDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow updated", e);
}
}
}
}
+ }
- @Override
- public void onChildUpdated(ChildData childData) throws Exception {
- LOG.info("DataFlow Updated event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
- }
-
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
-
- DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
- if (oldDataFlowInfo == null) {
- LOG.warn("DataFlow {} should already be exist.", dataFlowId);
- for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.addDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener data flow updated", e);
- }
- }
- } else {
- if (dataFlowInfo.equals(oldDataFlowInfo)) {
- LOG.info("DataFlowInfo has not been changed, ignore update.");
- return;
- }
-
- LOG.info("Try to update dataFlow {}.", dataFlowId);
+ /**
+ * updateDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ synchronized (LOCK) {
+ long dataFlowId = dataFlowInfo.getId();
- for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
- try {
- dataFlowInfoListener.updateDataFlow(dataFlowInfo);
- } catch (Exception e) {
- LOG.warn("Error happens when notifying listener data flow updated", e);
- }
+ DataFlowInfo oldDataFlowInfo = dataFlowInfoMap.put(dataFlowId, dataFlowInfo);
+ if (oldDataFlowInfo == null) {
+ LOG.warn("DataFlow {} should already be exist.", dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.addDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow updated", e);
}
}
- }
- }
-
- @Override
- public void onChildRemoved(ChildData childData) throws Exception {
- LOG.info("DataFlow Removed event retrieved");
-
- final byte[] data = childData.getData();
- if (data == null) {
- return;
- }
+ } else {
+ if (dataFlowInfo.equals(oldDataFlowInfo)) {
+ LOG.info("DataFlowInfo has not been changed, ignore update.");
+ return;
+ }
- synchronized (lock) {
- DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
- DataFlowInfo dataFlowInfo = getDataFlowInfo(dataFlowStorageInfo);
- long dataFlowId = dataFlowInfo.getId();
+ LOG.info("Try to update dataFlow {}.", dataFlowId);
- dataFlowInfoMap.remove(dataFlowId);
for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
try {
- dataFlowInfoListener.removeDataFlow(dataFlowInfo);
+ dataFlowInfoListener.updateDataFlow(dataFlowInfo);
} catch (Exception e) {
- LOG.warn("Error happens when notifying listener data flow deleted", e);
+ LOG.warn("Error happens when notifying listener data flow updated", e);
}
}
}
}
+ }
- @Override
- public void onInitialized(List<ChildData> childData) throws Exception {
- LOG.info("Initialized event retrieved");
+ /**
+ * removeDataFlow
+ *
+ * @param dataFlowInfo
+ * @throws Exception
+ */
+ public void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception {
+ synchronized (LOCK) {
+ long dataFlowId = dataFlowInfo.getId();
- for (ChildData singleChildData : childData) {
- onChildAdded(singleChildData);
+ dataFlowInfoMap.remove(dataFlowId);
+ for (DataFlowInfoListener dataFlowInfoListener : dataFlowInfoListeners) {
+ try {
+ dataFlowInfoListener.removeDataFlow(dataFlowInfo);
+ } catch (Exception e) {
+ LOG.warn("Error happens when notifying listener data flow deleted", e);
+ }
}
}
}
- private DataFlowInfo getDataFlowInfo(DataFlowStorageInfo dataFlowStorageInfo) throws Exception {
- switch (dataFlowStorageInfo.getStorageType()) {
- case ZK:
- String zkPath = dataFlowStorageInfo.getPath();
- byte[] data = zookeeperClient.getData().forPath(zkPath);
- return objectMapper.readValue(data, DataFlowInfo.class);
- case HDFS:
- throw new IllegalArgumentException("HDFS dataFlow storage type not supported yet!");
- default:
- throw new IllegalArgumentException("Unsupported dataFlow storage type "
- + dataFlowStorageInfo.getStorageType());
- }
- }
+ /**
+ * Listener notified when a data flow is added, updated or removed.
+ */
+ public interface DataFlowInfoListener {
+
+ void addDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
+ void updateDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
+
+ void removeDataFlow(DataFlowInfo dataFlowInfo) throws Exception;
+ }
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
new file mode 100644
index 0000000..cc85902
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/MetaWatcher.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta;
+
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+
+/**
+ * Pluggable source of data-flow metadata. {@link MetaManager} instantiates the implementation named by its
+ * {@code meta.watcher.type} option and hands it the listener that must receive every change.
+ */
+public interface MetaWatcher {
+
+    /**
+     * Starts watching and begins delivering events to {@code metaListener}.
+     *
+     * @param config command parameters when process start.
+     * @param metaListener a listener of DataFlowInfo
+     * @throws Exception any exception
+     */
+    void open(Configuration config, DataFlowInfoListener metaListener) throws Exception;
+
+    /**
+     * Stops watching and releases whatever the implementation acquired in {@code open}.
+     *
+     * @throws Exception if the implementation fails to shut down
+     */
+    void close() throws Exception;
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java
new file mode 100644
index 0000000..2d08206
--- /dev/null
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperMetaWatcher.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.meta.zookeeper;
+
+import java.util.List;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.meta.MetaManager.DataFlowInfoListener;
+import org.apache.inlong.sort.meta.MetaWatcher;
+import org.apache.inlong.sort.meta.zookeeper.ZookeeperWatcherUtils.ChildrenWatcherListener;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.DataFlowStorageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper-backed {@link MetaWatcher}: it watches the data-flow nodes through {@link ZookeeperWatcherUtils}
+ * and relays every child change to the {@link DataFlowInfoListener} handed to {@code open}.
+ */
+public class ZookeeperMetaWatcher implements MetaWatcher {
+
+    private final ZookeeperWatcherUtils watcherUtils;
+
+    /**
+     * Creates the watcher; nothing is connected until {@code open} is called.
+     */
+    public ZookeeperMetaWatcher() {
+        this.watcherUtils = new ZookeeperWatcherUtils();
+    }
+
+    /**
+     * Opens the underlying ZooKeeper watcher and starts relaying data-flow changes to {@code metaListener}.
+     *
+     * @param config process configuration, passed on to {@link ZookeeperWatcherUtils}
+     * @param metaListener receiver of the add/update/remove notifications
+     * @throws Exception if the underlying watcher cannot be opened
+     */
+    @Override
+    public void open(Configuration config, DataFlowInfoListener metaListener) throws Exception {
+        this.watcherUtils.open(config, new DataFlowsChildrenWatcherListener(metaListener));
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.watcherUtils.close();
+    }
+
+    /**
+     * Turns raw child-node events into {@link DataFlowInfo} notifications for the wrapped listener.
+     */
+    private static class DataFlowsChildrenWatcherListener implements ChildrenWatcherListener {
+
+        private static final Logger LOG = LoggerFactory.getLogger(DataFlowsChildrenWatcherListener.class);
+
+        private final ObjectMapper objectMapper;
+
+        /** Connection to the used ZooKeeper quorum; assigned by {@code initCuratorFramework}. */
+        private CuratorFramework zookeeperClient;
+
+        private final DataFlowInfoListener metaListener;
+
+        public DataFlowsChildrenWatcherListener(DataFlowInfoListener metaListener) {
+            this.objectMapper = new ObjectMapper();
+            this.metaListener = metaListener;
+        }
+
+        /**
+         * Stores the client that is later used to read data-flow definitions kept in ZooKeeper.
+         *
+         * @param zookeeperClient the Curator client to read from
+         */
+        @Override
+        public void initCuratorFramework(CuratorFramework zookeeperClient) {
+            this.zookeeperClient = zookeeperClient;
+        }
+
+        @Override
+        public void onChildAdded(ChildData childData) throws Exception {
+            LOG.info("DataFlow Added event retrieved");
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            // Both the load and the notification run while holding the shared MetaManager lock.
+            synchronized (MetaManager.LOCK) {
+                metaListener.addDataFlow(loadDataFlowInfo(data));
+            }
+        }
+
+        @Override
+        public void onChildUpdated(ChildData childData) throws Exception {
+            LOG.info("DataFlow Updated event retrieved");
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            synchronized (MetaManager.LOCK) {
+                metaListener.updateDataFlow(loadDataFlowInfo(data));
+            }
+        }
+
+        @Override
+        public void onChildRemoved(ChildData childData) throws Exception {
+            LOG.info("DataFlow Removed event retrieved");
+            final byte[] data = childData.getData();
+            if (data == null) {
+                return;
+            }
+
+            synchronized (MetaManager.LOCK) {
+                metaListener.removeDataFlow(loadDataFlowInfo(data));
+            }
+        }
+
+        @Override
+        public void onInitialized(List<ChildData> childData) throws Exception {
+            LOG.info("Initialized event retrieved");
+
+            for (ChildData singleChildData : childData) {
+                onChildAdded(singleChildData);
+            }
+        }
+
+        /**
+         * Parses the storage descriptor carried by a child node and loads the data flow it points to.
+         *
+         * @param data non-null payload of a child node
+         * @return the data flow referenced by the payload
+         * @throws Exception if the payload is malformed, the read fails or the storage type is unsupported
+         */
+        private DataFlowInfo loadDataFlowInfo(byte[] data) throws Exception {
+            DataFlowStorageInfo dataFlowStorageInfo = objectMapper.readValue(data, DataFlowStorageInfo.class);
+            switch (dataFlowStorageInfo.getStorageType()) {
+                case ZK:
+                    String zkPath = dataFlowStorageInfo.getPath();
+                    byte[] flowData = zookeeperClient.getData().forPath(zkPath);
+                    return objectMapper.readValue(flowData, DataFlowInfo.class);
+                case HDFS:
+                    throw new IllegalArgumentException("HDFS dataFlow storage type not supported yet!");
+                default:
+                    throw new IllegalArgumentException("Unsupported dataFlow storage type "
+                            + dataFlowStorageInfo.getStorageType());
+            }
+        }
+    }
+}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
similarity index 80%
rename from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java
rename to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
index bebd072..c26d1c6 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/util/ZookeeperWatcher.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/meta/zookeeper/ZookeeperWatcherUtils.java
@@ -15,18 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.util;
+package org.apache.inlong.sort.meta.zookeeper;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.ChildData;
@@ -38,38 +39,69 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.inlong.sort.configuration.Configuration;
+import org.apache.inlong.sort.configuration.Constants;
+import org.apache.inlong.sort.meta.MetaManager;
+import org.apache.inlong.sort.util.ZooKeeperUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
+import com.google.common.base.Preconditions;
+
+public class ZookeeperWatcherUtils implements AutoCloseable, UnhandledErrorListener {
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcher.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperWatcherUtils.class);
private final Object lock = new Object();
/**
* Connection to the used ZooKeeper quorum.
*/
- private final CuratorFramework client;
+ private CuratorFramework client;
/**
* Curator recipe to watch changes of a specific ZooKeeper node.
*/
@GuardedBy("lock")
- private final Map<String, NodeCache> caches;
+ private Map<String, NodeCache> caches;
@GuardedBy("lock")
- private final Map<String, PathChildrenCache> pathChildrenCaches;
+ private Map<String, PathChildrenCache> pathChildrenCaches;
- private final ConnectionStateListener connectionStateListener =
- (client, newState) -> handleStateChange(newState);
+ private ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
- public ZookeeperWatcher(CuratorFramework client) {
- this.client = checkNotNull(client, "CuratorFramework client");
+ /**
+ * open
+ *
+ * @param config command parameters when process start.
+ * @param listener a listener of ChildrenWatcherListener
+ * @throws Exception any exception
+ */
+ public void open(Configuration config, ChildrenWatcherListener listener) throws Exception {
+ this.client = ZooKeeperUtils.startCuratorFramework(config);
this.caches = new HashMap<>();
this.pathChildrenCaches = new HashMap<>();
client.getUnhandledErrorListenable().addListener(this);
client.getConnectionStateListenable().addListener(connectionStateListener);
+ synchronized (MetaManager.LOCK) {
+ final String dataFlowsWatchingPath = getWatchingPathOfDataFlowsInCluster(
+ config.getString(Constants.CLUSTER_ID));
+ listener.initCuratorFramework(client);
+ this.registerPathChildrenWatcher(
+ dataFlowsWatchingPath, listener, true);
+ final List<ChildData> childData = this.getCurrentPathChildrenDatum(dataFlowsWatchingPath);
+ if (childData != null) {
+ listener.onInitialized(childData);
+ }
+ }
+ }
+
+ /**
+ * If you want to change the path rule here, please change ZkTools#getNodePathOfDataFlowStorageInfoInCluster in api
+ * too.
+ */
+ public static String getWatchingPathOfDataFlowsInCluster(String cluster) {
+ return "/clusters/" + cluster + "/dataflows";
}
@Override
@@ -93,17 +125,18 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
}
}
+ this.client.close();
}
/**
* Register a watcher on path.
*
- * @param watchingPath the watching path
- * @param listener the callback listener
- * @param initial if initial is true, it retrieves the data of the watching path
- * synchronously, then invokes the listener immediately. Otherwise it returns
- * immediately and invoke the listener asynchronously if the node is changed
- * @throws Exception exception
+ * @param watchingPath the watching path
+ * @param listener the callback listener
+ * @param initial if initial is true, it retrieves the data of the watching path synchronously, then invokes
+ * the listener immediately. Otherwise it returns immediately and invoke the listener
+ * asynchronously if the node is changed
+ * @throws Exception exception
*/
public void registerWatcher(
String watchingPath, WatcherListener listener, boolean initial) throws Exception {
@@ -135,6 +168,7 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
/**
* Register pathChildren watcher.
+ *
* @param buildInitial whether build the initial cache or not.
*/
public void registerPathChildrenWatcher(
@@ -204,19 +238,19 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
protected void handleStateChange(ConnectionState newState) {
switch (newState) {
- case CONNECTED:
+ case CONNECTED :
LOG.info("Connected to ZooKeeper quorum.");
break;
- case SUSPENDED:
+ case SUSPENDED :
LOG.warn("Connection to ZooKeeper suspended.");
break;
- case RECONNECTED:
+ case RECONNECTED :
LOG.info("Connection to ZooKeeper was reconnected.");
break;
- case LOST:
+ case LOST :
LOG.warn("Connection to ZooKeeper lost.");
break;
- default:
+ default :
LOG.info("Received Zookeeper new state {}", newState);
break;
}
@@ -266,6 +300,9 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
}
public interface ChildrenWatcherListener {
+
+ void initCuratorFramework(CuratorFramework zookeeperClient);
+
void onChildAdded(ChildData childData) throws Exception;
void onChildUpdated(ChildData childData) throws Exception;
@@ -307,28 +344,28 @@ public class ZookeeperWatcher implements AutoCloseable, UnhandledErrorListener {
try {
switch (pathChildrenCacheEvent.getType()) {
- case INITIALIZED:
+ case INITIALIZED :
childrenWatcherListener.onInitialized(pathChildrenCacheEvent.getInitialData());
break;
- case CHILD_ADDED:
+ case CHILD_ADDED :
childrenWatcherListener.onChildAdded(pathChildrenCacheEvent.getData());
break;
- case CHILD_UPDATED:
+ case CHILD_UPDATED :
childrenWatcherListener.onChildUpdated(pathChildrenCacheEvent.getData());
break;
- case CHILD_REMOVED:
+ case CHILD_REMOVED :
childrenWatcherListener.onChildRemoved(pathChildrenCacheEvent.getData());
break;
- case CONNECTION_LOST:
+ case CONNECTION_LOST :
childrenWatcherListener.onConnectionLost();
break;
- case CONNECTION_SUSPENDED:
+ case CONNECTION_SUSPENDED :
childrenWatcherListener.onConnectionSuspended();
break;
- case CONNECTION_RECONNECTED:
+ case CONNECTION_RECONNECTED :
childrenWatcherListener.onConnectionReconnected();
break;
- default:
+ default :
LOG.error("Undefined pathChildrenCacheEvent");
}
} catch (Throwable t) {
diff --git a/inlong-sort/sort-formats/format-base/src/test/java/org.apache.inlong.sort.formats.base/TableFormatUtilsTest.java b/inlong-sort/sort-formats/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
similarity index 100%
rename from inlong-sort/sort-formats/format-base/src/test/java/org.apache.inlong.sort.formats.base/TableFormatUtilsTest.java
rename to inlong-sort/sort-formats/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java