You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by al...@apache.org on 2021/11/18 08:34:41 UTC
[incubator-inlong] branch master updated: [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)
This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e117b3c [INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)
e117b3c is described below
commit e117b3c5b422706ff782f6b9ae0925c45a37ddb5
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Nov 18 16:34:32 2021 +0800
[INLONG-1796]DataProxy support monitor indicator with JMX. #1796 (#1797)
* [Feature]DataProxy support monitor indicator with JMX. #1796
* [Feature]DataProxy support monitor indicator with JMX. #1796
* [Feature]DataProxy support monitor indicator with JMX. #1796
* [Feature]DataProxy support monitor indicator with JMX. #1796
* fix PR reviews.
* fix PR reviews.
* fix checkstyle problem.
* add more unit test case.
* add apache licenses.
* fix checkstyle problem.
* fix checkstyle problem.
---
.../commons/config/metrics/MetricItemSet.java | 20 ++
.../commons/config/metrics/MetricRegister.java | 4 +-
.../config/metrics/set/DataProxyMetricItemSet.java | 1 +
.../config/metrics/set/TestMetricItemSetMBean.java | 4 +-
inlong-dataproxy/dataproxy-source/pom.xml | 106 ++++----
.../config/holder/CacheClusterConfigHolder.java | 139 ++++++++++
.../config/holder/CommonPropertiesHolder.java | 116 ++++++++
.../config/holder/IdTopicConfigHolder.java | 161 +++++++++++
.../config/loader/CacheClusterConfigLoader.java | 31 +++
.../ClassResourceCommonPropertiesLoader.java | 68 +++++
.../config/loader/CommonPropertiesLoader.java | 34 +++
.../dataproxy/config/loader/ConfigLoader.java | 31 +++
.../loader/ContextCacheClusterConfigLoader.java | 76 ++++++
.../config/loader/ContextIdTopicConfigLoader.java | 64 +++++
.../config/loader/IdTopicConfigLoader.java | 30 +++
.../loader/ManagerCacheClusterConfigLoader.java | 65 +++++
.../config/loader/ManagerIdTopicConfigLoader.java | 78 ++++++
.../dataproxy/config/pojo/CacheClusterConfig.java | 53 ++--
.../inlong/dataproxy/config/pojo/CacheType.java | 57 ++--
.../inlong/dataproxy/config/pojo/DataType.java | 57 ++--
.../dataproxy/config/pojo/IdTopicConfig.java | 178 +++++++++++++
.../dataproxy/metrics/DataProxyMetricItem.java | 117 ++++++++
.../dataproxy/metrics}/DataProxyMetricItemSet.java | 28 +-
.../inlong/dataproxy/metrics/MetricItemValue.java | 84 ++++++
.../inlong/dataproxy/metrics/MetricListener.java | 39 +++
.../dataproxy/metrics/MetricListenerRunnable.java | 132 +++++++++
.../inlong/dataproxy/metrics/MetricObserver.java | 114 ++++++++
.../prometheus/PrometheusMetricListener.java | 42 +++
.../apache/inlong/dataproxy/node/Application.java | 5 +
.../org/apache/inlong/dataproxy/sink/MetaSink.java | 89 ++++++-
.../pulsar/federation/PulsarFederationSink.java | 140 ++++++++++
.../federation/PulsarFederationSinkContext.java | 198 ++++++++++++++
.../pulsar/federation/PulsarFederationWorker.java | 142 ++++++++++
.../pulsar/federation/PulsarProducerCluster.java | 295 +++++++++++++++++++++
.../federation/PulsarProducerFederation.java | 163 ++++++++++++
.../dataproxy/source/ServerMessageFactory.java | 11 +-
.../dataproxy/source/ServerMessageHandler.java | 68 ++++-
.../inlong/dataproxy/source/SimpleTcpSource.java | 25 +-
.../apache/inlong/dataproxy/utils/BufferQueue.java | 167 ++++++++++++
.../apache/inlong/dataproxy/utils/Constants.java | 33 +++
.../inlong/dataproxy/utils/SizeSemaphore.java | 126 +++++++++
.../TestClassResourceCommonPropertiesLoader.java | 47 ++++
.../TestContextCacheClusterConfigLoader.java | 84 ++++++
.../loader/TestContextIdTopicConfigLoader.java | 86 ++++++
.../metrics/TestDataProxyMetricItemSet.java | 20 +-
.../metrics/TestMetricListenerRunnable.java | 103 ++++---
.../federation/TestPulsarFederationSink.java | 90 +++++++
.../federation/TestPulsarProducerFederation.java | 100 +++++++
.../apache/inlong/dataproxy/utils/MockUtils.java | 157 +++++++++++
.../src/test/resources/common.properties | 22 ++
.../dataproxy-source/src/test/resources/flume.conf | 86 ++++++
51 files changed, 3944 insertions(+), 242 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
index 20fea7f..0426699 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricItemSet.java
@@ -28,9 +28,29 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public abstract class MetricItemSet<T extends MetricItem> implements MetricItemSetMBean {
+ protected String name;
+
protected Map<String, T> itemMap = new ConcurrentHashMap<>();
/**
+ * Constructor
+ *
+ * @param name
+ */
+ public MetricItemSet(String name) {
+ this.name = name;
+ }
+
+ /**
+ * getName
+ *
+ * @return
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
* createItem
*
* @return
diff --git a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
index d4a5fc5..e459dc9 100644
--- a/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
+++ b/inlong-common/src/main/java/org/apache/inlong/commons/config/metrics/MetricRegister.java
@@ -61,8 +61,8 @@ public class MetricRegister {
public static void register(MetricItemSet<? extends MetricItem> obj) {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
StringBuilder beanName = new StringBuilder();
- beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("type=")
- .append(obj.getClass().toString());
+ beanName.append(MetricUtils.getDomain(obj.getClass())).append(MetricItemMBean.DOMAIN_SEPARATOR).append("name=")
+ .append(obj.getName());
String strBeanName = beanName.toString();
try {
ObjectName objName = new ObjectName(strBeanName);
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
index c50e5cd..0ba892c 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
@@ -31,6 +31,7 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
* Constructor
*/
private DataProxyMetricItemSet() {
+ super("DataProxy");
}
/**
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
index 4e70d63..e6b7cb0 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
@@ -123,8 +123,8 @@ public class TestMetricItemSetMBean {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
StringBuilder beanName = new StringBuilder();
beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
- .append("type=")
- .append(DataProxyMetricItemSet.class.toString());
+ .append("name=")
+ .append(itemSet.getName());
String strBeanName = beanName.toString();
ObjectName objName = new ObjectName(strBeanName);
{
diff --git a/inlong-dataproxy/dataproxy-source/pom.xml b/inlong-dataproxy/dataproxy-source/pom.xml
index 7d2f1d0..31b1c5d 100644
--- a/inlong-dataproxy/dataproxy-source/pom.xml
+++ b/inlong-dataproxy/dataproxy-source/pom.xml
@@ -1,53 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>org.apache.inlong</groupId>
- <artifactId>inlong-dataproxy</artifactId>
- <version>0.12.0-incubating-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <name>Apache InLong - DataProxy Source</name>
- <artifactId>dataproxy-source</artifactId>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <guava.version>19.0</guava.version>
- </properties>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-dataproxy</artifactId>
+ <version>0.12.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <name>Apache InLong - DataProxy Source</name>
+ <artifactId>dataproxy-source</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.inlong</groupId>
- <artifactId>tubemq-client</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- </dependencies>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ <junit.version>4.13</junit.version>
+ <guava.version>19.0</guava.version>
+ <skipTests>false</skipTests>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>tubemq-client</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>2.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
new file mode 100644
index 0000000..a792bfe
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CacheClusterConfigHolder.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import static org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader.CACHE_CLUSTER_CONFIG_TYPE;
+import static org.apache.inlong.dataproxy.config.loader.ConfigLoader.RELOAD_INTERVAL;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.CacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * CacheClusterConfigHolder
+ */
+public class CacheClusterConfigHolder implements Configurable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(CacheClusterConfigHolder.class);
+
+ protected Context context;
+ private long reloadInterval;
+ private Timer reloadTimer;
+ private CacheClusterConfigLoader loader;
+
+ private List<CacheClusterConfig> configList;
+
+ /**
+ * configure
+ *
+ * @param context
+ */
+ @Override
+ public void configure(Context context) {
+ this.context = context;
+ this.reloadInterval = context.getLong(RELOAD_INTERVAL, 60000L);
+ String loaderType = context.getString(CACHE_CLUSTER_CONFIG_TYPE,
+ ContextCacheClusterConfigLoader.class.getName());
+ try {
+ Class<?> loaderClass = ClassUtils.getClass(loaderType);
+ Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+ if (loaderObject instanceof CacheClusterConfigLoader) {
+ this.loader = (CacheClusterConfigLoader) loaderObject;
+ }
+ } catch (Throwable t) {
+ LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+ LOG.error(t.getMessage(), t);
+ }
+ if (this.loader == null) {
+ this.loader = new ContextCacheClusterConfigLoader();
+ }
+ this.loader.configure(context);
+ }
+
+ /**
+ * start
+ */
+ public void start() {
+ try {
+ this.reload();
+ this.setReloadTimer();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * close
+ */
+ public void close() {
+ try {
+ this.reloadTimer.cancel();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * setReloadTimer
+ */
+ private void setReloadTimer() {
+ reloadTimer = new Timer(true);
+ TimerTask task = new TimerTask() {
+
+ /**
+ * run
+ */
+ public void run() {
+ reload();
+ }
+ };
+ reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+ }
+
+ /**
+ * reload
+ */
+ public void reload() {
+ try {
+ this.configList = this.loader.load();
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * get configList
+ *
+ * @return the configList
+ */
+ public List<CacheClusterConfig> getConfigList() {
+ return configList;
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
new file mode 100644
index 0000000..6d9aa95
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.loader.ClassResourceCommonPropertiesLoader;
+import org.apache.inlong.dataproxy.config.loader.CommonPropertiesLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Process-wide holder of the "common properties" map.
+ *
+ * <p>The map is loaded lazily, at most once, by a {@link CommonPropertiesLoader} whose class name
+ * is read from the environment variable named by {@link #KEY_COMMON_PROPERTIES}; when that
+ * variable is unset, {@link #DEFAULT_LOADER} is used.
+ */
+public class CommonPropertiesHolder {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CommonPropertiesHolder.class);
+    public static final String KEY_COMMON_PROPERTIES = "common-properties-loader";
+    public static final String DEFAULT_LOADER = ClassResourceCommonPropertiesLoader.class.getName();
+
+    // volatile: read without a lock in get(); only ever assigned a fully populated map.
+    private static volatile Map<String, String> props;
+
+    /**
+     * Loads the properties if they have not been loaded yet.
+     *
+     * <p>The map is filled in a local variable and published to {@code props} only afterwards, so
+     * a concurrent {@link #get()} never observes a partially loaded map. A loader failure is
+     * logged and leaves an empty map in place; loading is not retried.
+     */
+    private static void init() {
+        // Lock on the class rather than on the interned String constant, which any other code
+        // in the JVM using the same literal could also lock on.
+        synchronized (CommonPropertiesHolder.class) {
+            if (props != null) {
+                return;
+            }
+            Map<String, String> loaded = new ConcurrentHashMap<>();
+            String loaderClassName = System.getenv(KEY_COMMON_PROPERTIES);
+            loaderClassName = (loaderClassName == null) ? DEFAULT_LOADER : loaderClassName;
+            try {
+                Class<?> loaderClass = ClassUtils.getClass(loaderClassName);
+                Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+                if (loaderObject instanceof CommonPropertiesLoader) {
+                    CommonPropertiesLoader loader = (CommonPropertiesLoader) loaderObject;
+                    loaded.putAll(loader.load());
+                    LOG.info("loaderClass:{},properties:{}", loaderClassName, loaded);
+                }
+            } catch (Throwable t) {
+                LOG.error("Fail to init CommonPropertiesLoader,loaderClass:{},error:{}",
+                        loaderClassName, t.getMessage());
+                LOG.error(t.getMessage(), t);
+            }
+            props = loaded;
+        }
+    }
+
+    /**
+     * Returns the common properties, loading them on first use.
+     *
+     * @return the props; never {@code null}
+     */
+    public static Map<String, String> get() {
+        Map<String, String> current = props;
+        if (current != null) {
+            return current;
+        }
+        init();
+        return props;
+    }
+
+    /**
+     * Gets value mapped to key, returning defaultValue if unmapped.
+     *
+     * @param  key          to be found
+     * @param  defaultValue returned if key is unmapped
+     * @return              value associated with key
+     */
+    public static String getString(String key, String defaultValue) {
+        return get().getOrDefault(key, defaultValue);
+    }
+
+    /**
+     * Gets value mapped to key, returning null if unmapped.
+     *
+     * @param  key to be found
+     * @return     value associated with key or null if unmapped
+     */
+    public static String getString(String key) {
+        return get().get(key);
+    }
+
+    /**
+     * Looks a key up in the given context first and falls back to the common properties.
+     *
+     * @param  context      the Flume context consulted first
+     * @param  key          to be found
+     * @param  defaultValue returned if neither the context nor the common properties map the key
+     * @return              the context value, else the common-properties value, else defaultValue
+     */
+    public static String getStringFromContext(Context context, String key, String defaultValue) {
+        String value = context.getString(key);
+        // Go through get() so the properties are loaded on demand; reading the static field
+        // directly threw a NullPointerException when this was the first access.
+        value = (value != null) ? value : get().getOrDefault(key, defaultValue);
+        return value;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
new file mode 100644
index 0000000..acb653c
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/IdTopicConfigHolder.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.holder;
+
+import static org.apache.inlong.dataproxy.config.loader.ConfigLoader.RELOAD_INTERVAL;
+import static org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader.IDTOPIC_CONFIG_TYPE;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.loader.IdTopicConfigLoader;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds the current {@link IdTopicConfig} entries, both as a list and indexed by uid, and
+ * refreshes them periodically.
+ *
+ * <p>{@link #configure(Context)} selects the {@link IdTopicConfigLoader} implementation,
+ * {@link #start()} performs a first load and schedules a daemon {@link Timer} that reloads every
+ * {@code reloadInterval} milliseconds, and {@link #close()} cancels that timer.
+ */
+public class IdTopicConfigHolder implements Configurable {
+
+    public static final Logger LOG = LoggerFactory.getLogger(IdTopicConfigHolder.class);
+
+    protected Context context;
+    private long reloadInterval;
+    private Timer reloadTimer;
+    private IdTopicConfigLoader loader;
+
+    // Both references are swapped by the reload timer thread and read by other threads;
+    // volatile makes the freshly built collections visible to those readers.
+    private volatile List<IdTopicConfig> configList = new ArrayList<>();
+    private volatile Map<String, IdTopicConfig> configMap = new ConcurrentHashMap<>();
+
+    /**
+     * Reads the reload interval and instantiates the configured loader.
+     *
+     * <p>The loader class name is taken from the {@code IDTOPIC_CONFIG_TYPE} key. If the class
+     * cannot be instantiated, or is not an {@link IdTopicConfigLoader}, a
+     * {@link ContextIdTopicConfigLoader} is used instead.
+     *
+     * @param context the Flume context to read settings from; also passed on to the loader
+     */
+    @Override
+    public void configure(Context context) {
+        this.context = context;
+        this.reloadInterval = context.getLong(RELOAD_INTERVAL, 60000L);
+        String loaderType = context.getString(IDTOPIC_CONFIG_TYPE, ContextIdTopicConfigLoader.class.getName());
+        try {
+            Class<?> loaderClass = ClassUtils.getClass(loaderType);
+            Object loaderObject = loaderClass.getDeclaredConstructor().newInstance();
+            if (loaderObject instanceof IdTopicConfigLoader) {
+                this.loader = (IdTopicConfigLoader) loaderObject;
+            }
+        } catch (Throwable t) {
+            LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
+            LOG.error(t.getMessage(), t);
+        }
+        if (this.loader == null) {
+            this.loader = new ContextIdTopicConfigLoader();
+        }
+        this.loader.configure(context);
+    }
+
+    /**
+     * Loads the configuration once and then schedules the periodic reload.
+     * Any exception is logged and not propagated.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the periodic reload. Does nothing if no timer was ever created,
+     * e.g. because {@link #start()} was not called.
+     */
+    public void close() {
+        Timer timer = this.reloadTimer;
+        if (timer == null) {
+            // Nothing was scheduled; previously this path raised and logged a NullPointerException.
+            return;
+        }
+        try {
+            timer.cancel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Creates a daemon timer that calls {@link #reload()} after {@code reloadInterval}
+     * milliseconds and then repeatedly with the same period.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            /**
+             * Reloads the id-topic configuration.
+             */
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Loads a fresh list, indexes it by uid, and swaps both in.
+     *
+     * <p>The new map is built completely before either field is replaced. If loading or indexing
+     * fails, the error is logged and the previous list and map are kept.
+     */
+    public void reload() {
+        try {
+            List<IdTopicConfig> newConfigList = this.loader.load();
+            Map<String, IdTopicConfig> newConfigMap = new ConcurrentHashMap<>();
+            for (IdTopicConfig config : newConfigList) {
+                newConfigMap.put(config.getUid(), config);
+            }
+            this.configList = newConfigList;
+            this.configMap = newConfigMap;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the most recently loaded list.
+     *
+     * @return the configList; empty until a load has succeeded
+     */
+    public List<IdTopicConfig> getConfigList() {
+        return configList;
+    }
+
+    /**
+     * Looks up the topic name configured for a uid.
+     *
+     * @param  uid the id to look up; may be {@code null}
+     * @return     the topic name, or {@code null} if the uid is {@code null} or not configured
+     */
+    public String getTopic(String uid) {
+        if (uid == null) {
+            // ConcurrentHashMap rejects null keys; treat a null uid as "not configured".
+            return null;
+        }
+        IdTopicConfig config = this.configMap.get(uid);
+        if (config != null) {
+            return config.getTopicName();
+        }
+        return null;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java
new file mode 100644
index 0000000..f437eec
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CacheClusterConfigLoader.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+
+/**
+ *
+ * CacheClusterConfigLoader
+ *
+ * A {@link ConfigLoader} producing {@link CacheClusterConfig} entries that is also Flume
+ * {@link Configurable}. CacheClusterConfigHolder instantiates implementations reflectively
+ * through their no-argument constructor and then calls configure(Context) on them.
+ */
+public interface CacheClusterConfigLoader extends ConfigLoader<CacheClusterConfig>, Configurable {
+
+ // NOTE(review): presumably the context key (or key prefix) under which the cache cluster
+ // settings themselves are stored; its use is not visible in this file - confirm in implementations.
+ String CACHE_CLUSTER_CONFIG = "cacheClusterConfig";
+ // Context key holding the loader implementation's class name; CacheClusterConfigHolder
+ // falls back to ContextCacheClusterConfigLoader when the key is unset.
+ String CACHE_CLUSTER_CONFIG_TYPE = "cacheClusterConfig.type";
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..d029442
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * FileCommonPropertiesLoader
+ */
+public class ClassResourceCommonPropertiesLoader implements CommonPropertiesLoader {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ClassResourceCommonPropertiesLoader.class);
+
+ /**
+ * load
+ *
+ * @return
+ */
+ @Override
+ public Map<String, String> load() {
+ return this.loadProperties("common.properties");
+ }
+
+ /**
+ * loadProperties
+ *
+ * @param fileName
+ * @return
+ */
+ protected Map<String, String> loadProperties(String fileName) {
+ Map<String, String> result = new ConcurrentHashMap<>();
+ try (InputStream inStream = getClass().getClassLoader().getResource(fileName).openStream()) {
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", fileName, e);
+ }
+ return result;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
new file mode 100644
index 0000000..fcf839d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/CommonPropertiesLoader.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.Map;
+
+/**
+ *
+ * CommonPropertiesLoader
+ *
+ * Source of the process-wide common properties. CommonPropertiesHolder instantiates the
+ * implementation reflectively through its no-argument constructor and copies the result
+ * of load() into its own map.
+ */
+public interface CommonPropertiesLoader {
+
+ /**
+ * load
+ *
+ * Loads the common properties as key/value pairs.
+ *
+ * @return the loaded properties
+ */
+ Map<String, String> load();
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java
new file mode 100644
index 0000000..536256a
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ConfigLoader.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.List;
+
+/**
+ * ConfigLoader: generic loader; load() returns a list of configuration objects of type T.
+ * RELOAD_INTERVAL is presumably a property key name (its use is outside this file).
+ */
+public interface ConfigLoader<T> {
+
+ String RELOAD_INTERVAL = "reloadInterval";
+
+ List<T> load();
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
new file mode 100644
index 0000000..ff6ac98
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextCacheClusterConfigLoader.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+
+/**
+ * ContextCacheClusterConfigLoader: builds CacheClusterConfig objects
+ * from the Flume Context supplied through configure(Context).
+ */
+public class ContextCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+ private Context context;
+
+ /**
+ * Reads the cluster names stored under CACHE_CLUSTER_CONFIG and builds one config per distinct name.
+ * NOTE(review): context is only assigned in configure(Context); calling load() first would throw an NPE.
+ * @return one CacheClusterConfig per distinct cluster name, or an empty list when the value is blank
+ */
+ @Override
+ public List<CacheClusterConfig> load() {
+ String clusterNames = context.getString(CACHE_CLUSTER_CONFIG);
+ if (StringUtils.isBlank(clusterNames)) {
+ return new ArrayList<>();
+ }
+ // split the names on whitespace (StringUtils.split default) and drop duplicates via a HashSet
+ String[] clusterNameArray = StringUtils.split(clusterNames);
+ Set<String> clusterNameSet = new HashSet<>();
+ clusterNameSet.addAll(Arrays.asList(clusterNameArray));
+ // each name gets the sub-properties found under "cacheClusterConfig.<name>." as its params
+ List<CacheClusterConfig> configList = new ArrayList<>(clusterNameSet.size());
+ for (String clusterName : clusterNameSet) {
+ CacheClusterConfig config = new CacheClusterConfig();
+ configList.add(config);
+ config.setClusterName(clusterName);
+ Map<String, String> params = context.getSubProperties("cacheClusterConfig." + clusterName + ".");
+ config.setParams(params);
+ }
+ return configList;
+ }
+
+ /**
+ * Stores the Flume context that load() reads from.
+ *
+ * @param context the context holding the cluster names and their parameters
+ */
+ @Override
+ public void configure(Context context) {
+ this.context = context;
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
new file mode 100644
index 0000000..56fd240
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ContextIdTopicConfigLoader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * ContextIdTopicConfigLoader: builds IdTopicConfig objects
+ * from the Flume Context supplied through configure(Context).
+ */
+public class ContextIdTopicConfigLoader implements IdTopicConfigLoader {
+
+ private Context context;
+
+ /**
+ * Turns every sub-property under the "idTopicConfig." prefix into an IdTopicConfig:
+ * the entry key is set as inlongGroupId and the entry value as topicName.
+ * @return one IdTopicConfig per sub-property (empty when there are none)
+ */
+ @Override
+ public List<IdTopicConfig> load() {
+ Map<String, String> idTopicMap = context.getSubProperties("idTopicConfig.");
+ List<IdTopicConfig> configList = new ArrayList<>(idTopicMap.size());
+ for (Entry<String, String> entry : idTopicMap.entrySet()) {
+ IdTopicConfig config = new IdTopicConfig();
+ config.setInlongGroupId(entry.getKey());
+ config.setTopicName(entry.getValue());
+ configList.add(config);
+ }
+ return configList;
+ }
+
+ /**
+ * Stores the Flume context that load() reads from.
+ *
+ * @param context the context holding the "idTopicConfig." sub-properties
+ */
+ @Override
+ public void configure(Context context) {
+ this.context = context;
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java
new file mode 100644
index 0000000..548c9e3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/IdTopicConfigLoader.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * IdTopicConfigLoader: a Configurable ConfigLoader that loads IdTopicConfig objects.
+ * IDTOPIC_CONFIG_TYPE is presumably the property key selecting the loader (its use is outside this file).
+ */
+public interface IdTopicConfigLoader extends ConfigLoader<IdTopicConfig>, Configurable {
+
+ String IDTOPIC_CONFIG_TYPE = "idTopicConfig.type";
+}
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java
new file mode 100644
index 0000000..6eb5c2e
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerCacheClusterConfigLoader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.pojo.dataproxy.CacheClusterObject;
+import org.apache.inlong.commons.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+
+/**
+ * ManagerCacheClusterConfigLoader: builds CacheClusterConfig objects
+ * from the DataProxyCluster returned by RemoteConfigManager.
+ */
+public class ManagerCacheClusterConfigLoader implements CacheClusterConfigLoader {
+
+ /**
+ * Copies name and params of every cache cluster in RemoteConfigManager's current cluster config.
+ * NOTE(review): getCacheClusterSet() and obj.getParams() are dereferenced without null checks.
+ * @return the converted configs, or an empty list when the current cluster config is null
+ */
+ @Override
+ public List<CacheClusterConfig> load() {
+ List<CacheClusterConfig> configList = new ArrayList<>();
+ DataProxyCluster dataProxyCluster = RemoteConfigManager.getInstance().getCurrentClusterConfig();
+ if (dataProxyCluster == null) {
+ return configList;
+ }
+ for (CacheClusterObject obj : dataProxyCluster.getCacheClusterSet().getCacheClusters()) {
+ CacheClusterConfig config = new CacheClusterConfig();
+ config.setClusterName(obj.getName());
+ config.getParams().putAll(obj.getParams());
+ configList.add(config);
+ }
+ return configList;
+ }
+
+ /**
+ * Does nothing; this loader does not read the Flume context.
+ *
+ * @param context ignored
+ */
+ @Override
+ public void configure(Context context) {
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java
new file mode 100644
index 0000000..b48fe05
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/loader/ManagerIdTopicConfigLoader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.inlong.commons.pojo.dataproxy.DataProxyCluster;
+import org.apache.inlong.commons.pojo.dataproxy.InLongIdObject;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.pojo.DataType;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+
+/**
+ * ManagerIdTopicConfigLoader: builds IdTopicConfig objects
+ * from the DataProxyCluster returned by RemoteConfigManager.
+ */
+public class ManagerIdTopicConfigLoader implements IdTopicConfigLoader {
+
+ /**
+ * Builds one IdTopicConfig per inlong id; an id with exactly two "."-separated parts is split into
+ * group id and stream id. NOTE(review): obj.getParams() and obj.getInlongId() are not null-checked.
+ * @return the converted configs, or an empty list when the current cluster config is null
+ */
+ @Override
+ public List<IdTopicConfig> load() {
+ List<IdTopicConfig> configList = new ArrayList<>();
+ DataProxyCluster dataProxyCluster = RemoteConfigManager.getInstance().getCurrentClusterConfig();
+ if (dataProxyCluster == null) {
+ return configList;
+ }
+ for (InLongIdObject obj : dataProxyCluster.getProxyCluster().getInlongIds()) {
+ IdTopicConfig config = new IdTopicConfig();
+ String id = obj.getInlongId();
+ String[] ids = id.split("\\.");
+ if (ids.length == 2) {
+ config.setInlongGroupId(ids[0]);
+ config.setInlongStreamid(ids[1]);
+ } else {
+ config.setInlongGroupId(id);
+ }
+ config.setTopicName(obj.getTopic());
+ Map<String, String> params = obj.getParams();
+ config.setDataType(DataType.convert(params.getOrDefault("dataType", DataType.TEXT.value())));
+ config.setFieldDelimiter(params.getOrDefault("fieldDelimiter", "|"));
+ config.setFileDelimiter(params.getOrDefault("fileDelimiter", "\n"));
+ configList.add(config);
+ }
+ return configList;
+ }
+
+ /**
+ * Does nothing; this loader does not read the Flume context.
+ *
+ * @param context ignored
+ */
+ @Override
+ public void configure(Context context) {
+ }
+
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
similarity index 52%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
index c50e5cd..099e731 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheClusterConfig.java
@@ -15,49 +15,54 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.config.pojo;
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+import java.util.HashMap;
+import java.util.Map;
/**
*
- * DataProxyMetricItemSet
+ * CacheClusterConfig
*/
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public class CacheClusterConfig {
- private static DataProxyMetricItemSet instance;
+ private String clusterName;
+ private Map<String, String> params = new HashMap<>();
/**
- * Constructor
+ * get clusterName
+ *
+ * @return the clusterName
+ */
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ /**
+ * set clusterName
+ *
+ * @param clusterName the clusterName to set
*/
- private DataProxyMetricItemSet() {
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
}
/**
- * getInstance
+ * get params
*
- * @return
+ * @return the params
*/
- public static DataProxyMetricItemSet getInstance() {
- if (instance != null) {
- return instance;
- }
- synchronized (DataProxyMetricItemSet.class) {
- if (instance == null) {
- instance = new DataProxyMetricItemSet();
- }
- }
- return instance;
+ public Map<String, String> getParams() {
+ return params;
}
/**
- * createItem
+ * set params
*
- * @return
+ * @param params the params to set
*/
- @Override
- protected DataProxyMetricItem createItem() {
- return new DataProxyMetricItem();
+ public void setParams(Map<String, String> params) {
+ this.params = params;
}
}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
similarity index 56%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
index c50e5cd..b224777 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/CacheType.java
@@ -15,49 +15,56 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
-
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+package org.apache.inlong.dataproxy.config.pojo;
/**
- *
- * DataProxyMetricItemSet
+ * cache cluster type
*/
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public enum CacheType {
+
+ TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
- private static DataProxyMetricItemSet instance;
+ private final String value;
/**
+ *
* Constructor
+ *
+ * @param value
*/
- private DataProxyMetricItemSet() {
+ private CacheType(String value) {
+ this.value = value;
}
/**
- * getInstance
- *
+ * Returns the string value of this cache type.
+ *
* @return
*/
- public static DataProxyMetricItemSet getInstance() {
- if (instance != null) {
- return instance;
- }
- synchronized (DataProxyMetricItemSet.class) {
- if (instance == null) {
- instance = new DataProxyMetricItemSet();
- }
- }
- return instance;
+ public String value() {
+ return this.value;
}
/**
- * createItem
- *
- * @return
+ * Formats this constant as "NAME:value".
*/
@Override
- protected DataProxyMetricItem createItem() {
- return new DataProxyMetricItem();
+ public String toString() {
+ return this.name() + ":" + this.value;
}
+ /**
+ * Finds the CacheType whose value() equals the given string (case-sensitive equals).
+ *
+ * @param value the string value to look up
+ * @return the matching CacheType, or N when no constant matches
+ */
+ public static CacheType convert(String value) {
+ for (CacheType v : values()) {
+ if (v.value().equals(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
similarity index 56%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
index c50e5cd..c09433d 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/DataType.java
@@ -15,49 +15,56 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
-
-import org.apache.inlong.commons.config.metrics.MetricItemSet;
+package org.apache.inlong.dataproxy.config.pojo;
/**
- *
- * DataProxyMetricItemSet
+ * data content type
*/
-public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
+public enum DataType {
+
+ TEXT("text"), PB("pb"), JCE("jce"), N("n");
- private static DataProxyMetricItemSet instance;
+ private final String value;
/**
+ *
* Constructor
+ *
+ * @param value
*/
- private DataProxyMetricItemSet() {
+ private DataType(String value) {
+ this.value = value;
}
/**
- * getInstance
- *
+ * Returns the string value of this data type.
+ *
* @return
*/
- public static DataProxyMetricItemSet getInstance() {
- if (instance != null) {
- return instance;
- }
- synchronized (DataProxyMetricItemSet.class) {
- if (instance == null) {
- instance = new DataProxyMetricItemSet();
- }
- }
- return instance;
+ public String value() {
+ return this.value;
}
/**
- * createItem
- *
- * @return
+ * Formats this constant as "NAME:value".
*/
@Override
- protected DataProxyMetricItem createItem() {
- return new DataProxyMetricItem();
+ public String toString() {
+ return this.name() + ":" + this.value;
}
+ /**
+ * Finds the DataType whose value() equals the given string (case-sensitive equals).
+ *
+ * @param value the string value to look up
+ * @return the matching DataType, or N when no constant matches
+ */
+ public static DataType convert(String value) {
+ for (DataType v : values()) {
+ if (v.value().equals(value)) {
+ return v;
+ }
+ }
+ return N;
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
new file mode 100644
index 0000000..c8a1fa3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/IdTopicConfig.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.pojo;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * IdTopicConfig: topic name, data type and delimiters for one inlong group/stream id.
+ * The uid field is derived from the two ids and is recomputed whenever either id is set.
+ */
+public class IdTopicConfig {
+
+ private String uid;
+ private String inlongGroupId;
+ private String inlongStreamid;
+ private String topicName;
+ private DataType dataType = DataType.TEXT;
+ private String fieldDelimiter = "|";
+ private String fileDelimiter = "\n";
+
+ /**
+ * get uid (null until setInlongGroupId or setInlongStreamid has been called)
+ *
+ * @return the uid
+ */
+ public String getUid() {
+ return uid;
+ }
+
+ /**
+ * Joins the non-blank ids with "."; a blank id is left out, and two blank ids give "".
+ *
+ * @param inlongGroupId the group id, may be blank
+ * @param inlongStreamId the stream id, may be blank
+ * @return "groupId.streamId", the single non-blank id, or "" when both are blank
+ */
+ public static String generateUid(String inlongGroupId, String inlongStreamId) {
+ if (StringUtils.isBlank(inlongGroupId)) {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return "";
+ } else {
+ return inlongStreamId;
+ }
+ } else {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return inlongGroupId;
+ } else {
+ return inlongGroupId + "." + inlongStreamId;
+ }
+ }
+ }
+
+ /**
+ * get inlongGroupId
+ *
+ * @return the inlongGroupId
+ */
+ public String getInlongGroupId() {
+ return inlongGroupId;
+ }
+
+ /**
+ * set inlongGroupId and recompute uid from the current group and stream ids
+ *
+ * @param inlongGroupId the inlongGroupId to set
+ */
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
+ this.uid = generateUid(this.inlongGroupId, this.inlongStreamid);
+ }
+
+ /**
+ * get inlongStreamid
+ *
+ * @return the inlongStreamid
+ */
+ public String getInlongStreamid() {
+ return inlongStreamid;
+ }
+
+ /**
+ * set inlongStreamid and recompute uid from the current group and stream ids
+ *
+ * @param inlongStreamid the inlongStreamid to set
+ */
+ public void setInlongStreamid(String inlongStreamid) {
+ this.inlongStreamid = inlongStreamid;
+ this.uid = generateUid(this.inlongGroupId, this.inlongStreamid);
+ }
+
+ /**
+ * get topicName
+ *
+ * @return the topicName
+ */
+ public String getTopicName() {
+ return topicName;
+ }
+
+ /**
+ * set topicName
+ *
+ * @param topicName the topicName to set
+ */
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ /**
+ * get dataType (DataType.TEXT unless changed)
+ *
+ * @return the dataType
+ */
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * set dataType
+ *
+ * @param dataType the dataType to set
+ */
+ public void setDataType(DataType dataType) {
+ this.dataType = dataType;
+ }
+
+ /**
+ * get fieldDelimiter ("|" unless changed)
+ *
+ * @return the fieldDelimiter
+ */
+ public String getFieldDelimiter() {
+ return fieldDelimiter;
+ }
+
+ /**
+ * set fieldDelimiter
+ *
+ * @param fieldDelimiter the fieldDelimiter to set
+ */
+ public void setFieldDelimiter(String fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ }
+
+ /**
+ * get fileDelimiter (a newline unless changed)
+ *
+ * @return the fileDelimiter
+ */
+ public String getFileDelimiter() {
+ return fileDelimiter;
+ }
+
+ /**
+ * set fileDelimiter
+ *
+ * @param fileDelimiter the fileDelimiter to set
+ */
+ public void setFileDelimiter(String fileDelimiter) {
+ this.fileDelimiter = fileDelimiter;
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
new file mode 100644
index 0000000..328078e
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItem.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.CountMetric;
+import org.apache.inlong.commons.config.metrics.Dimension;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+
+/**
+ * DataProxyMetricItem: metric item in the "DataProxy" domain with String dimension fields
+ * and AtomicLong count-metric fields; the KEY_* and M_* constants repeat the field names.
+ */
+@MetricDomain(name = "DataProxy")
+public class DataProxyMetricItem extends MetricItem {
+
+ public static final String KEY_CLUSTER_ID = "clusterId";
+ public static final String KEY_SOURCE_ID = "sourceId";
+ public static final String KEY_SOURCE_DATA_ID = "sourceDataId";
+ public static final String KEY_INLONG_GROUP_ID = "inlongGroupId";
+ public static final String KEY_INLONG_STREAM_ID = "inlongStreamId";
+ public static final String KEY_SINK_ID = "sinkId";
+ public static final String KEY_SINK_DATA_ID = "sinkDataId";
+ // names of the read/send count and size metrics
+ public static final String M_READ_SUCCESS_COUNT = "readSuccessCount";
+ public static final String M_READ_SUCCESS_SIZE = "readSuccessSize";
+ public static final String M_READ_FAIL_COUNT = "readFailCount";
+ public static final String M_READ_FAIL_SIZE = "readFailSize";
+ public static final String M_SEND_COUNT = "sendCount";
+ public static final String M_SEND_SIZE = "sendSize";
+ public static final String M_SEND_SUCCESS_COUNT = "sendSuccessCount";
+ public static final String M_SEND_SUCCESS_SIZE = "sendSuccessSize";
+ public static final String M_SEND_FAIL_COUNT = "sendFailCount";
+ public static final String M_SEND_FAIL_SIZE = "sendFailSize";
+ // names of the duration metrics
+ public static final String M_SINK_DURATION = "sinkDuration";
+ public static final String M_NODE_DURATION = "nodeDuration";
+ public static final String M_WHOLE_DURATION = "wholeDuration";
+
+ @Dimension
+ public String clusterId;
+ @Dimension
+ public String sourceId;
+ @Dimension
+ public String sourceDataId;
+ @Dimension
+ public String inlongGroupId;
+ @Dimension
+ public String inlongStreamId;
+ @Dimension
+ public String sinkId;
+ @Dimension
+ public String sinkDataId;
+ @CountMetric
+ public AtomicLong readSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong readFailSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendSuccessSize = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailCount = new AtomicLong(0);
+ @CountMetric
+ public AtomicLong sendFailSize = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sinkBeginTime(milliseconds)
+ public AtomicLong sinkDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - sourceReceiveTime(milliseconds)
+ public AtomicLong nodeDuration = new AtomicLong(0);
+ @CountMetric
+ // sinkCallbackTime - eventCreateTime(milliseconds)
+ public AtomicLong wholeDuration = new AtomicLong(0);
+
+ /**
+ * Copies the inlong group id and stream id headers of the event into the dimension map ("" when absent).
+ *
+ * @param event the event whose headers are read
+ * @param dimensions the map receiving KEY_INLONG_GROUP_ID and KEY_INLONG_STREAM_ID entries
+ */
+ public static void fillInlongId(Event event, Map<String, String> dimensions) {
+ Map<String, String> headers = event.getHeaders();
+ String inlongGroupId = headers.getOrDefault(Constants.INLONG_GROUP_ID, "");
+ String inlongStreamId = headers.getOrDefault(Constants.INLONG_STREAM_ID, "");
+ dimensions.put(KEY_INLONG_GROUP_ID, inlongGroupId);
+ dimensions.put(KEY_INLONG_STREAM_ID, inlongStreamId);
+ }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
similarity index 68%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
copy to inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index c50e5cd..bf8d042 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -15,39 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
+import org.apache.inlong.commons.config.metrics.MetricDomain;
import org.apache.inlong.commons.config.metrics.MetricItemSet;
/**
*
- * DataProxyMetricItemSet
+ * DataProxyMetricItemSet, a named MetricItemSet of DataProxyMetricItem
*/
+@MetricDomain(name = "DataProxy")
public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
- private static DataProxyMetricItemSet instance;
-
/**
* Constructor
- */
- private DataProxyMetricItemSet() {
- }
-
- /**
- * getInstance
*
- * @return
+ * @param name the name passed on to the MetricItemSet constructor
*/
- public static DataProxyMetricItemSet getInstance() {
- if (instance != null) {
- return instance;
- }
- synchronized (DataProxyMetricItemSet.class) {
- if (instance == null) {
- instance = new DataProxyMetricItemSet();
- }
- }
- return instance;
+ public DataProxyMetricItemSet(String name) {
+ super(name);
}
/**
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java
new file mode 100644
index 0000000..6c5f2b7
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricItemValue.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.Map;
+
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.apache.pulsar.shade.com.google.gson.Gson;
+
+/**
+ * Immutable holder for one metric item snapshot: its key, its dimensions and its metric values.
+ * {@link #toString()} renders the holder as JSON.
+ */
+public class MetricItemValue {
+
+ private final String key;
+ private final Map<String, String> dimensions;
+ private final Map<String, MetricValue> metrics;
+
+ /**
+ * Creates a value holder; the maps are stored by reference, not copied.
+ *
+ * @param key identifier of the metric item
+ * @param dimensions dimensions of the metric item
+ * @param metrics metric values of the metric item, by metric name
+ */
+ public MetricItemValue(String key, Map<String, String> dimensions, Map<String, MetricValue> metrics) {
+ this.key = key;
+ this.dimensions = dimensions;
+ this.metrics = metrics;
+ }
+
+ /**
+ * Returns the identifier of the metric item.
+ *
+ * @return the key
+ */
+ public String getKey() {
+ return key;
+ }
+
+ /**
+ * Returns the dimensions of the metric item.
+ *
+ * @return the dimensions map passed to the constructor
+ */
+ public Map<String, String> getDimensions() {
+ return dimensions;
+ }
+
+ /**
+ * Returns the metric values of the metric item.
+ *
+ * @return the metrics map passed to the constructor
+ */
+ public Map<String, MetricValue> getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * Serializes this value to JSON with Gson.
+ *
+ * @return the JSON text Gson produces for this object
+ */
+ @Override
+ public String toString() {
+ return new Gson().toJson(this);
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java
new file mode 100644
index 0000000..7ae89f3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.List;
+
+/**
+ * Callback that receives the metric item values collected for a domain.
+ * The KEY_* constants are configuration property names.
+ */
+public interface MetricListener {
+
+ String KEY_METRIC_DOMAINS = "metricDomains";
+ String KEY_DOMAIN_LISTENERS = "domainListeners";
+ String KEY_SNAPSHOT_INTERVAL = "snapshotInterval";
+
+ /**
+ * Handles one snapshot of a domain.
+ *
+ * @param domain name of the domain the values were collected for
+ * @param itemValues metric item values contained in the snapshot
+ */
+ void snapshot(String domain, List<MetricItemValue> itemValues);
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
new file mode 100644
index 0000000..fcab604
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricListenerRunnable.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.inlong.commons.config.metrics.MetricItem;
+import org.apache.inlong.commons.config.metrics.MetricItemMBean;
+import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
+import org.apache.inlong.commons.config.metrics.MetricValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task that collects the metric values of one JMX domain from the platform MBean server
+ * and hands them to every configured {@link MetricListener}.
+ */
+public class MetricListenerRunnable implements Runnable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(MetricListenerRunnable.class);
+
+ private final String domain;
+ private final List<MetricListener> listenerList;
+
+ /**
+ * Constructor
+ *
+ * @param domain JMX domain whose MBeans are queried on every run
+ * @param listenerList listeners that receive each snapshot
+ */
+ public MetricListenerRunnable(String domain, List<MetricListener> listenerList) {
+ this.domain = domain;
+ this.listenerList = listenerList;
+ }
+
+ /**
+ * Collects the item values once and passes them to the listeners; failures are logged, not rethrown.
+ */
+ @Override
+ public void run() {
+ LOG.info("begin to snapshot metric:{}", domain);
+ try {
+ List<MetricItemValue> itemValues = this.getItemValues();
+ LOG.info("snapshot metric:{},size:{}", domain, itemValues.size());
+ this.listenerList.forEach((item) -> {
+ item.snapshot(domain, itemValues);
+ });
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ LOG.info("end to snapshot metric:{}", domain);
+ }
+
+ /**
+ * Queries the platform MBean server for the MBeans of the domain and collects their values.
+ *
+ * @return MetricItemValue List
+ * @throws InstanceNotFoundException if a queried MBean is no longer registered
+ * @throws AttributeNotFoundException if an MBean lacks the key or dimensions attribute
+ * @throws ReflectionException if invoking the snapshot operation fails
+ * @throws MBeanException if the snapshot operation or an attribute getter throws
+ * @throws MalformedObjectNameException if the domain does not form a valid object name pattern
+ * @throws ClassNotFoundException if the class of a registered MBean cannot be loaded
+ */
+ @SuppressWarnings("unchecked")
+ private List<MetricItemValue> getItemValues() throws InstanceNotFoundException, AttributeNotFoundException,
+ ReflectionException, MBeanException, MalformedObjectNameException, ClassNotFoundException {
+ ObjectName objName = new ObjectName(domain + MetricItemMBean.DOMAIN_SEPARATOR + "*");
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ Set<ObjectInstance> mbeans = mbs.queryMBeans(objName, null);
+ LOG.info("getItemValues for domain:{},queryMBeans:{}", domain, mbeans);
+ List<MetricItemValue> itemValues = new ArrayList<>();
+ for (ObjectInstance mbean : mbeans) {
+ String className = mbean.getClassName();
+ Class<?> clazz = ClassUtils.getClass(className);
+ if (ClassUtils.isAssignable(clazz, MetricItemMBean.class)) {
+ ObjectName metricObjectName = mbean.getObjectName();
+ String dimensionsKey = (String) mbs.getAttribute(metricObjectName,
+ MetricItemMBean.ATTRIBUTE_KEY);
+ Map<String, String> dimensions = (Map<String, String>) mbs
+ .getAttribute(metricObjectName, MetricItemMBean.ATTRIBUTE_DIMENSIONS);
+ Map<String, MetricValue> metrics = (Map<String, MetricValue>) mbs
+ .invoke(metricObjectName, MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ MetricItemValue itemValue = new MetricItemValue(dimensionsKey, dimensions, metrics);
+ LOG.info("MetricItemMBean get itemValue:{}", itemValue);
+ itemValues.add(itemValue);
+ } else if (ClassUtils.isAssignable(clazz, MetricItemSetMBean.class)) {
+ ObjectName metricObjectName = mbean.getObjectName();
+ List<MetricItem> items = (List<MetricItem>) mbs.invoke(metricObjectName,
+ MetricItemMBean.METHOD_SNAPSHOT, null, null);
+ for (MetricItem item : items) {
+ String dimensionsKey = item.getDimensionsKey();
+ Map<String, String> dimensions = item.getDimensions();
+ Map<String, MetricValue> metrics = item.snapshot();
+ MetricItemValue itemValue = new MetricItemValue(dimensionsKey, dimensions, metrics);
+ LOG.info("MetricItemSetMBean get itemValue:{}", itemValue);
+ itemValues.add(itemValue);
+ }
+ }
+ }
+ return itemValues;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
new file mode 100644
index 0000000..50d9a82
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/MetricObserver.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.ClassUtils;
+import org.apache.flume.Context;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads the metric domain configuration and schedules a {@link MetricListenerRunnable}
+ * for every domain that has at least one usable {@link MetricListener}.
+ */
+public class MetricObserver {
+
+ public static final Logger LOG = LoggerFactory.getLogger(MetricObserver.class);
+ private static final AtomicBoolean isInited = new AtomicBoolean(false);
+ private static final ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(5);
+
+ /**
+ * Schedules the snapshot tasks; only the first call has an effect.
+ *
+ * @param commonProperties properties holding the domain list and the per-domain settings
+ */
+ public static void init(Map<String, String> commonProperties) {
+ if (!isInited.compareAndSet(false, true)) {
+ return;
+ }
+ // init
+ Context context = new Context(commonProperties);
+ // get domain name list
+ String metricDomains = context.getString(MetricListener.KEY_METRIC_DOMAINS);
+ if (StringUtils.isBlank(metricDomains)) {
+ return;
+ }
+ // split domain names; trim first so leading whitespace does not yield an empty name
+ String[] domains = metricDomains.trim().split("\\s+");
+ for (String domain : domains) {
+ // get domain parameters
+ Context domainContext = new Context(
+ context.getSubProperties(MetricListener.KEY_METRIC_DOMAINS + "." + domain + "."));
+ List<MetricListener> listenerList = parseDomain(domain, domainContext);
+ // no listener
+ if (listenerList == null || listenerList.isEmpty()) {
+ continue;
+ }
+ // get snapshot interval
+ long snapshotInterval = domainContext.getLong(MetricListener.KEY_SNAPSHOT_INTERVAL, 60000L);
+ LOG.info("begin to register domain:{},MetricListeners:{},snapshotInterval:{}", domain, listenerList,
+ snapshotInterval);
+ statExecutor.scheduleWithFixedDelay(new MetricListenerRunnable(domain, listenerList), snapshotInterval,
+ snapshotInterval, TimeUnit.MILLISECONDS);
+ }
+
+ }
+
+ /**
+ * Instantiates the listeners configured for a domain.
+ *
+ * @param domain name of the domain being configured
+ * @param domainContext settings of the domain; its listener entry holds class names separated by whitespace
+ * @return the listeners that could be created, or null when no listener is configured
+ */
+ private static List<MetricListener> parseDomain(String domain, Context domainContext) {
+ String listeners = domainContext.getString(MetricListener.KEY_DOMAIN_LISTENERS);
+ if (StringUtils.isBlank(listeners)) {
+ return null;
+ }
+ String[] listenerTypes = listeners.trim().split("\\s+");
+ List<MetricListener> listenerList = new ArrayList<>();
+ for (String listenerType : listenerTypes) {
+ // new listener object
+ try {
+ Class<?> listenerClass = ClassUtils.getClass(listenerType);
+ Object listenerObject = listenerClass.getDeclaredConstructor().newInstance();
+ if (!(listenerObject instanceof MetricListener)) {
+ LOG.error("{} is not instance of MetricListener.", listenerType);
+ continue;
+ }
+ final MetricListener listener = (MetricListener) listenerObject;
+ listenerList.add(listener);
+ } catch (Throwable t) {
+ // pass the throwable as the last argument so the stack trace is logged too
+ LOG.error("Fail to init MetricListener:{},error:{}", listenerType, t.getMessage(), t);
+ // nothing more to do here: the loop moves on to the next listener type
+ }
+ }
+ return listenerList;
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
new file mode 100644
index 0000000..7c35f96
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/prometheus/PrometheusMetricListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.metrics.prometheus;
+
+import java.util.List;
+
+import org.apache.inlong.dataproxy.metrics.MetricItemValue;
+import org.apache.inlong.dataproxy.metrics.MetricListener;
+
+/**
+ * MetricListener placeholder for Prometheus.
+ * Its snapshot callback has an empty body, so received snapshots are ignored.
+ */
+public class PrometheusMetricListener implements MetricListener {
+
+ /**
+ * Receives one snapshot of a domain; currently does nothing with it.
+ *
+ * @param domain name of the domain the values were collected for
+ * @param itemValues metric item values contained in the snapshot
+ */
+ @Override
+ public void snapshot(String domain, List<MetricItemValue> itemValues) {
+ // TODO: not implemented yet; the snapshot is discarded
+ }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
index b7307cf..a009d76 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/node/Application.java
@@ -54,6 +54,7 @@ import org.apache.flume.util.SSLUtil;
import org.apache.inlong.commons.config.IDataProxyConfigHolder;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.metrics.MetricObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -385,6 +386,10 @@ public class Application {
application.handleConfigurationEvent(configurationProvider.getConfiguration());
}
}
+ //metrics
+ MetricObserver.init(ConfigManager.getInstance().getCommonProperties());
+
+ //start application
application.start();
final Application appReference = application;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
index 034b2b4..159f860 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/MetaSink.java
@@ -17,11 +17,6 @@
package org.apache.inlong.dataproxy.sink;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,10 +39,14 @@ import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.source.shaded.guava.RateLimiter;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback;
import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.NetworkUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
@@ -58,14 +57,21 @@ import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.apache.inlong.tubemq.corebase.TErrCodeConstants;
import org.apache.inlong.tubemq.corerpc.exception.OverflowException;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
public class MetaSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(MetaSink.class);
private static int MAX_TOPICS_EACH_PRODUCER_HOLD = 200;
private static final String TUBE_REQUEST_TIMEOUT = "tube-request-timeout";
+ private static final String KEY_DISK_IO_RATE_PER_SEC = "disk-io-rate-per-sec";
private static int BAD_EVENT_QUEUE_SIZE = 10000;
@@ -135,6 +141,9 @@ public class MetaSink extends AbstractSink implements Configurable {
private long sessionMaxAllowedDelayedMsgCount;
private long nettyWriteBufferHighWaterMark;
private int recoverthreadcount;
+ //
+ private Map<String, String> dimensions;
+ private DataProxyMetricItemSet metricItemSet;
private static final LoadingCache<String, Long> agentIdCache = CacheBuilder
.newBuilder().concurrencyLevel(4 * 8).initialCapacity(5000000).expireAfterAccess(30, TimeUnit.SECONDS)
@@ -321,6 +330,14 @@ public class MetaSink extends AbstractSink implements Configurable {
@Override
public void start() {
+ this.dimensions = new HashMap<>();
+ this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+ this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+ //register metrics
+ this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ MetricRegister.register(metricItemSet);
+
+ //create tube connection
try {
createConnection();
} catch (FlumeException e) {
@@ -521,16 +538,20 @@ public class MetaSink extends AbstractSink implements Configurable {
public class MyCallback implements MessageSentCallback {
private EventStat myEventStat;
+ private long sendTime;
public MyCallback(EventStat eventStat) {
this.myEventStat = eventStat;
+ this.sendTime = System.currentTimeMillis();
}
@Override
public void onMessageSent(final MessageSentResult result) {
if (result.isSuccess()) {
// TODO: add stats
+ this.addMetric(myEventStat.getEvent(), true, sendTime);
} else {
+ this.addMetric(myEventStat.getEvent(), false, 0);
if (result.getErrCode() == TErrCodeConstants.FORBIDDEN) {
logger.warn("Send message failed, error message: {}, resendQueue size: {}, event:{}",
result.getErrMsg(), resendQueue.size(),
@@ -547,6 +568,44 @@ public class MetaSink extends AbstractSink implements Configurable {
}
}
+ /**
+ * Records the outcome of one send attempt on the metric item selected by the event's dimensions.
+ *
+ * @param currentRecord the event that was sent; its headers and body size feed the metrics
+ * @param result true to count a successful send, false to count a failed send
+ * @param sendTime send start time in epoch millis; durations are only recorded when it is positive,
+ * and it is the fallback when the message-time header is missing or not numeric
+ */
+ private void addMetric(Event currentRecord, boolean result, long sendTime) {
+ Map<String, String> headers = currentRecord.getHeaders();
+ Map<String, String> dimensions = new HashMap<>();
+ // use the same cluster id as the dimensions built in start(), not the sink name
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+ dimensions.put(DataProxyMetricItem.KEY_SINK_ID, MetaSink.this.getName());
+ // events without a topic header are grouped under an empty data id
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, headers.getOrDefault(TOPIC, ""));
+ DataProxyMetricItem metricItem = MetaSink.this.metricItemSet.findMetricItem(dimensions);
+ int bodySize = currentRecord.getBody().length;
+ if (result) {
+ metricItem.sendSuccessCount.incrementAndGet();
+ metricItem.sendSuccessSize.addAndGet(bodySize);
+ if (sendTime > 0) {
+ long currentTime = System.currentTimeMillis();
+ // a missing or non-numeric message time falls back to the send time
+ long msgTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+ // the source time must come from the header value; parsing the key name itself
+ // always failed and silently fell back to msgTime
+ long sourceTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_SOURCE_TIME), msgTime);
+ metricItem.sinkDuration.addAndGet(currentTime - sendTime);
+ metricItem.nodeDuration.addAndGet(currentTime - sourceTime);
+ metricItem.wholeDuration.addAndGet(currentTime - msgTime);
+ }
+ } else {
+ metricItem.sendFailCount.incrementAndGet();
+ metricItem.sendFailSize.addAndGet(bodySize);
+ }
+ }
+
@Override
public void onException(final Throwable e) {
Throwable t = e;
@@ -613,6 +672,15 @@ public class MetaSink extends AbstractSink implements Configurable {
tx.rollback();
} else {
tx.commit();
+ // metric
+ if (event.getHeaders().containsKey(TOPIC)) {
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, event.getHeaders().get(TOPIC));
+ } else {
+ dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "");
+ }
+ DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailSize.addAndGet(event.getBody().length);
}
} else {
@@ -712,7 +780,7 @@ public class MetaSink extends AbstractSink implements Configurable {
sinkThreadPool = new Thread[threadNum];
eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
- diskIORatePerSec = context.getLong("disk-io-rate-per-sec", 0L);
+ diskIORatePerSec = context.getLong(KEY_DISK_IO_RATE_PER_SEC, 0L);
if (diskIORatePerSec != 0) {
diskRateLimiter = RateLimiter.create(diskIORatePerSec);
}
@@ -728,4 +796,13 @@ public class MetaSink extends AbstractSink implements Configurable {
recoverthreadcount = context.getInteger(ConfigConstants.RECOVER_THREAD_COUNT,
Runtime.getRuntime().availableProcessors() + 1);
}
+
+ /**
+ * Returns the metric item set of this sink, which start() creates and registers.
+ * @return the metricItemSet; null if start() has not assigned it yet
+ */
+ public DataProxyMetricItemSet getMetricItemSet() {
+ return metricItemSet;
+ }
+
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
new file mode 100644
index 0000000..b0bb2a4
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSink.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Flume sink that takes events from its channel and offers them to the buffer queue of its
+ * PulsarFederationSinkContext; the same context is passed to the workers created in start().
+ */
+public class PulsarFederationSink extends AbstractSink implements Configurable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationSink.class);
+
+ private PulsarFederationSinkContext context;
+ private List<PulsarFederationWorker> workers = new ArrayList<>();
+ private Map<String, String> dimensions;
+
+ /**
+ * Creates and starts one worker per configured thread, then starts the sink itself.
+ */
+ @Override
+ public void start() {
+ String sinkName = this.getName();
+ // one worker per index below context.getMaxThreads()
+ for (int i = 0; i < context.getMaxThreads(); i++) {
+ PulsarFederationWorker worker = new PulsarFederationWorker(sinkName, i, context);
+ worker.start();
+ this.workers.add(worker);
+ }
+ super.start();
+ }
+
+ /**
+ * Closes every worker (logging close failures), closes the context, then stops the sink.
+ */
+ @Override
+ public void stop() {
+ for (PulsarFederationWorker worker : workers) {
+ try {
+ worker.close();
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ this.context.close();
+ super.stop();
+ }
+
+ /**
+ * Builds the sink context and the metric dimensions (cluster id and sink id) of this sink.
+ *
+ * @param context Flume configuration of this sink
+ */
+ @Override
+ public void configure(Context context) {
+ LOG.info("start to configure:{}, context:{}.", this.getClass().getSimpleName(), context.toString());
+ this.context = new PulsarFederationSinkContext(this.getName(), context);
+ this.dimensions = new HashMap<>();
+ this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+ this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.getName());
+ }
+
+ /**
+ * Takes at most one event from the channel inside a transaction and offers it to the buffer queue.
+ *
+ * @return READY when an event was queued; BACKOFF when the channel was empty, the queue refused it or an error occurred
+ * @throws EventDeliveryException declared by the interface; Throwables raised inside the try block are caught here
+ */
+ @Override
+ public Status process() throws EventDeliveryException {
+ Channel channel = getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event == null) {
+ tx.commit();
+ return Status.BACKOFF;
+ }
+ // ask the buffer queue to accept the body size first (presumably a capacity reservation - confirm in BufferQueue)
+ int eventSize = event.getBody().length;
+ if (!this.context.getBufferQueue().tryAcquire(eventSize)) {
+ // record the failure of queue full for monitor
+ // count and size of the refused event go to the read-fail metrics
+ DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailSize.addAndGet(eventSize);
+ // roll back the take and back off
+ tx.rollback();
+ return Status.BACKOFF;
+ }
+ this.context.getBufferQueue().offer(event);
+ tx.commit();
+ return Status.READY;
+ } catch (Throwable t) {
+ LOG.error("Process event failed!" + this.getName(), t);
+ try {
+ tx.rollback();
+ // only the count is recorded here; it is skipped if the rollback itself throws
+ DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.readFailCount.incrementAndGet();
+ } catch (Throwable e) {
+ LOG.error("Channel take transaction rollback exception:" + getName(), e);
+ }
+ return Status.BACKOFF;
+ } finally {
+ tx.close();
+ }
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
new file mode 100644
index 0000000..a62abeb
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationSinkContext.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
+import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
+import org.apache.inlong.dataproxy.utils.BufferQueue;
+
+/**
+ * PulsarFederationSinkContext
+ * <p>
+ * Bundles the settings and shared objects of one Pulsar federation sink: the numeric settings read
+ * from the sink's Flume context, the producer sub-context, the id-topic and cache-cluster config
+ * holders, the buffer queue between the sink and its workers, and the sink's metric item set.
+ */
+public class PulsarFederationSinkContext {
+
+    public static final String KEY_MAX_THREADS = "max-threads";
+    public static final String KEY_MAXTRANSACTION = "maxTransaction";
+    public static final String KEY_PROCESSINTERVAL = "processInterval";
+    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final String KEY_MAXBUFFERQUEUESIZE = "maxBufferQueueSize";
+    public static final String PREFIX_PRODUCER = "producer.";
+
+    private final String proxyClusterId;
+    private final Context sinkContext;
+    private final Context producerContext;
+    // configuration holders, started by the constructor and closed by close()
+    private final IdTopicConfigHolder idTopicHolder;
+    private final CacheClusterConfigHolder cacheHolder;
+    private final BufferQueue<Event> bufferQueue;
+    // numeric settings
+    private final int maxThreads;
+    private final int maxTransaction;
+    private final long processInterval;
+    private final long reloadInterval;
+    // metrics
+    private final DataProxyMetricItemSet metricItemSet;
+
+    /**
+     * Constructor
+     * <p>
+     * Reads the settings, configures and starts both config holders, creates the buffer queue and
+     * registers a metric item set named after the sink.
+     *
+     * @param sinkName name of the owning sink, used as the name of the metric item set
+     * @param context  Flume context of the sink
+     */
+    public PulsarFederationSinkContext(String sinkName, Context context) {
+        this.proxyClusterId = CommonPropertiesHolder.getString(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME);
+        this.sinkContext = context;
+        this.maxThreads = context.getInteger(KEY_MAX_THREADS, 10);
+        this.maxTransaction = context.getInteger(KEY_MAXTRANSACTION, 1);
+        // both intervals are long fields, so both are read with getLong
+        this.processInterval = context.getLong(KEY_PROCESSINTERVAL, 100L);
+        this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, 60000L);
+        // id -> topic mapping
+        this.idTopicHolder = new IdTopicConfigHolder();
+        this.idTopicHolder.configure(context);
+        this.idTopicHolder.start();
+        // cache cluster list
+        this.cacheHolder = new CacheClusterConfigHolder();
+        this.cacheHolder.configure(context);
+        this.cacheHolder.start();
+        // buffer between the sink and its workers
+        // NOTE(review): the unit of maxBufferQueueSize is defined by BufferQueue - confirm there
+        int maxBufferQueueSize = context.getInteger(KEY_MAXBUFFERQUEUESIZE, 128 * 1024);
+        this.bufferQueue = new BufferQueue<Event>(maxBufferQueueSize);
+        // every "producer."-prefixed property becomes the producer context
+        Map<String, String> producerParams = context.getSubProperties(PREFIX_PRODUCER);
+        this.producerContext = new Context(producerParams);
+        // metrics
+        this.metricItemSet = new DataProxyMetricItemSet(sinkName);
+        MetricRegister.register(this.metricItemSet);
+    }
+
+    /**
+     * Closes the two config holders started by the constructor.
+     */
+    public void close() {
+        this.idTopicHolder.close();
+        this.cacheHolder.close();
+    }
+
+    /**
+     * get proxyClusterId
+     *
+     * @return the proxyClusterId
+     */
+    public String getProxyClusterId() {
+        return proxyClusterId;
+    }
+
+    /**
+     * get sinkContext
+     *
+     * @return the Flume context handed to the constructor
+     */
+    public Context getSinkContext() {
+        return sinkContext;
+    }
+
+    /**
+     * get producerContext
+     *
+     * @return the context built from the "producer."-prefixed sink properties
+     */
+    public Context getProducerContext() {
+        return producerContext;
+    }
+
+    /**
+     * get idTopicHolder
+     *
+     * @return the idTopicHolder
+     */
+    public IdTopicConfigHolder getIdTopicHolder() {
+        return idTopicHolder;
+    }
+
+    /**
+     * get cacheHolder
+     *
+     * @return the cacheHolder
+     */
+    public CacheClusterConfigHolder getCacheHolder() {
+        return cacheHolder;
+    }
+
+    /**
+     * get bufferQueue
+     *
+     * @return the bufferQueue
+     */
+    public BufferQueue<Event> getBufferQueue() {
+        return bufferQueue;
+    }
+
+    /**
+     * get maxThreads
+     *
+     * @return the maxThreads (default 10)
+     */
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /**
+     * get maxTransaction
+     *
+     * @return the maxTransaction (default 1)
+     */
+    public int getMaxTransaction() {
+        return maxTransaction;
+    }
+
+    /**
+     * get processInterval
+     *
+     * @return the processInterval (default 100)
+     */
+    public long getProcessInterval() {
+        return processInterval;
+    }
+
+    /**
+     * get reloadInterval
+     *
+     * @return the reloadInterval (default 60000)
+     */
+    public long getReloadInterval() {
+        return reloadInterval;
+    }
+
+    /**
+     * get metricItemSet
+     *
+     * @return the metricItemSet
+     */
+    public DataProxyMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
new file mode 100644
index 0000000..914cff3
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarFederationWorker.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.Event;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * PulsarSetWorker
+ */
+public class PulsarFederationWorker extends Thread {
+
+ public static final Logger LOG = LoggerFactory.getLogger(PulsarFederationWorker.class);
+
+ private final String workerName;
+ private final PulsarFederationSinkContext context;
+
+ private PulsarProducerFederation producerFederation;
+ private LifecycleState status;
+ private Map<String, String> dimensions;
+
+ /**
+ * Constructor
+ *
+ * @param sinkName
+ * @param workerIndex
+ * @param context
+ */
+ public PulsarFederationWorker(String sinkName, int workerIndex, PulsarFederationSinkContext context) {
+ super();
+ this.workerName = sinkName + "-worker-" + workerIndex;
+ this.context = context;
+ this.producerFederation = new PulsarProducerFederation(workerName, this.context);
+ this.status = LifecycleState.IDLE;
+ this.dimensions = new HashMap<>();
+ this.dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.context.getProxyClusterId());
+ this.dimensions.put(DataProxyMetricItem.KEY_SINK_ID, sinkName);
+ }
+
+ /**
+ * start
+ */
+ @Override
+ public void start() {
+ this.producerFederation.start();
+ this.status = LifecycleState.START;
+ super.start();
+ }
+
+ /**
+ *
+ * close
+ */
+ public void close() {
+ // close all producers
+ this.producerFederation.close();
+ this.status = LifecycleState.STOP;
+ }
+
+ /**
+ * run
+ */
+ @Override
+ public void run() {
+ LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
+ while (status != LifecycleState.STOP) {
+ try {
+ Event currentRecord = context.getBufferQueue().pollRecord();
+ if (currentRecord == null) {
+ Thread.sleep(context.getProcessInterval());
+ continue;
+ }
+ // fill topic
+ this.fillTopic(currentRecord);
+ // metric
+ DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
+ this.dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID,
+ currentRecord.getHeaders().get(Constants.TOPIC));
+ DataProxyMetricItem metricItem = this.context.getMetricItemSet().findMetricItem(dimensions);
+ metricItem.sendCount.incrementAndGet();
+ metricItem.sendSize.addAndGet(currentRecord.getBody().length);
+ // send
+ this.producerFederation.send(currentRecord);
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ this.sleepOneInterval();
+ }
+ }
+ }
+
+ /**
+ * fillTopic
+ *
+ * @param currentRecord
+ */
+ private void fillTopic(Event currentRecord) {
+ Map<String, String> headers = currentRecord.getHeaders();
+ String inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+ String inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+ String uid = IdTopicConfig.generateUid(inlongGroupId, inlongStreamId);
+ String topic = this.context.getIdTopicHolder().getTopic(uid);
+ if (!StringUtils.isBlank(topic)) {
+ headers.put(Constants.TOPIC, topic);
+ }
+ }
+
+ /**
+ * sleepOneInterval
+ */
+ private void sleepOneInterval() {
+ try {
+ Thread.sleep(context.getProcessInterval());
+ } catch (InterruptedException e1) {
+ LOG.error(e1.getMessage(), e1);
+ }
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
new file mode 100644
index 0000000..69a8ee1
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerCluster.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.shade.org.apache.commons.lang.math.NumberUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarProducerCluster
+ * <p>
+ * Wraps one Pulsar client for one cache cluster and lazily creates one producer per topic.
+ * Events are sent asynchronously; the completion callback releases the buffer capacity on success
+ * and puts the event back into the buffer queue on failure.
+ */
+public class PulsarProducerCluster implements LifecycleAware {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerCluster.class);
+
+    public static final String KEY_SERVICE_URL = "serviceUrl";
+    public static final String KEY_AUTHENTICATION = "authentication";
+
+    public static final String KEY_ENABLEBATCHING = "enableBatching";
+    public static final String KEY_BATCHINGMAXBYTES = "batchingMaxBytes";
+    public static final String KEY_BATCHINGMAXMESSAGES = "batchingMaxMessages";
+    public static final String KEY_BATCHINGMAXPUBLISHDELAY = "batchingMaxPublishDelay";
+    public static final String KEY_MAXPENDINGMESSAGES = "maxPendingMessages";
+    public static final String KEY_MAXPENDINGMESSAGESACROSSPARTITIONS = "maxPendingMessagesAcrossPartitions";
+    public static final String KEY_SENDTIMEOUT = "sendTimeout";
+    public static final String KEY_COMPRESSIONTYPE = "compressionType";
+    public static final String KEY_BLOCKIFQUEUEFULL = "blockIfQueueFull";
+    public static final String KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY = "roundRobinRouter"
+            + "BatchingPartitionSwitchFrequency";
+
+    private final String workerName;
+    private final CacheClusterConfig config;
+    private final PulsarFederationSinkContext sinkContext;
+    private final Context context;
+    private final String cacheClusterName;
+    private LifecycleState state;
+
+    /**
+     * pulsar client; stays null when start() failed before the client was built
+     */
+    private PulsarClient client;
+    private ProducerBuilder<byte[]> baseBuilder;
+
+    // topic -> producer, filled lazily by send()
+    private Map<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor
+     *
+     * @param workerName name of the owning worker, used as prefix of the producer names
+     * @param config     cache cluster configuration (cluster name, service url, authentication)
+     * @param context    shared sink context; its producer context supplies the producer settings
+     */
+    public PulsarProducerCluster(String workerName, CacheClusterConfig config, PulsarFederationSinkContext context) {
+        this.workerName = workerName;
+        this.config = config;
+        this.sinkContext = context;
+        this.context = context.getProducerContext();
+        this.state = LifecycleState.IDLE;
+        this.cacheClusterName = config.getClusterName();
+    }
+
+    /**
+     * Creates the Pulsar client and the producer builder template used for every topic.
+     * <p>
+     * Failures are logged and not rethrown; the client or builder may then stay null, which
+     * {@link #send(Event)} and {@link #stop()} tolerate.
+     */
+    @Override
+    public void start() {
+        this.state = LifecycleState.START;
+        // create pulsar client
+        try {
+            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            String authentication = config.getParams().get(KEY_AUTHENTICATION);
+            if (authentication == null || authentication.trim().isEmpty()) {
+                // no token configured: do not install a token authentication with a null token
+                this.client = PulsarClient.builder()
+                        .serviceUrl(serviceUrl)
+                        .build();
+            } else {
+                this.client = PulsarClient.builder()
+                        .serviceUrl(serviceUrl)
+                        .authentication(AuthenticationFactory.token(authentication))
+                        .build();
+            }
+            this.baseBuilder = client.newProducer();
+            this.baseBuilder
+                    .hashingScheme(HashingScheme.Murmur3_32Hash)
+                    .enableBatching(context.getBoolean(KEY_ENABLEBATCHING, true))
+                    .batchingMaxBytes(context.getInteger(KEY_BATCHINGMAXBYTES, 5242880))
+                    .batchingMaxMessages(context.getInteger(KEY_BATCHINGMAXMESSAGES, 3000))
+                    .batchingMaxPublishDelay(context.getInteger(KEY_BATCHINGMAXPUBLISHDELAY, 1),
+                            TimeUnit.MILLISECONDS);
+            this.baseBuilder.maxPendingMessages(context.getInteger(KEY_MAXPENDINGMESSAGES, 1000))
+                    .maxPendingMessagesAcrossPartitions(
+                            context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000))
+                    .sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0), TimeUnit.MILLISECONDS)
+                    .compressionType(this.getPulsarCompressionType())
+                    .blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL, true))
+                    .roundRobinRouterBatchingPartitionSwitchFrequency(
+                            context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10))
+                    .batcherBuilder(BatcherBuilder.DEFAULT);
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Maps the configured compression type name to the Pulsar enum.
+     *
+     * @return the matching CompressionType; {@code NONE} when the setting is missing or unknown
+     */
+    private CompressionType getPulsarCompressionType() {
+        String type = this.context.getString(KEY_COMPRESSIONTYPE);
+        if (type == null) {
+            // switching on a null string would throw and abort the builder setup in start()
+            return CompressionType.NONE;
+        }
+        switch (type) {
+            case "LZ4" :
+                return CompressionType.LZ4;
+            case "NONE" :
+                return CompressionType.NONE;
+            case "ZLIB" :
+                return CompressionType.ZLIB;
+            case "ZSTD" :
+                return CompressionType.ZSTD;
+            case "SNAPPY" :
+                return CompressionType.SNAPPY;
+            default :
+                return CompressionType.NONE;
+        }
+    }
+
+    /**
+     * Closes every producer and then the client; close failures are logged.
+     */
+    @Override
+    public void stop() {
+        this.state = LifecycleState.STOP;
+        // close producers first, then the client they belong to
+        for (Entry<String, Producer<byte[]>> entry : this.producerMap.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (PulsarClientException e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+        if (this.client == null) {
+            // start() failed before the client was created
+            return;
+        }
+        try {
+            this.client.close();
+        } catch (PulsarClientException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * getLifecycleState
+     *
+     * @return the current lifecycle state
+     */
+    @Override
+    public LifecycleState getLifecycleState() {
+        return state;
+    }
+
+    /**
+     * Sends the event asynchronously to the topic named in its topic header.
+     * <p>
+     * When the event has no topic or no producer can be created, the event's buffer capacity is
+     * released, a failure metric is recorded and {@code false} is returned. Otherwise the send is
+     * started and its completion callback either releases the capacity (success) or puts the
+     * event back into the buffer queue (failure).
+     *
+     * @param event event to send
+     * @return {@code true} when the asynchronous send was started, {@code false} otherwise
+     */
+    public boolean send(Event event) {
+        // send
+        Map<String, String> headers = event.getHeaders();
+        String topic = headers.get(Constants.TOPIC);
+        if (topic == null) {
+            // ConcurrentHashMap rejects null keys; treat a missing topic like a failed producer
+            LOG.error("send fail because the event has no topic header");
+            sinkContext.getBufferQueue().release(event.getBody().length);
+            this.addMetric(event, topic, false, 0);
+            return false;
+        }
+        // get producer
+        Producer<byte[]> producer = this.producerMap.get(topic);
+        if (producer == null) {
+            try {
+                LOG.info("try to new a object for topic " + topic);
+                producer = baseBuilder.clone().topic(topic)
+                        .producerName(workerName + "-" + cacheClusterName + "-" + topic)
+                        .create();
+                LOG.info("create new producer success:{}", producer.getProducerName());
+                Producer<byte[]> oldProducer = this.producerMap.putIfAbsent(topic, producer);
+                if (oldProducer != null) {
+                    // another producer was registered meanwhile: keep it and drop the new one
+                    producer.close();
+                    LOG.info("close producer success:{}", producer.getProducerName());
+                    producer = oldProducer;
+                }
+            } catch (Throwable ex) {
+                LOG.error("create new producer failed", ex);
+            }
+        }
+        // create producer failed
+        if (producer == null) {
+            sinkContext.getBufferQueue().release(event.getBody().length);
+            this.addMetric(event, topic, false, 0);
+            return false;
+        }
+        String messageKey = headers.get(Constants.MESSAGE_KEY);
+        if (messageKey == null) {
+            messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+        }
+        // sendAsync
+        long sendTime = System.currentTimeMillis();
+        CompletableFuture<MessageId> future = producer.newMessage().key(messageKey).properties(headers)
+                .value(event.getBody()).sendAsync();
+        // callback
+        future.whenCompleteAsync((msgId, ex) -> {
+            if (ex != null) {
+                LOG.error("Send fail:{}", ex.getMessage());
+                LOG.error(ex.getMessage(), ex);
+                // put the event back for another attempt; its capacity stays reserved
+                sinkContext.getBufferQueue().offer(event);
+                this.addMetric(event, topic, false, 0);
+            } else {
+                sinkContext.getBufferQueue().release(event.getBody().length);
+                this.addMetric(event, topic, true, sendTime);
+            }
+        });
+        return true;
+    }
+
+    /**
+     * Records the result of one send in the sink's metric item set.
+     *
+     * @param currentRecord event that was sent
+     * @param topic         topic of the event, used as sink data id dimension
+     * @param result        {@code true} for a successful send, {@code false} for a failure
+     * @param sendTime      time in milliseconds at which the send started; durations are only
+     *                      recorded when it is greater than 0
+     */
+    private void addMetric(Event currentRecord, String topic, boolean result, long sendTime) {
+        Map<String, String> dimensions = new HashMap<>();
+        dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, this.sinkContext.getProxyClusterId());
+        // metric
+        DataProxyMetricItem.fillInlongId(currentRecord, dimensions);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, this.cacheClusterName);
+        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, topic);
+        DataProxyMetricItem metricItem = this.sinkContext.getMetricItemSet().findMetricItem(dimensions);
+        if (result) {
+            metricItem.sendSuccessCount.incrementAndGet();
+            metricItem.sendSuccessSize.addAndGet(currentRecord.getBody().length);
+            if (sendTime > 0) {
+                Map<String, String> headers = currentRecord.getHeaders();
+                long currentTime = System.currentTimeMillis();
+                // a missing or unparsable message time falls back to the send time
+                long msgTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), sendTime);
+                // parse the header value (not the header name); fall back to the message time
+                long sourceTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_SOURCE_TIME), msgTime);
+                long sinkDuration = currentTime - sendTime;
+                long nodeDuration = currentTime - sourceTime;
+                long wholeDuration = currentTime - msgTime;
+                metricItem.sinkDuration.addAndGet(sinkDuration);
+                metricItem.nodeDuration.addAndGet(nodeDuration);
+                metricItem.wholeDuration.addAndGet(wholeDuration);
+            }
+        } else {
+            metricItem.sendFailCount.incrementAndGet();
+            metricItem.sendFailSize.addAndGet(currentRecord.getBody().length);
+        }
+    }
+
+    /**
+     * get cacheClusterName
+     *
+     * @return the cacheClusterName
+     */
+    public String getCacheClusterName() {
+        return cacheClusterName;
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
new file mode 100644
index 0000000..bc387ef
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/pulsar/federation/PulsarProducerFederation.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * PulsarProducerFederation
+ * <p>
+ * Keeps one {@link PulsarProducerCluster} per configured cache cluster, refreshes that list on a
+ * timer and distributes events over the clusters in round-robin order.
+ */
+public class PulsarProducerFederation {
+
+    public static final Logger LOG = LoggerFactory.getLogger(PulsarProducerFederation.class);
+
+    private final String workerName;
+    private final PulsarFederationSinkContext context;
+    private Timer reloadTimer;
+
+    // replaced as a whole by reload() on the timer thread and read by send(), hence volatile
+    private volatile List<PulsarProducerCluster> clusterList = new ArrayList<>();
+    // clusters dropped from the configuration; they are stopped by the next reload() or by close()
+    private List<PulsarProducerCluster> deletingClusterList = new ArrayList<>();
+
+    private AtomicInteger clusterIndex = new AtomicInteger(0);
+
+    /**
+     * Constructor
+     *
+     * @param workerName name of the owning worker, passed on to every producer cluster
+     * @param context    shared sink context
+     */
+    public PulsarProducerFederation(String workerName, PulsarFederationSinkContext context) {
+        this.workerName = workerName;
+        this.context = context;
+    }
+
+    /**
+     * Loads the cluster list once and schedules the periodic reload; failures are logged.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer and stops every cluster, including those still waiting in the
+     * deleting list. A cluster that fails to stop does not prevent the others from being stopped.
+     */
+    public synchronized void close() {
+        if (this.reloadTimer != null) {
+            // null when start() was never called or failed before the timer was created
+            this.reloadTimer.cancel();
+        }
+        for (PulsarProducerCluster cluster : this.clusterList) {
+            this.stopQuietly(cluster);
+        }
+        for (PulsarProducerCluster cluster : this.deletingClusterList) {
+            this.stopQuietly(cluster);
+        }
+        this.deletingClusterList.clear();
+    }
+
+    /**
+     * Schedules reload() on a daemon timer, first after one reload interval and then repeatedly
+     * with the same period.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + context.getReloadInterval()),
+                context.getReloadInterval());
+    }
+
+    /**
+     * Synchronises the cluster list with the current cache cluster configuration.
+     * <p>
+     * Clusters removed by the previous reload are stopped now, one reload later than their
+     * removal. New cluster names get a freshly started cluster, known names keep their cluster,
+     * and clusters whose name disappeared are moved to the deleting list. Failures are logged.
+     * Synchronised with {@link #close()} because both run on different threads and touch the
+     * same lists.
+     */
+    public synchronized void reload() {
+        try {
+            // stop deleted cluster
+            for (PulsarProducerCluster removed : deletingClusterList) {
+                this.stopQuietly(removed);
+            }
+            deletingClusterList.clear();
+            // update cluster list
+            List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList();
+            List<PulsarProducerCluster> currentList = this.clusterList;
+            List<PulsarProducerCluster> newClusterList = new ArrayList<>(configList.size());
+            // prepare
+            Set<String> newClusterNames = new HashSet<>();
+            for (CacheClusterConfig config : configList) {
+                newClusterNames.add(config.getClusterName());
+            }
+            Set<String> oldClusterNames = new HashSet<>();
+            for (PulsarProducerCluster cluster : currentList) {
+                oldClusterNames.add(cluster.getCacheClusterName());
+            }
+            // add
+            for (CacheClusterConfig config : configList) {
+                if (!oldClusterNames.contains(config.getClusterName())) {
+                    PulsarProducerCluster cluster = new PulsarProducerCluster(workerName, config, context);
+                    cluster.start();
+                    newClusterList.add(cluster);
+                }
+            }
+            // remove
+            for (PulsarProducerCluster cluster : currentList) {
+                if (newClusterNames.contains(cluster.getCacheClusterName())) {
+                    newClusterList.add(cluster);
+                } else {
+                    deletingClusterList.add(cluster);
+                }
+            }
+            this.clusterList = newClusterList;
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Sends the event through the next cluster in round-robin order.
+     *
+     * @param event event to send
+     * @return the result of the selected cluster's send
+     * @throws IllegalStateException when no cluster is available; the event has been put back into
+     *                               the buffer queue before the exception is thrown
+     */
+    public boolean send(Event event) {
+        List<PulsarProducerCluster> currentClusterList = this.clusterList;
+        int currentSize = currentClusterList.size();
+        if (currentSize == 0) {
+            // a modulo by zero would lose the event and leak its reserved buffer capacity;
+            // queue it again instead, as the failed-send callback of the cluster does
+            this.context.getBufferQueue().offer(event);
+            throw new IllegalStateException("No pulsar cluster available for worker:" + workerName);
+        }
+        int currentIndex = clusterIndex.getAndIncrement();
+        if (currentIndex > Integer.MAX_VALUE / 2) {
+            // reset long before the counter could overflow to a negative index
+            clusterIndex.set(0);
+        }
+        int realIndex = currentIndex % currentSize;
+        PulsarProducerCluster clusterProducer = currentClusterList.get(realIndex);
+        return clusterProducer.send(event);
+    }
+
+    /**
+     * Stops one cluster and logs, instead of propagating, whatever it throws.
+     *
+     * @param cluster cluster to stop
+     */
+    private void stopQuietly(PulsarProducerCluster cluster) {
+        try {
+            cluster.stop();
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
index da1d627..eb58398 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageFactory.java
@@ -21,6 +21,7 @@ import java.lang.reflect.Constructor;
import java.util.concurrent.TimeUnit;
import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -40,6 +41,7 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
private static final Logger LOG = LoggerFactory.getLogger(ServerMessageFactory.class);
private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+ private AbstractSource source;
private ChannelProcessor processor;
private ChannelGroup allChannels;
private ExecutionHandler executionHandler;
@@ -71,12 +73,13 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
* @param isCompressed
* @param name
*/
- public ServerMessageFactory(ChannelProcessor processor,
+ public ServerMessageFactory(AbstractSource source,
ChannelGroup allChannels, String protocol, ServiceDecoder serProcessor,
String messageHandlerName, Integer maxMsgLength,
String topic, String attr, Boolean filterEmptyMsg, Integer maxCons,
Boolean isCompressed, String name) {
- this.processor = processor;
+ this.source = source;
+ this.processor = source.getChannelProcessor();
this.allChannels = allChannels;
this.topic = topic;
this.attr = attr;
@@ -124,12 +127,12 @@ public class ServerMessageFactory implements ChannelPipelineFactory {
.forName(messageHandlerName);
Constructor<?> ctor = clazz.getConstructor(
- ChannelProcessor.class, ServiceDecoder.class, ChannelGroup.class,
+ AbstractSource.class, ServiceDecoder.class, ChannelGroup.class,
String.class, String.class, Boolean.class, Integer.class,
Integer.class, Boolean.class, String.class);
SimpleChannelHandler messageHandler = (SimpleChannelHandler) ctor
- .newInstance(processor, serviceProcessor, allChannels, topic, attr,
+ .newInstance(source, serviceProcessor, allChannels, topic, attr,
filterEmptyMsg, maxMsgLength, maxConnections, isCompressed, protocolType
);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index eadf2bc..e6eee0e 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,13 +18,10 @@
package org.apache.inlong.dataproxy.source;
import static org.apache.inlong.dataproxy.consts.AttributeConstants.SEPARATOR;
-import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_DATA;
+import static org.apache.inlong.dataproxy.consts.ConfigConstants.SLA_METRIC_GROUPID;
import static org.apache.inlong.dataproxy.source.SimpleTcpSource.blacklist;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -34,13 +31,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.lang.StringUtils;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
import org.apache.inlong.commons.msg.TDMsg1;
import org.apache.inlong.dataproxy.base.ProxyMessage;
import org.apache.inlong.dataproxy.config.ConfigManager;
@@ -48,6 +46,8 @@ import org.apache.inlong.dataproxy.consts.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.exception.ErrorCode;
import org.apache.inlong.dataproxy.exception.MessageIDException;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -60,6 +60,9 @@ import org.jboss.netty.channel.group.ChannelGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+
/**
* Server message handler
*
@@ -91,7 +94,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
return new SimpleDateFormat("yyyyMMddHHmmss");
}
};
-
+ private AbstractSource source;
private final ChannelGroup allChannels;
private int maxConnections = Integer.MAX_VALUE;
private boolean filterEmptyMsg = false;
@@ -102,14 +105,16 @@ public class ServerMessageHandler extends SimpleChannelHandler {
private String defaultMXAttr = "m=3";
private final ChannelBuffer heartbeatBuffer;
private final String protocolType;
+ //
+ private final DataProxyMetricItemSet metricItemSet;
- public ServerMessageHandler(ChannelProcessor processor, ServiceDecoder serProcessor,
+ public ServerMessageHandler(AbstractSource source, ServiceDecoder serProcessor,
ChannelGroup allChannels,
String topic, String attr, Boolean filterEmptyMsg, Integer maxMsgLength,
Integer maxCons,
Boolean isCompressed, String protocolType) {
-
- this.processor = processor;
+ this.source = source;
+ this.processor = source.getChannelProcessor();
this.serviceProcessor = serProcessor;
this.allChannels = allChannels;
this.defaultTopic = topic;
@@ -122,6 +127,11 @@ public class ServerMessageHandler extends SimpleChannelHandler {
this.heartbeatBuffer = ChannelBuffers.wrappedBuffer(new byte[]{0, 0, 0, 1, 1});
this.maxConnections = maxCons;
this.protocolType = protocolType;
+ if (source instanceof SimpleTcpSource) {
+ this.metricItemSet = ((SimpleTcpSource) source).getMetricItemSet();
+ } else {
+ this.metricItemSet = new DataProxyMetricItemSet(this.toString());
+ }
}
private String getRemoteIp(Channel channel) {
@@ -420,9 +430,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
dtten = dtten * 1000 * 60 * 10;
try {
processor.processEvent(event);
+ this.addMetric(true, data.length);
} catch (Throwable ex) {
logger.error("Error writting to channel,data will discard.", ex);
-
+ this.addMetric(false, data.length);
throw new ChannelException("ProcessEvent error can't write event to channel.");
}
}
@@ -524,6 +535,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
logger.info("message received");
if (e == null) {
logger.error("get null messageevent, just skip");
+ this.addMetric(false, 0);
return;
}
ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
@@ -533,6 +545,7 @@ public class ServerMessageHandler extends SimpleChannelHandler {
if (len == 0 && this.filterEmptyMsg) {
logger.warn("skip empty msg.");
cb.clear();
+ this.addMetric(false, 0);
return;
}
@@ -541,23 +554,27 @@ public class ServerMessageHandler extends SimpleChannelHandler {
try {
resultMap = serviceProcessor.extractData(cb, remoteChannel);
} catch (MessageIDException ex) {
+ this.addMetric(false, 0);
throw new IOException(ex.getCause());
}
if (resultMap == null) {
logger.info("result is null");
+ this.addMetric(false, 0);
return;
}
MsgType msgType = (MsgType) resultMap.get(ConfigConstants.MSG_TYPE);
if (MsgType.MSG_HEARTBEAT.equals(msgType)) {
remoteChannel.write(heartbeatBuffer, remoteSocketAddress);
+ this.addMetric(false, 0);
return;
}
if (MsgType.MSG_BIN_HEARTBEAT.equals(msgType)) {
// ChannelBuffer binBuffer = getBinHeart(resultMap,msgType);
// remoteChannel.write(binBuffer, remoteSocketAddress);
+ this.addMetric(false, 0);
return;
}
@@ -591,9 +608,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
+ this.addMetric(true, body.length);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will discard.", ex);
-
+ this.addMetric(false, body.length);
throw new ChannelException(
"Process Controller Event error can't write event to channel.");
}
@@ -611,9 +629,10 @@ public class ServerMessageHandler extends SimpleChannelHandler {
Event event = EventBuilder.withBody(body, headers);
try {
processor.processEvent(event);
+ this.addMetric(true, body.length);
} catch (Throwable ex) {
logger.error("Error writing to controller,data will discard.", ex);
-
+ this.addMetric(false, body.length);
throw new ChannelException(
"Process Controller Event error can't write event to channel.");
}
@@ -631,4 +650,29 @@ public class ServerMessageHandler extends SimpleChannelHandler {
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
logger.error("channel closed {}", ctx.getChannel());
}
+
+ /**
+ * Adds one message to the read-success or read-failure metrics of this source.
+ *
+ * The metric item is looked up with cluster id "DataProxy", the source name as
+ * source id and source data id, and empty inlong group and stream ids.
+ * @param result true updates readSuccessCount/readSuccessSize, false updates readFailCount/readFailSize
+ * @param size amount added to the selected size counter; callers pass a payload length or 0
+ */
+ private void addMetric(boolean result, long size) {
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put(DataProxyMetricItem.KEY_CLUSTER_ID, "DataProxy");
+ dimensions.put(DataProxyMetricItem.KEY_SOURCE_ID, source.getName());
+ dimensions.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, source.getName());
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_GROUP_ID, "");
+ dimensions.put(DataProxyMetricItem.KEY_INLONG_STREAM_ID, "");
+ DataProxyMetricItem metricItem = this.metricItemSet.findMetricItem(dimensions);
+ if (result) {
+ metricItem.readSuccessCount.incrementAndGet();
+ metricItem.readSuccessSize.addAndGet(size);
+ } else {
+ metricItem.readFailCount.incrementAndGet();
+ metricItem.readFailSize.addAndGet(size);
+ }
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
index 59df0a6..af7c8da 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleTcpSource.java
@@ -17,8 +17,6 @@
package org.apache.inlong.dataproxy.source;
-import com.google.common.base.Preconditions;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
@@ -37,12 +35,13 @@ import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
-import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
+import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
@@ -55,6 +54,8 @@ import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* Simple tcp source
*
@@ -92,6 +93,8 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
protected boolean filterEmptyMsg;
private Channel nettyChannel = null;
+ //
+ private DataProxyMetricItemSet metricItemSet;
public SimpleTcpSource() {
super();
@@ -190,6 +193,8 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
@Override
public synchronized void start() {
logger.info("start " + this.getName());
+ this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ MetricRegister.register(metricItemSet);
checkBlackListThread = new CheckBlackListThread();
checkBlackListThread.start();
super.start();
@@ -222,13 +227,13 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
(Class<? extends ChannelPipelineFactory>) Class.forName(msgFactoryName);
Constructor ctor =
- clazz.getConstructor(ChannelProcessor.class, ChannelGroup.class,
+ clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
String.class, ServiceDecoder.class, String.class,
Integer.class, String.class, String.class, Boolean.class,
Integer.class, Boolean.class, String.class);
- logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
- fac = (ChannelPipelineFactory) ctor.newInstance(getChannelProcessor(), allChannels,
+ logger.info("Using channel processor:{}", this.getClass().getName());
+ fac = (ChannelPipelineFactory) ctor.newInstance(this, allChannels,
"tcp", serviceDecoder, messageHandlerName,
maxMsgLength, topic, attr, filterEmptyMsg, maxConnections, isCompressed, this.getName());
@@ -367,4 +372,12 @@ public class SimpleTcpSource extends AbstractSource implements Configurable, Eve
"maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true);
}
+
+ /**
+ * Returns the metric item set that start() creates and registers for this source.
+ * @return the metricItemSet field (NOTE(review): appears unset until start() has run; confirm)
+ */
+ public DataProxyMetricItemSet getMetricItemSet() {
+ return metricItemSet;
+ }
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
new file mode 100644
index 0000000..34bb46d
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Unbounded record queue with size-token accounting; offer and pollRecord do not touch the tokens.
+ */
+public class BufferQueue<A> {
+
+ private final LinkedBlockingQueue<A> queue;
+ private final SizeSemaphore currentTokens;
+ private SizeSemaphore globalTokens;
+ private final AtomicLong offerCount = new AtomicLong(0);
+ private final AtomicLong pollCount = new AtomicLong(0);
+
+ /**
+ * Creates a queue that is limited only by its own token pool.
+ *
+ * @param maxSizeKb permit count of the per-queue SizeSemaphore, created with unit SizeSemaphore.ONEKB
+ */
+ public BufferQueue(int maxSizeKb) {
+ this.queue = new LinkedBlockingQueue<>();
+ this.currentTokens = new SizeSemaphore(maxSizeKb, SizeSemaphore.ONEKB);
+ }
+
+ /**
+ * Creates a queue that also draws from a token pool supplied by the caller.
+ *
+ * @param maxSizeKb permit count of the per-queue SizeSemaphore
+ * @param globalTokens additional SizeSemaphore used by tryAcquire/acquire/release; null disables it
+ */
+ public BufferQueue(int maxSizeKb, SizeSemaphore globalTokens) {
+ this(maxSizeKb);
+ this.globalTokens = globalTokens;
+ }
+
+ /**
+ * Removes and returns the queue head, or null when empty; pollCount is incremented either way.
+ */
+ public A pollRecord() {
+ A record = queue.poll();
+ this.pollCount.getAndIncrement();
+ return record;
+ }
+
+ /**
+ * Appends a record and increments offerCount; a null record is ignored and not counted.
+ */
+ public void offer(A record) {
+ if (record == null) {
+ return;
+ }
+ queue.offer(record);
+ this.offerCount.incrementAndGet();
+ }
+
+ /**
+ * Returns the number of records currently in the queue.
+ */
+ public int size() {
+ return queue.size();
+ }
+
+ /**
+ * Returns currentTokens.leftSemaphore(), the leftover ("small change") counter of the per-queue pool.
+ */
+ public int leftKb() {
+ return currentTokens.leftSemaphore();
+ }
+
+ /**
+ * Returns the permits currently available in the per-queue pool.
+ */
+ public int availablePermits() {
+ return currentTokens.availablePermits();
+ }
+
+ /**
+ * Returns the maxSizeKb value this queue was constructed with.
+ */
+ public int maxSizeKb() {
+ return currentTokens.maxSize();
+ }
+
+ /**
+ * Returns the available permits of the per-queue pool as a percentage of its maximum.
+ */
+ public double getIdleRate() {
+ double remaining = currentTokens.availablePermits();
+ return remaining * 100.0 / currentTokens.maxSize();
+ }
+
+ /**
+ * Non-blocking acquire from the per-queue pool, then the global pool if set; the former is rolled back when the latter fails.
+ */
+ public boolean tryAcquire(long sizeInByte) {
+ boolean cidResult = currentTokens.tryAcquire(sizeInByte);
+ if (!cidResult) {
+ return false;
+ }
+ if (this.globalTokens == null) {
+ return true;
+ }
+ boolean globalResult = this.globalTokens.tryAcquire(sizeInByte);
+ if (globalResult) {
+ return true;
+ }
+ currentTokens.release(sizeInByte);
+ return false;
+ }
+
+ /**
+ * Blocking acquire from the per-queue pool and then from the global pool if set.
+ */
+ public void acquire(long sizeInByte) {
+ currentTokens.acquire(sizeInByte);
+ if (this.globalTokens != null) {
+ globalTokens.acquire(sizeInByte);
+ }
+ }
+
+ /**
+ * Returns sizeInByte to the global pool (if set) and then to the per-queue pool.
+ */
+ public void release(long sizeInByte) {
+ if (this.globalTokens != null) {
+ this.globalTokens.release(sizeInByte);
+ }
+ this.currentTokens.release(sizeInByte);
+ }
+
+ /**
+ * Reads the offer counter and resets it to zero in one atomic step.
+ *
+ * @return number of offers counted since the previous call
+ */
+ public long getOfferCount() {
+ return offerCount.getAndSet(0);
+ }
+
+ /**
+ * Reads the poll counter and resets it to zero in one atomic step.
+ *
+ * @return number of pollRecord calls counted since the previous call
+ */
+ public long getPollCount() {
+ return pollCount.getAndSet(0);
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java
new file mode 100644
index 0000000..18a8677
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/Constants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+/**
+ *
+ * String name constants (inlong group/stream id, topic, header keys, message key) exposed as interface fields.
+ */
+public interface Constants {
+
+ String INLONG_GROUP_ID = "inlongGroupId";
+ String INLONG_STREAM_ID = "inlongStreamId";
+ String TOPIC = "topic";
+ String HEADER_KEY_MSG_TIME = "msgTime";
+ String HEADER_KEY_SOURCE_IP = "sourceIp";
+ String HEADER_KEY_SOURCE_TIME = "sourceTime";
+ String MESSAGE_KEY = "messageKey";
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
new file mode 100644
index 0000000..31803a6
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/SizeSemaphore.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Byte-size limiter: a fair Semaphore of maxSize permits worth leftSize bytes each, plus a leftover-byte counter.
+ */
+public class SizeSemaphore {
+
+ public static final int ONEKB = 1024;
+
+ private int maxSize = 0;
+ private int leftSize = 0;
+ private Semaphore sizeSemaphore = null;
+ private AtomicInteger leftSemaphore = new AtomicInteger(0);
+
+ /**
+ * Creates the limiter with all permits available and a leftover counter of zero.
+ *
+ * @param maxSize number of permits in the underlying Semaphore
+ * @param leftSize bytes represented by one permit; used as divisor in tryAcquire/acquire/release, so it must not be 0
+ */
+ public SizeSemaphore(int maxSize, int leftSize) {
+ this.maxSize = maxSize;
+ this.leftSize = leftSize;
+ this.sizeSemaphore = new Semaphore(maxSize, true);
+ }
+
+ /**
+ * Returns the leftover-byte counter: bytes already covered by acquired permits but not yet consumed.
+ */
+ public int leftSemaphore() {
+ return leftSemaphore.get();
+ }
+
+ /**
+ * Returns the permits currently available in the underlying Semaphore.
+ */
+ public int availablePermits() {
+ return sizeSemaphore.availablePermits();
+ }
+
+ /**
+ * Returns the permit count given to the constructor.
+ */
+ public int maxSize() {
+ return maxSize;
+ }
+
+ /**
+ * Returns the available permits as a percentage of maxSize.
+ */
+ public double getIdleRate() {
+ double remaining = sizeSemaphore.availablePermits();
+ return remaining * 100.0 / maxSize;
+ }
+
+ /**
+ * Tries to take sizeInByte without blocking; NOTE(review): the leftSemaphore check and update are separate steps, not one atomic operation.
+ */
+ public boolean tryAcquire(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() - sizeChange < 0) {
+ boolean result = sizeSemaphore.tryAcquire(sizeInKb + 1);
+ if (result) {
+ leftSemaphore.addAndGet(-sizeChange + leftSize);
+ }
+ return result;
+ } else {
+ boolean result = sizeSemaphore.tryAcquire(sizeInKb);
+ if (result) {
+ leftSemaphore.addAndGet(-sizeChange);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Takes sizeInByte with the same accounting as tryAcquire, but waits uninterruptibly until the permits are available.
+ */
+ public void acquire(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() - sizeChange < 0) {
+ sizeSemaphore.acquireUninterruptibly(sizeInKb + 1);
+ leftSemaphore.addAndGet(-sizeChange + leftSize);
+ } else {
+ sizeSemaphore.acquireUninterruptibly(sizeInKb);
+ leftSemaphore.addAndGet(-sizeChange);
+ }
+ }
+
+ /**
+ * Gives back sizeInByte: whole permits go to the Semaphore, the remainder to the leftover counter (one extra permit if it would exceed leftSize).
+ */
+ public void release(long sizeInByte) {
+ int sizeInKb = (int) (sizeInByte / leftSize);
+ int sizeChange = (int) (sizeInByte % leftSize);
+ if (leftSemaphore.get() + sizeChange > leftSize) {
+ sizeSemaphore.release(sizeInKb + 1);
+ leftSemaphore.addAndGet(sizeChange - leftSize);
+ } else {
+ sizeSemaphore.release(sizeInKb);
+ leftSemaphore.addAndGet(sizeChange);
+ }
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
new file mode 100644
index 0000000..63656ae
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestClassResourceCommonPropertiesLoader.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+
+import org.apache.inlong.dataproxy.config.RemoteConfigManager;
+import org.apache.inlong.dataproxy.metrics.MetricListener;
+import org.junit.Test;
+
+/**
+ *
+ * Checks that ClassResourceCommonPropertiesLoader.load() returns the expected property values.
+ */
+public class TestClassResourceCommonPropertiesLoader {
+
+ /**
+ * Loads the properties and asserts the proxy cluster name and the metric domains entries.
+ *
+ * @throws Exception declared by the test signature
+ */
+ @Test
+ public void testResult() throws Exception {
+ // load the properties and check two expected entries
+ ClassResourceCommonPropertiesLoader loader = new ClassResourceCommonPropertiesLoader();
+ Map<String, String> props = loader.load();
+ assertEquals("proxy_inlong5th_sz", props.get(RemoteConfigManager.KEY_PROXY_CLUSTER_NAME));
+ assertEquals("DataProxy", props.get(MetricListener.KEY_METRIC_DOMAINS));
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
new file mode 100644
index 0000000..44b38cf
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextCacheClusterConfigLoader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Checks that ContextCacheClusterConfigLoader reads the cache cluster config from a sink context.
+ */
+public class TestContextCacheClusterConfigLoader {
+
+ public static final Logger LOG = LoggerFactory.getLogger(TestContextCacheClusterConfigLoader.class);
+ private static Context context;
+ private static Context sinkContext;
+
+ /**
+ * Loads flume.conf from the classpath and builds sinkContext from the pulsar-sink-more1 sub-properties; failures are only logged.
+ */
+ @BeforeClass
+ public static void setup() {
+ Map<String, String> result = new ConcurrentHashMap<>();
+ try (InputStream inStream = TestContextCacheClusterConfigLoader.class.getClassLoader().getResource("flume.conf")
+ .openStream()) {
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ context = new Context(result);
+ sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+ }
+ }
+
+ /**
+ * Configures the loader with sinkContext and asserts one cluster with the expected name, serviceUrl and authentication.
+ *
+ * @throws Exception declared by the test signature
+ */
+ @Test
+ public void testResult() throws Exception {
+ ContextCacheClusterConfigLoader loader = new ContextCacheClusterConfigLoader();
+ loader.configure(sinkContext);
+ List<CacheClusterConfig> configList = loader.load();
+ assertEquals(1, configList.size());
+ CacheClusterConfig config = configList.get(0);
+ assertEquals("cache_inlong5th_sz1", config.getClusterName());
+ assertEquals("http://tdmq.ap-gz.tencentyun.com:8080", config.getParams().get("serviceUrl"));
+ assertEquals("xxxx", config.getParams().get("authentication"));
+ }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
new file mode 100644
index 0000000..a1eae40
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/loader/TestContextIdTopicConfigLoader.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.config.loader;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * TestContextIdTopicConfigLoader: checks that ContextIdTopicConfigLoader reads id-topic configs from a sink context.
+ */
+public class TestContextIdTopicConfigLoader {
+
+ public static final Logger LOG = LoggerFactory.getLogger(TestContextIdTopicConfigLoader.class);
+ private static Context context;
+ private static Context sinkContext;
+
+ /**
+ * Loads flume.conf from the classpath and builds sinkContext from the pulsar-sink-more1 sub-properties; failures are only logged.
+ */
+ @BeforeClass
+ public static void setup() {
+ Map<String, String> result = new ConcurrentHashMap<>();
+ try (InputStream inStream = TestContextIdTopicConfigLoader.class.getClassLoader().getResource("flume.conf")
+ .openStream()) {
+ Properties props = new Properties();
+ props.load(inStream);
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ result.put((String) entry.getKey(), (String) entry.getValue());
+ }
+ context = new Context(result);
+ sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+ } catch (UnsupportedEncodingException e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+ } catch (Exception e) {
+ LOG.error("fail to load properties, file ={}, and e= {}", "flume.conf", e);
+ }
+ }
+
+ /**
+ * Configures the loader with sinkContext, asserts two configs, and checks the topic name for group id 03a00000026.
+ *
+ * @throws Exception declared by the test signature
+ */
+ @Test
+ public void testResult() throws Exception {
+ ContextIdTopicConfigLoader loader = new ContextIdTopicConfigLoader();
+ loader.configure(sinkContext);
+ List<IdTopicConfig> configList = loader.load();
+ assertEquals(2, configList.size());
+
+ for (IdTopicConfig config : configList) {
+ if ("03a00000026".equals(config.getInlongGroupId())) {
+ assertEquals("pulsar-9xn9wp35pbxb/test/topic1", config.getTopicName());
+ }
+ }
+ }
+}
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
similarity index 92%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
copy to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
index 4e70d63..2219eea 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestDataProxyMetricItemSet.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
import static org.junit.Assert.assertEquals;
@@ -39,9 +39,9 @@ import org.junit.Test;
*
* TestMetricItemSetMBean
*/
-public class TestMetricItemSetMBean {
+public class TestDataProxyMetricItemSet {
- public static final String SET_ID = "inlong5th_sz";
+ public static final String CLUSTER_ID = "inlong5th_sz";
public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
public static final String CONTAINER_IP = "127.0.0.1";
private static final String SOURCE_ID = "agent-source";
@@ -60,13 +60,11 @@ public class TestMetricItemSetMBean {
*/
@BeforeClass
public static void setup() {
- itemSet = DataProxyMetricItemSet.getInstance();
+ itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
MetricRegister.register(itemSet);
// prepare
DataProxyMetricItem itemSource = new DataProxyMetricItem();
- itemSource.setId = SET_ID;
- itemSource.containerName = CONTAINER_NAME;
- itemSource.containerIp = CONTAINER_IP;
+ itemSource.clusterId = CLUSTER_ID;
itemSource.sourceId = SOURCE_ID;
itemSource.sourceDataId = SOURCE_DATA_ID;
itemSource.inlongGroupId = INLONG_GROUP_ID1;
@@ -74,9 +72,7 @@ public class TestMetricItemSetMBean {
dimSource = itemSource.getDimensions();
//
DataProxyMetricItem itemSink = new DataProxyMetricItem();
- itemSink.setId = SET_ID;
- itemSink.containerName = CONTAINER_NAME;
- itemSink.containerIp = CONTAINER_IP;
+ itemSink.clusterId = CLUSTER_ID;
itemSink.sinkId = SINK_ID;
itemSink.sinkDataId = SINK_DATA_ID;
itemSink.inlongGroupId = INLONG_GROUP_ID1;
@@ -123,8 +119,8 @@ public class TestMetricItemSetMBean {
final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
StringBuilder beanName = new StringBuilder();
beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
- .append("type=")
- .append(DataProxyMetricItemSet.class.toString());
+ .append("name=")
+ .append(itemSet.getName());
String strBeanName = beanName.toString();
ObjectName objName = new ObjectName(strBeanName);
{
diff --git a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
similarity index 53%
copy from inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
copy to inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
index 4e70d63..0576200 100644
--- a/inlong-common/src/test/java/org/apache/inlong/commons/config/metrics/set/TestMetricItemSetMBean.java
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/metrics/TestMetricListenerRunnable.java
@@ -15,20 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.commons.config.metrics.set;
+package org.apache.inlong.dataproxy.metrics;
import static org.junit.Assert.assertEquals;
-import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
-import org.apache.inlong.commons.config.metrics.MetricItem;
-import org.apache.inlong.commons.config.metrics.MetricItemMBean;
-import org.apache.inlong.commons.config.metrics.MetricItemSetMBean;
import org.apache.inlong.commons.config.metrics.MetricRegister;
import org.apache.inlong.commons.config.metrics.MetricUtils;
import org.apache.inlong.commons.config.metrics.MetricValue;
@@ -39,9 +33,9 @@ import org.junit.Test;
*
* TestMetricItemSetMBean
*/
-public class TestMetricItemSetMBean {
+public class TestMetricListenerRunnable {
- public static final String SET_ID = "inlong5th_sz";
+ public static final String CLUSTER_ID = "inlong5th_sz";
public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
public static final String CONTAINER_IP = "127.0.0.1";
private static final String SOURCE_ID = "agent-source";
@@ -54,19 +48,21 @@ public class TestMetricItemSetMBean {
private static DataProxyMetricItemSet itemSet;
private static Map<String, String> dimSource;
private static Map<String, String> dimSink;
+ private static String keySource1;
+ private static String keySource2;
+ private static String keySink1;
+ private static String keySink2;
/**
* setup
*/
@BeforeClass
public static void setup() {
- itemSet = DataProxyMetricItemSet.getInstance();
+ itemSet = new DataProxyMetricItemSet(CLUSTER_ID);
MetricRegister.register(itemSet);
// prepare
DataProxyMetricItem itemSource = new DataProxyMetricItem();
- itemSource.setId = SET_ID;
- itemSource.containerName = CONTAINER_NAME;
- itemSource.containerIp = CONTAINER_IP;
+ itemSource.clusterId = CLUSTER_ID;
itemSource.sourceId = SOURCE_ID;
itemSource.sourceDataId = SOURCE_DATA_ID;
itemSource.inlongGroupId = INLONG_GROUP_ID1;
@@ -74,9 +70,7 @@ public class TestMetricItemSetMBean {
dimSource = itemSource.getDimensions();
//
DataProxyMetricItem itemSink = new DataProxyMetricItem();
- itemSink.setId = SET_ID;
- itemSink.containerName = CONTAINER_NAME;
- itemSink.containerIp = CONTAINER_IP;
+ itemSink.clusterId = CLUSTER_ID;
itemSink.sinkId = SINK_ID;
itemSink.sinkDataId = SINK_DATA_ID;
itemSink.inlongGroupId = INLONG_GROUP_ID1;
@@ -89,7 +83,6 @@ public class TestMetricItemSetMBean {
*
* @throws Exception
*/
- @SuppressWarnings("unchecked")
@Test
public void testResult() throws Exception {
// increase source
@@ -97,20 +90,20 @@ public class TestMetricItemSetMBean {
item = itemSet.findMetricItem(dimSource);
item.readSuccessCount.incrementAndGet();
item.readSuccessSize.addAndGet(100);
- String keySource1 = MetricUtils.getDimensionsKey(dimSource);
+ keySource1 = MetricUtils.getDimensionsKey(dimSource);
//
dimSource.put("inlongGroupId", INLONG_GROUP_ID2);
item = itemSet.findMetricItem(dimSource);
item.readFailCount.addAndGet(20);
item.readFailSize.addAndGet(2000);
- String keySource2 = MetricUtils.getDimensionsKey(dimSource);
+ keySource2 = MetricUtils.getDimensionsKey(dimSource);
// increase sink
item = itemSet.findMetricItem(dimSink);
item.sendCount.incrementAndGet();
item.sendSize.addAndGet(100);
item.sendSuccessCount.incrementAndGet();
item.sendSuccessSize.addAndGet(100);
- String keySink1 = MetricUtils.getDimensionsKey(dimSink);
+ keySink1 = MetricUtils.getDimensionsKey(dimSink);
//
dimSink.put("inlongGroupId", INLONG_GROUP_ID2);
item = itemSet.findMetricItem(dimSink);
@@ -118,43 +111,41 @@ public class TestMetricItemSetMBean {
item.sendSize.addAndGet(2000);
item.sendFailCount.addAndGet(20);
item.sendFailSize.addAndGet(2000);
- String keySink2 = MetricUtils.getDimensionsKey(dimSink);
+ keySink2 = MetricUtils.getDimensionsKey(dimSink);
// report
- final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
- StringBuilder beanName = new StringBuilder();
- beanName.append(MetricUtils.getDomain(DataProxyMetricItemSet.class)).append(MetricItemMBean.DOMAIN_SEPARATOR)
- .append("type=")
- .append(DataProxyMetricItemSet.class.toString());
- String strBeanName = beanName.toString();
- ObjectName objName = new ObjectName(strBeanName);
- {
- List<MetricItem> items = (List<MetricItem>) mbs.invoke(objName, MetricItemSetMBean.METHOD_SNAPSHOT, null,
- null);
- for (MetricItem itemObj : items) {
- if (keySource1.equals(itemObj.getDimensionsKey())) {
- Map<String, MetricValue> metricMap = itemObj.snapshot();
- assertEquals(1, metricMap.get("readSuccessCount").value);
- assertEquals(100, metricMap.get("readSuccessSize").value);
- } else if (keySource2.equals(itemObj.getDimensionsKey())) {
- Map<String, MetricValue> metricMap = itemObj.snapshot();
- assertEquals(20, metricMap.get("readFailCount").value);
- assertEquals(2000, metricMap.get("readFailSize").value);
- } else if (keySink1.equals(itemObj.getDimensionsKey())) {
- Map<String, MetricValue> metricMap = itemObj.snapshot();
- assertEquals(1, metricMap.get("sendCount").value);
- assertEquals(100, metricMap.get("sendSize").value);
- assertEquals(1, metricMap.get("sendSuccessCount").value);
- assertEquals(100, metricMap.get("sendSuccessSize").value);
- } else if (keySink2.equals(itemObj.getDimensionsKey())) {
- Map<String, MetricValue> metricMap = itemObj.snapshot();
- assertEquals(20, metricMap.get("sendCount").value);
- assertEquals(2000, metricMap.get("sendSize").value);
- assertEquals(20, metricMap.get("sendFailCount").value);
- assertEquals(2000, metricMap.get("sendFailSize").value);
- } else {
- System.out.println("bad MetricItem:" + itemObj.getDimensionsKey());
+ MetricListener listener = new MetricListener() {
+
+ @Override
+ public void snapshot(String domain, List<MetricItemValue> itemValues) {
+ assertEquals("DataProxy", domain);
+ for (MetricItemValue itemValue : itemValues) {
+ String key = itemValue.getKey();
+ Map<String, MetricValue> metricMap = itemValue.getMetrics();
+ if (keySource1.equals(itemValue.getKey())) {
+ assertEquals(1, metricMap.get("readSuccessCount").value);
+ assertEquals(100, metricMap.get("readSuccessSize").value);
+ } else if (keySource2.equals(key)) {
+ assertEquals(20, metricMap.get("readFailCount").value);
+ assertEquals(2000, metricMap.get("readFailSize").value);
+ } else if (keySink1.equals(key)) {
+ assertEquals(1, metricMap.get("sendCount").value);
+ assertEquals(100, metricMap.get("sendSize").value);
+ assertEquals(1, metricMap.get("sendSuccessCount").value);
+ assertEquals(100, metricMap.get("sendSuccessSize").value);
+ } else if (keySink2.equals(key)) {
+ assertEquals(20, metricMap.get("sendCount").value);
+ assertEquals(2000, metricMap.get("sendSize").value);
+ assertEquals(20, metricMap.get("sendFailCount").value);
+ assertEquals(2000, metricMap.get("sendFailSize").value);
+ } else {
+ System.out.println("bad MetricItem:" + key);
+ }
}
}
- }
+ };
+ List<MetricListener> listeners = new ArrayList<>();
+ listeners.add(listener);
+ MetricListenerRunnable runnable = new MetricListenerRunnable("DataProxy", listeners);
+ runnable.run();
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
new file mode 100644
index 0000000..847f218
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarFederationSink.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.utils.MockUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * TestPulsarFederationSink
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+public class TestPulsarFederationSink {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestPulsarFederationSink.class);
+    public static Context context;
+    public static Context sinkContext;
+    public static PulsarFederationSink sinkObj;
+
+    /**
+     * Loads flume.conf from the classpath and configures the sink under test; fails fast if loading fails.
+     */
+    @BeforeClass
+    public static void setUp() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestPulsarFederationSink.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalStateException("unsupported encoding while loading flume.conf", e);
+        } catch (Exception e) {
+            throw new IllegalStateException("fail to load properties, file=flume.conf", e);
+        }
+        // reached only when flume.conf was loaded, so sinkContext is never null here
+        sinkObj = new PulsarFederationSink();
+        sinkObj.configure(sinkContext);
+    }
+
+    /**
+     * Runs one process() call of the configured sink against a mocked channel.
+     *
+     * @throws Exception if mocking the channel or processing fails
+     */
+    @Test
+    public void testResult() throws Exception {
+        // the channel and its transaction are PowerMockito mocks supplied by MockUtils
+        Channel channel = MockUtils.mockChannel();
+        sinkObj.setChannel(channel);
+        // no assertion: the test passes when process() completes without throwing
+        sinkObj.process();
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
new file mode 100644
index 0000000..06f3776
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/sink/pulsar/federation/TestPulsarProducerFederation.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.sink.pulsar.federation;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.inlong.dataproxy.config.loader.TestContextIdTopicConfigLoader;
+import org.apache.inlong.dataproxy.utils.MockUtils;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * TestPulsarProducerFederation
+ */
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")
+@PrepareForTest({PulsarClient.class, ClientBuilder.class, MessageId.class,
+        Producer.class, ProducerBuilder.class, TypedMessageBuilder.class})
+public class TestPulsarProducerFederation {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TestPulsarProducerFederation.class);
+    public static Context context;
+    public static Context sinkContext;
+
+    /**
+     * Loads flume.conf from the classpath and installs the PulsarClient mock; fails fast on any error.
+     */
+    @BeforeClass
+    public static void setUp() {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        try (InputStream inStream = TestPulsarProducerFederation.class.getClassLoader().getResource("flume.conf")
+                .openStream()) {
+            Properties props = new Properties();
+            props.load(inStream);
+            for (Map.Entry<Object, Object> entry : props.entrySet()) {
+                result.put((String) entry.getKey(), (String) entry.getValue());
+            }
+            context = new Context(result);
+            sinkContext = new Context(context.getSubProperties("proxy_inlong5th_sz.sinks.pulsar-sink-more1."));
+            MockUtils.mockPulsarClient();
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalStateException("unsupported encoding while loading flume.conf", e);
+        } catch (Exception e) {
+            throw new IllegalStateException("fail to load flume.conf or to mock the pulsar client", e);
+        }
+    }
+
+    /**
+     * Starts a producer federation on the loaded sink context and expects send() to return true.
+     *
+     * @throws Exception if creating the mock event or sending it fails
+     */
+    @Test
+    public void testResult() throws Exception {
+        String workerName = "workerName";
+        PulsarFederationSinkContext pulsarContext = new PulsarFederationSinkContext(MockUtils.SINK_ID, sinkContext);
+        PulsarProducerFederation federation = new PulsarProducerFederation(workerName, pulsarContext);
+        federation.start();
+        Event event = MockUtils.mockEvent();
+        boolean result = federation.send(event);
+        assertEquals(true, result);
+    }
+
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
new file mode 100644
index 0000000..beaf1e0
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/utils/MockUtils.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.dataproxy.utils;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ *
+ * MockUtils
+ */
+public class MockUtils {
+
+    public static final String CLUSTER_ID = "proxy_inlong5th_sz";
+    public static final String CONTAINER_NAME = "2222.inlong.DataProxy.sz100001";
+    public static final String CONTAINER_IP = "127.0.0.1";
+    public static final String SOURCE_ID = "agent-source";
+    public static final String SOURCE_DATA_ID = "12069";
+    public static final String INLONG_GROUP_ID1 = "03a00000026";
+    public static final String INLONG_GROUP_ID2 = "03a00000126";
+    public static final String INLONG_STREAM_ID = "";
+    public static final String SINK_ID = "inlong5th-pulsar-sz";
+    public static final String SINK_DATA_ID = "PULSAR_TOPIC_1";
+
+    /**
+     * Builds a mocked Channel whose transaction calls do nothing and whose take() returns a mock event.
+     *
+     * @return the mocked channel
+     * @throws Exception if PowerMockito stubbing or event creation fails
+     */
+    public static Channel mockChannel() throws Exception {
+        Transaction transaction = PowerMockito.mock(Transaction.class);
+        Channel channel = PowerMockito.mock(Channel.class);
+        PowerMockito.when(channel.getTransaction()).thenReturn(transaction);
+        PowerMockito.doNothing().when(transaction, "begin");
+        PowerMockito.doNothing().when(transaction, "commit");
+        PowerMockito.doNothing().when(transaction, "rollback");
+        PowerMockito.doNothing().when(transaction, "close");
+        Event event = mockEvent();
+        PowerMockito.when(channel.take()).thenReturn(event);
+        return channel;
+    }
+
+    /**
+     * Builds a Flume event with the body "testContent" and time, source IP, group id and topic headers.
+     *
+     * @return the event
+     * @throws Exception declared for callers; nothing in this method throws a checked exception
+     */
+    public static Event mockEvent() throws Exception {
+        Map<String, String> headers = new HashMap<>();
+        // message time and source time share the same timestamp
+        String sourceTime = String.valueOf(System.currentTimeMillis());
+        headers.put(Constants.HEADER_KEY_MSG_TIME, sourceTime);
+        headers.put(Constants.HEADER_KEY_SOURCE_IP, CONTAINER_IP);
+        headers.put(Constants.HEADER_KEY_SOURCE_TIME, sourceTime);
+        headers.put(Constants.INLONG_GROUP_ID, INLONG_GROUP_ID1);
+        headers.put(Constants.TOPIC, SINK_DATA_ID);
+        byte[] body = "testContent".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        return EventBuilder.withBody(body, headers);
+    }
+
+    /**
+     * Statically mocks PulsarClient.builder() so that the built client and its producers are mocks.
+     *
+     * @return the mocked client returned by the stubbed ClientBuilder.build()
+     * @throws Exception if PowerMockito stubbing fails
+     */
+    @SuppressWarnings("unchecked")
+    public static PulsarClient mockPulsarClient() throws Exception {
+        ClientBuilder clientBuilder = PowerMockito.mock(ClientBuilder.class);
+        PowerMockito.mockStatic(PulsarClient.class);
+        // client builder: every stubbed setter returns the builder itself, build() returns the mock client
+        PowerMockito.when(PulsarClient.builder()).thenReturn(clientBuilder);
+        PowerMockito.when(clientBuilder.serviceUrl(anyString())).thenReturn(clientBuilder);
+        PowerMockito.when(clientBuilder.authentication(any())).thenReturn(clientBuilder);
+        PulsarClient client = PowerMockito.mock(PulsarClient.class);
+        PowerMockito.when(clientBuilder.build()).thenReturn(client);
+        // producer builder: every stubbed option returns the builder itself
+        ProducerBuilder<byte[]> producerBuilder = PowerMockito.mock(ProducerBuilder.class);
+        PowerMockito.when(client.newProducer()).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.hashingScheme(any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxBytes(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxMessages(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batchingMaxPublishDelay(anyInt(), any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.maxPendingMessagesAcrossPartitions(anyInt())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.compressionType(any())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.blockIfQueueFull(anyBoolean())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(anyInt()))
+                .thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.batcherBuilder(any())).thenReturn(producerBuilder);
+        // clone(), topic() and producerName() keep the same builder; create() returns the mock producer
+        Producer<byte[]> producer = mockProducer();
+        PowerMockito.when(producerBuilder.clone()).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.topic(anyString())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        PowerMockito.when(producerBuilder.create()).thenReturn(producer);
+        return client;
+    }
+
+    /**
+     * Builds a mocked Producer whose message builder returns a mocked future from sendAsync().
+     *
+     * @return the mocked producer
+     * @throws Exception if PowerMockito stubbing fails
+     */
+    @SuppressWarnings("unchecked")
+    public static Producer<byte[]> mockProducer() throws Exception {
+        Producer<byte[]> producer = PowerMockito.mock(Producer.class);
+        PowerMockito.doNothing().when(producer, "close");
+        TypedMessageBuilder<byte[]> msgBuilder = PowerMockito.mock(TypedMessageBuilder.class);
+        PowerMockito.when(producer.newMessage()).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.key(anyString())).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.properties(any())).thenReturn(msgBuilder);
+        PowerMockito.when(msgBuilder.value(any())).thenReturn(msgBuilder);
+        CompletableFuture<MessageId> future = PowerMockito.mock(CompletableFuture.class);
+        PowerMockito.when(future.whenCompleteAsync(any())).thenReturn(future);
+        PowerMockito.when(msgBuilder.sendAsync()).thenReturn(future);
+        return producer;
+    }
+}
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
new file mode 100644
index 0000000..4de61c1
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+proxy_cluster_name=proxy_inlong5th_sz
+metricDomains=DataProxy
+metricDomains.DataProxy.domainListeners=org.apache.inlong.dataproxy.metrics.prometheus.PrometheusMetricListener
+metricDomains.DataProxy.snapshotInterval=60000
\ No newline at end of file
diff --git a/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf b/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf
new file mode 100644
index 0000000..d472f51
--- /dev/null
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/flume.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+proxy_inlong5th_sz.channels=ch-msg1
+proxy_inlong5th_sz.sinks=pulsar-sink-more1
+proxy_inlong5th_sz.sources=agent-source sdk-source
+
+proxy_inlong5th_sz.channels.ch-msg1.capacity=200000
+proxy_inlong5th_sz.channels.ch-msg1.keep-alive=0
+proxy_inlong5th_sz.channels.ch-msg1.transactionCapacity=2000
+proxy_inlong5th_sz.channels.ch-msg1.type=memory
+
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.channel=ch-msg1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.type=org.apache.inlong.dataproxy.sink.pulsar.federation.PulsarFederationSink
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.maxBufferQueueSize=131072
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.reloadInterval=60000
+#context
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1
+#context
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.type=org.apache.inlong.dataproxy.config.loader.ContextCacheClusterConfigLoader
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig=cache_inlong5th_sz1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.cache_inlong5th_sz1.serviceUrl=http://tdmq.ap-gz.tencentyun.com:8080
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.cacheClusterConfig.cache_inlong5th_sz1.authentication=xxxx
+
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.enableBatching=true
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxBytes=5242880
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxMessages=3000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.batchingMaxPublishDelay=1
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.maxPendingMessages=1000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.maxPendingMessagesAcrossPartitions=50000
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.sendTimeout=0
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.compressionType=SNAPPY
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.blockIfQueueFull=true
+proxy_inlong5th_sz.sinks.pulsar-sink-more1.producer.roundRobinRouterBatchingPartitionSwitchFrequency=10
+
+proxy_inlong5th_sz.sources.agent-source.type=com.tencent.pcg.atta.dataproxy.source.agent.SimpleTcpSource
+proxy_inlong5th_sz.sources.agent-source.selector.type=org.apache.flume.channel.ReplicatingChannelSelector
+proxy_inlong5th_sz.sources.agent-source.channels=ch-msg1
+proxy_inlong5th_sz.sources.agent-source.tcpNoDelay=true
+proxy_inlong5th_sz.sources.agent-source.keepAlive=true
+proxy_inlong5th_sz.sources.agent-source.highWaterMark=65536
+proxy_inlong5th_sz.sources.agent-source.receiveBufferSize=65536
+proxy_inlong5th_sz.sources.agent-source.sendBufferSize=65536
+proxy_inlong5th_sz.sources.agent-source.trafficClass=0
+proxy_inlong5th_sz.sources.agent-source.max-threads=32
+proxy_inlong5th_sz.sources.agent-source.connections=5000
+proxy_inlong5th_sz.sources.agent-source.msg-factory-name=com.tencent.pcg.atta.dataproxy.source.agent.ServerMessageFactory
+proxy_inlong5th_sz.sources.agent-source.message-handler-name=com.tencent.pcg.atta.dataproxy.source.agent.ServerMessageHandler
+proxy_inlong5th_sz.sources.agent-source.max-msg-length=10485760
+proxy_inlong5th_sz.sources.agent-source.reloadInterval=60000
+proxy_inlong5th_sz.sources.agent-source.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sources.agent-source.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1
+
+proxy_inlong5th_sz.sources.sdk-source.type=com.tencent.pcg.atta.dataproxy.source.sdk.SimpleTcpSource
+proxy_inlong5th_sz.sources.sdk-source.selector.type=org.apache.flume.channel.ReplicatingChannelSelector
+proxy_inlong5th_sz.sources.sdk-source.channels=ch-msg1
+proxy_inlong5th_sz.sources.sdk-source.tcpNoDelay=true
+proxy_inlong5th_sz.sources.sdk-source.keepAlive=true
+proxy_inlong5th_sz.sources.sdk-source.highWaterMark=65536
+proxy_inlong5th_sz.sources.sdk-source.receiveBufferSize=65536
+proxy_inlong5th_sz.sources.sdk-source.sendBufferSize=65536
+proxy_inlong5th_sz.sources.sdk-source.trafficClass=0
+proxy_inlong5th_sz.sources.sdk-source.max-threads=32
+proxy_inlong5th_sz.sources.sdk-source.connections=5000
+proxy_inlong5th_sz.sources.sdk-source.msg-factory-name=com.tencent.pcg.atta.dataproxy.source.sdk.ServerMessageFactory
+proxy_inlong5th_sz.sources.sdk-source.message-handler-name=com.tencent.pcg.atta.dataproxy.source.sdk.ServerMessageHandler
+proxy_inlong5th_sz.sources.sdk-source.max-msg-length=10485760
+proxy_inlong5th_sz.sources.sdk-source.reloadInterval=60000
+proxy_inlong5th_sz.sources.sdk-source.idTopicConfig.type=org.apache.inlong.dataproxy.config.loader.ContextIdTopicConfigLoader
+proxy_inlong5th_sz.sources.sdk-source.idTopicConfig.03a00000026=pulsar-9xn9wp35pbxb/test/topic1