You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/09 04:39:59 UTC
[incubator-inlong] branch master updated: [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new cc35ea4 [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)
cc35ea4 is described below
commit cc35ea43ddbf62bc73e1049c518039e832055193
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 9 12:39:50 2021 +0800
[INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)
---
inlong-sort-standalone/pom.xml | 5 +-
.../sort-standalone-common/pom.xml | 28 ++-
.../config/holder/SortClusterConfigHolder.java | 1 -
.../config/pojo/SortClusterResponse.java | 108 ----------
.../standalone/config/pojo/type/CacheType.java | 4 +-
.../sort/standalone/config/pojo/type/DataType.java | 4 +-
.../sort/standalone/config/pojo/type/SortType.java | 4 +-
.../sort/standalone/utils/InlongLoggerFactory.java | 22 ---
.../sort-standalone-source/pom.xml | 55 ++----
.../PropertiesConfigurationProvider.java | 64 ++++++
.../apache/inlong/sort/standalone/SortCluster.java | 135 +++++++++++++
.../sort/standalone/SortStandaloneApplication.java | 23 ++-
.../apache/inlong/sort/standalone/SortTask.java | 212 ++++++++++++++++++++
.../standalone/channel/BufferQueueChannel.java | 180 +++++++++++++++++
.../sort/standalone/channel/ProfileEvent.java | 121 ++++++++++++
.../standalone/channel/ProfileTransaction.java | 110 +++++++++++
.../sort/standalone/dispatch/DispatchManager.java | 157 +++++++++++++++
.../sort/standalone/dispatch/DispatchProfile.java | 173 ++++++++++++++++
.../inlong/sort/standalone/sink/SinkContext.java | 220 +++++++++++++++++++++
19 files changed, 1441 insertions(+), 185 deletions(-)
diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index c329ff1..0672876 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -44,6 +44,7 @@
<plugin.assembly.version>3.2.0</plugin.assembly.version>
<pulsar.version>2.7.2</pulsar.version>
<junit.version>4.13</junit.version>
+ <powermock.version>2.0.2</powermock.version>
<guava.version>19.0</guava.version>
<skipTests>false</skipTests>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -85,13 +86,13 @@
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
- <version>2.0.2</version>
+ <version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
- <version>2.0.2</version>
+ <version>${powermock.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/inlong-sort-standalone/sort-standalone-common/pom.xml b/inlong-sort-standalone/sort-standalone-common/pom.xml
index 4c99a28..c4fdccc 100644
--- a/inlong-sort-standalone/sort-standalone-common/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-common/pom.xml
@@ -1,14 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index 67b4b6a..d593db1 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -78,7 +78,6 @@ public final class SortClusterConfigHolder {
}
} catch (Throwable t) {
LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
- LOG.error(t.getMessage(), t);
}
if (instance.loader == null) {
instance.loader = new ClassResourceSortClusterConfigLoader();
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
index a04c24f..3d0faa6 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
@@ -104,112 +104,4 @@ public class SortClusterResponse {
public void setData(SortClusterConfig data) {
this.data = data;
}
-//
-// /**
-// * generateTdbankConfig
-// *
-// * @return
-// */
-// public static SortClusterConfig generateTdbankConfig() {
-// SortClusterConfig clusterConfig = new SortClusterConfig();
-// clusterConfig.setClusterName("tdbankv3-sz-sz1");
-// //
-// List<SortTaskConfig> sortTasks = new ArrayList<>();
-// clusterConfig.setSortTasks(sortTasks);
-// SortTaskConfig taskConfig = new SortTaskConfig();
-// sortTasks.add(taskConfig);
-// taskConfig.setName("sid_tdbank_atta6th_v3");
-// taskConfig.setType(SortType.TQTDBANK);
-// //
-// Map<String, String> sinkParams = new HashMap<>();
-// taskConfig.setSinkParams(sinkParams);
-// sinkParams.put("b_pcg_venus_szrecone_124_153_utf8", "10.56.15.195:46801,10.56.15.212:46801,"
-// + "10.56.15.220:46801,10.56.15.221:46801,"
-// + "10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801,"
-// + "10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801,"
-// + "10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801,"
-// + "10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801,"
-// + "10.56.84.17:46801");
-// //
-// List<Map<String, String>> idParams = new ArrayList<>();
-// Map<String, String> idParam = new HashMap<>();
-// idParams.add(idParam);
-// taskConfig.setIdParams(idParams);
-// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
-// idParam.put(Constants.INLONG_STREAM_ID, "");
-// idParam.put(TdbankConfig.KEY_BID, "b_pcg_venus_szrecone_124_153_utf8");
-// idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046");
-// idParam.put(TdbankConfig.KEY_DATA_TYPE, TdbankConfig.DATA_TYPE_ATTA_TEXT);
-// return clusterConfig;
-// }
-//
-// /**
-// * generateCdmqConfig
-// *
-// * @return
-// */
-// public static SortClusterConfig generateCdmqConfig() {
-// SortClusterConfig clusterConfig = new SortClusterConfig();
-// clusterConfig.setClusterName("cdmqv3-sz-sz1");
-// //
-// List<SortTaskConfig> sortTasks = new ArrayList<>();
-// clusterConfig.setSortTasks(sortTasks);
-// SortTaskConfig taskConfig = new SortTaskConfig();
-// sortTasks.add(taskConfig);
-// taskConfig.setName("sid_cdmq_kg_videorequest_v3");
-// taskConfig.setType(SortType.CDMQ);
-// //
-// Map<String, String> sinkParams = new HashMap<>();
-// taskConfig.setSinkParams(sinkParams);
-// sinkParams.put("cdmqAccessPoint", "cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033");
-// sinkParams.put("cdmqClusterId", "kg_videorequest");
-// sinkParams.put("clientId", "p_video_atta_196");
-// sinkParams.put("batchSize", "122880");
-// sinkParams.put("maxRequestSize", "8388608");
-// sinkParams.put("lingerMs", "150");
-// //
-// List<Map<String, String>> idParams = new ArrayList<>();
-// Map<String, String> idParam = new HashMap<>();
-// idParams.add(idParam);
-// taskConfig.setIdParams(idParams);
-// idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
-// idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046");
-// return clusterConfig;
-// }
-//
-// /**
-// * main
-// *
-// * @param args
-// */
-// public static void main(String[] args) {
-// // tdbank
-// {
-// SortClusterConfig config = generateTdbankConfig();
-// String configString = JSON.toJSONString(config, false);
-// System.out.println("tdbank:" + configString);
-// String md5 = DigestUtils.md5Hex(configString);
-// SortClusterResponse response = new SortClusterResponse();
-// response.setResult(true);
-// response.setErrCode(SUCC);
-// response.setMd5(md5);
-// response.setData(config);
-// String responseString = JSON.toJSONString(response, true);
-// System.out.println("tdbank responseString:" + responseString);
-// }
-// // cdmq
-// {
-// SortClusterConfig config = generateCdmqConfig();
-// String configString = JSON.toJSONString(config, false);
-// System.out.println("cdmq:" + configString);
-// String md5 = DigestUtils.md5Hex(configString);
-// SortClusterResponse response = new SortClusterResponse();
-// response.setResult(true);
-// response.setErrCode(SUCC);
-// response.setMd5(md5);
-// response.setData(config);
-// String responseString = JSON.toJSONString(response, true);
-// System.out.println("cdmq responseString:" + responseString);
-// }
-// }
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
index 963dc6e..845ccaa 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
*/
public enum CacheType {
- TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
+ TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), UNKNOWN("n");
private final String value;
@@ -65,6 +65,6 @@ public enum CacheType {
return v;
}
}
- return N;
+ return UNKNOWN;
}
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
index 882e29e..d5a7d64 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
*/
public enum DataType {
- TEXT("text"), PB("pb"), JCE("jce"), N("n");
+ TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n");
private final String value;
@@ -65,6 +65,6 @@ public enum DataType {
return v;
}
}
- return N;
+ return UNKNOWN;
}
}
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
index e066928..6444821 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
@@ -24,7 +24,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
public enum SortType {
HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), THTDBANK(
- "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n");
+ "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), UNKNOWN("n");
private final String value;
@@ -67,6 +67,6 @@ public enum SortType {
return v;
}
}
- return N;
+ return UNKNOWN;
}
}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
index d4bdea7..274483a 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
@@ -60,26 +60,4 @@ public class InlongLoggerFactory {
String namePrefix = className.substring(0, index);
return namePrefix;
}
-
-// /**
-// * main
-// * @param args
-// */
-// public static void main(String[] args) {
-// int layer = 3;
-// String className = "";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "org.ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "org.apache.ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "org.apache.inlong.ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "org.apache.inlong.sort.ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// className = "org.apache.inlong.sort.standalone.ccc";
-// System.out.println(className + ":" + getClassNamePrefix(className, layer));
-// }
}
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 630592a..3abc5a5 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -1,14 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- you under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
- language governing permissions and limitations under the License. -->
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -26,34 +34,13 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.source>1.8</compiler.source>
<compiler.target>1.8</compiler.target>
- <junit.version>4.13</junit.version>
- <guava.version>19.0</guava.version>
- <skipTests>false</skipTests>
</properties>
<dependencies>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-module-junit4</artifactId>
- <version>2.0.2</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.powermock</groupId>
- <artifactId>powermock-api-mockito2</artifactId>
- <version>2.0.2</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-standalone-common</artifactId>
+ <version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java
new file mode 100644
index 0000000..853bfc2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.node.AbstractConfigurationProvider;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * Flume configuration provider whose settings come from an in-memory
+ * map of properties supplied at construction time.
+ */
+public class PropertiesConfigurationProvider extends AbstractConfigurationProvider {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(PropertiesConfigurationProvider.class);
+
+    private final Map<String, String> flumeConf;
+
+    /**
+     * Creates a provider for one agent.
+     *
+     * @param agentName agent name, passed on to {@link AbstractConfigurationProvider}
+     * @param flumeConf flat key/value properties the Flume configuration is built from
+     */
+    public PropertiesConfigurationProvider(String agentName, Map<String, String> flumeConf) {
+        super(agentName);
+        this.flumeConf = flumeConf;
+    }
+
+    /**
+     * Builds the Flume configuration from the supplied properties.
+     *
+     * @return the configuration built from the properties, or one built from an empty map if that fails
+     */
+    @Override
+    public FlumeConfiguration getFlumeConfiguration() {
+        FlumeConfiguration result;
+        try {
+            result = new FlumeConfiguration(flumeConf);
+        } catch (Exception e) {
+            LOG.error("exception catch:" + e.getMessage(), e);
+            result = new FlumeConfiguration(new HashMap<String, String>());
+        }
+        return result;
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
new file mode 100644
index 0000000..c91f985
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Holds the running {@link SortTask}s of this node and keeps them in sync with
+ * the cluster configuration, which is re-read periodically by a timer.
+ */
+public class SortCluster {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortCluster.class);
+
+    private Timer reloadTimer;
+    private final Map<String, SortTask> taskMap = new ConcurrentHashMap<>();
+    private final List<SortTask> deletingTasks = new ArrayList<>();
+
+    /**
+     * Loads the configured tasks once and then schedules the periodic reload.
+     * Failures are logged and not propagated.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer. Does nothing if the timer was never created,
+     * e.g. because {@link #start()} was not called or failed early.
+     */
+    public void close() {
+        // NOTE(review): running tasks are not stopped here - confirm that is intended.
+        try {
+            if (this.reloadTimer != null) {
+                this.reloadTimer.cancel();
+            }
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Creates a daemon timer that calls {@link #reload()} every RELOAD_INTERVAL
+     * milliseconds (default 60000), the first run being one interval from now.
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        long reloadInterval = CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Brings the running tasks in line with the current cluster configuration:
+     * tasks that are configured but not running are started, tasks that are
+     * running but no longer configured are stopped. A failure of one task is
+     * logged and does not keep the other tasks from being processed; nothing
+     * is thrown to the caller.
+     */
+    public void reload() {
+        try {
+            // get new config
+            SortClusterConfig newConfig = SortClusterConfigHolder.getClusterConfig();
+            if (newConfig == null) {
+                return;
+            }
+            // add new task
+            for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
+                String newTaskName = taskConfig.getName();
+                if (taskMap.containsKey(newTaskName)) {
+                    continue;
+                }
+                try {
+                    SortTask newTask = new SortTask(newTaskName);
+                    newTask.start();
+                    this.taskMap.put(newTaskName, newTask);
+                } catch (Exception e) {
+                    // not registered, so the next reload tries to start it again
+                    LOG.error("Fail to start sort task:" + newTaskName, e);
+                }
+            }
+            // remove task
+            for (Entry<String, SortTask> entry : taskMap.entrySet()) {
+                if (!isConfigured(newConfig, entry.getKey())) {
+                    this.deletingTasks.add(entry.getValue());
+                }
+            }
+            // stop deleting task list
+            try {
+                for (SortTask task : deletingTasks) {
+                    try {
+                        task.stop();
+                        taskMap.remove(task.getTaskName());
+                    } catch (Exception e) {
+                        // still registered, so the next reload tries to stop it again
+                        LOG.error("Fail to stop sort task:" + task.getTaskName(), e);
+                    }
+                }
+            } finally {
+                // always reset, otherwise entries pile up across reloads after a failure
+                deletingTasks.clear();
+            }
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Tells whether the configuration still contains a task with the given name.
+     *
+     * @param  config   cluster configuration to search
+     * @param  taskName name of a running task
+     * @return          true if a task of that name is configured
+     */
+    private static boolean isConfigured(SortClusterConfig config, String taskName) {
+        for (SortTaskConfig taskConfig : config.getSortTasks()) {
+            if (taskName.equals(taskConfig.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index c0dc143..48c0470 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -17,17 +17,36 @@
package org.apache.inlong.sort.standalone;
+import org.apache.flume.node.Application;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.MetricObserver;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
/**
*
- * Application
+ * SortStandaloneApplication
*/
public class SortStandaloneApplication {
+ public static final Logger LOG = InlongLoggerFactory.getLogger(SortStandaloneApplication.class);
+
/**
* main
*
* @param args
*/
public static void main(String[] args) {
+ LOG.info("start to sort-standalone");
+ try {
+ SortCluster cluster = new SortCluster();
+ // load the configured sort tasks and schedule their periodic reload
+ cluster.start();
+ // metrics
+ MetricObserver.init(CommonPropertiesHolder.get());
+ Thread.sleep(5000);
+ } catch (Exception e) {
+ LOG.error("A fatal error occurred while running. Exception follows.", e);
+ }
}
-}
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
new file mode 100644
index 0000000..7748ae6
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.google.common.eventbus.Subscribe;
+
+/**
+ * One sort task: the channels, sinks and sources materialized from the task's
+ * generated Flume configuration, run under a dedicated {@link LifecycleSupervisor}.
+ */
+public class SortTask {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTask.class);
+
+    private final String taskName;
+    private final LifecycleSupervisor supervisor;
+    private MaterializedConfiguration materializedConfiguration;
+    private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+    /**
+     * Creates the task together with its own supervisor; nothing is started yet.
+     *
+     * @param taskName name used to look up the task configuration
+     */
+    public SortTask(String taskName) {
+        this.taskName = taskName;
+        this.supervisor = new LifecycleSupervisor();
+    }
+
+    /**
+     * get taskName
+     *
+     * @return the taskName
+     */
+    public String getTaskName() {
+        return taskName;
+    }
+
+    /**
+     * Looks up the task configuration, turns it into a Flume configuration and
+     * starts the resulting components. Does nothing but log a warning when no
+     * configuration exists for this task name.
+     */
+    public void start() {
+        SortTaskConfig config = SortClusterConfigHolder.getTaskConfig(taskName);
+        if (config == null) {
+            LOG.warn("No configuration found for sort task:{}, nothing is started", taskName);
+            return;
+        }
+
+        Map<String, String> flumeConfiguration = config.generateFlumeConfiguration();
+        LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
+        PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider(
+                config.getName(), flumeConfiguration);
+        this.handleConfigurationEvent(configurationProvider.getConfiguration());
+    }
+
+    /**
+     * Replaces the running components by those of the given configuration:
+     * under the lifecycle lock, everything currently running is stopped and the
+     * new components are started. If the thread is interrupted while waiting
+     * for the lock, nothing is changed and the interrupt flag is restored.
+     *
+     * @param conf the new materialized configuration
+     */
+    @Subscribe
+    public void handleConfigurationEvent(MaterializedConfiguration conf) {
+        try {
+            lifecycleLock.lockInterruptibly();
+            stopAllComponents();
+            startAllComponents(conf);
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted while trying to handle configuration event");
+            // keep the interruption visible to the caller
+            Thread.currentThread().interrupt();
+        } finally {
+            // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+            if (lifecycleLock.isHeldByCurrentThread()) {
+                lifecycleLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Stops all components and then the supervisor itself. The lifecycle lock
+     * is released even if stopping fails.
+     */
+    public void stop() {
+        lifecycleLock.lock();
+        try {
+            stopAllComponents();
+            supervisor.stop();
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    /**
+     * Removes the sources, then the sinks, then the channels of the current
+     * configuration from supervision. A component that fails to stop is logged
+     * and the remaining ones are still processed. No-op before the first start.
+     */
+    private void stopAllComponents() {
+        if (this.materializedConfiguration != null) {
+            LOG.info("Shutting down configuration: {}", this.materializedConfiguration);
+            for (Entry<String, SourceRunner> entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
+                try {
+                    LOG.info("Stopping Source {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners().entrySet()) {
+                try {
+                    LOG.info("Stopping Sink {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels().entrySet()) {
+                try {
+                    LOG.info("Stopping Channel {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Remembers the configuration and puts its components under supervision
+     * with an always-restart policy: channels first, then (once the channels
+     * have started or failed) the sinks, and finally the sources. A component
+     * that fails to start is logged and the remaining ones are still processed.
+     *
+     * @param materializedConfiguration the configuration to start
+     */
+    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+        LOG.info("Starting new configuration:{}", materializedConfiguration);
+
+        this.materializedConfiguration = materializedConfiguration;
+
+        for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
+            try {
+                LOG.info("Starting Channel {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        waitForChannelsToStart(materializedConfiguration);
+
+        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
+            try {
+                LOG.info("Starting Sink {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
+            try {
+                LOG.info("Starting Source {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+    }
+
+    /**
+     * Polls every 500 ms until each channel has either reached the START state
+     * or is reported by the supervisor as being in error. An interruption does
+     * not end the wait; it is logged and the interrupt flag is restored once
+     * the wait is over.
+     *
+     * @param conf the configuration whose channels are awaited
+     */
+    private void waitForChannelsToStart(MaterializedConfiguration conf) {
+        boolean interrupted = false;
+        try {
+            for (Channel ch : conf.getChannels().values()) {
+                while (ch.getLifecycleState() != LifecycleState.START
+                        && !supervisor.isComponentInErrorState(ch)) {
+                    try {
+                        LOG.info("Waiting for channel: {} to start. Sleeping for 500 ms", ch.getName());
+                        Thread.sleep(500);
+                    } catch (InterruptedException e) {
+                        LOG.error("Interrupted while waiting for channel to start.", e);
+                        interrupted = true;
+                    }
+                }
+            }
+        } finally {
+            if (interrupted) {
+                // sleep() cleared the flag; hand the interruption back to the caller
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
new file mode 100644
index 0000000..b15a396
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.sort.standalone.utils.SizeSemaphore;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
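+/**
+ * Flume channel that keeps {@link ProfileEvent}s in a {@link BufferQueue} whose size budget comes from
+ * one {@link SizeSemaphore} shared by all instances. Puts and takes are staged in a per-thread
+ * {@link ProfileTransaction}.
+ */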
+public class BufferQueueChannel extends AbstractChannel {
+
+ public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+
+ public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+ public static final String KEY_RELOADINTERVAL = "reloadInterval";
+ public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+
+ // global buffer size
+ private static SizeSemaphore globalBufferQueueSizeKb;
+ private BufferQueue<ProfileEvent> bufferQueue;
+ private ThreadLocal<ProfileTransaction> currentTransaction = new ThreadLocal<ProfileTransaction>();
+ protected Timer channelTimer;
+ private AtomicLong takeCounter = new AtomicLong(0);
+ private AtomicLong putCounter = new AtomicLong(0);
+
+ /**
+ * Constructor
+ */
+ public BufferQueueChannel() {
+ Context context = CommonPropertiesHolder.getContext();
+ SizeSemaphore globalSize = getGlobalBufferQueueSizeKb(context);
+ this.bufferQueue = new BufferQueue<>(globalSize.maxSize() / 3, globalSize);
+ }
+
+ /**
+ * put
+ *
+ * @param event
+ * @throws ChannelException
+ */
+ @Override
+ public void put(Event event) throws ChannelException {
+ putCounter.incrementAndGet();
+ int eventSize = event.getBody().length;
+ this.bufferQueue.acquire(eventSize);
+ ProfileTransaction transaction = currentTransaction.get();
+ Preconditions.checkState(transaction != null, "No transaction exists for this thread");
+ if (event instanceof ProfileEvent) {
+ ProfileEvent profile = (ProfileEvent) event;
+ transaction.doPut(profile);
+ } else {
+ ProfileEvent profile = new ProfileEvent(event.getBody(), event.getHeaders());
+ transaction.doPut(profile);
+ }
+ }
+
+ /**
+ * take
+ *
+ * @return Event
+ * @throws ChannelException
+ */
+ @Override
+ public Event take() throws ChannelException {
+ ProfileEvent event = this.bufferQueue.pollRecord();
+ if (event != null) {
+ ProfileTransaction transaction = currentTransaction.get();
+ Preconditions.checkState(transaction != null, "No transaction exists for this thread");
+ transaction.doTake(event);
+ takeCounter.incrementAndGet();
+ }
+ return event;
+ }
+
+ /**
+ * getTransaction
+ *
+ * @return
+ */
+ @Override
+ public Transaction getTransaction() {
+ ProfileTransaction newTransaction = new ProfileTransaction(this.bufferQueue);
+ this.currentTransaction.set(newTransaction);
+ return newTransaction;
+ }
+
+ /**
+ * start
+ */
+ @Override
+ public void start() {
+ super.start();
+ try {
+ this.setReloadTimer();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * setReloadTimer
+ */
+ protected void setReloadTimer() {
+ channelTimer = new Timer(true);
+ long reloadInterval = CommonPropertiesHolder.getLong(KEY_RELOADINTERVAL, 60000L);
+ TimerTask channelTask = new TimerTask() {
+
+ public void run() {
+ LOG.info("queueSize:{},availablePermits:{},put:{},take:{}",
+ bufferQueue.size(),
+ bufferQueue.availablePermits(),
+ putCounter.getAndSet(0),
+ takeCounter.getAndSet(0));
+ }
+ };
+ channelTimer.schedule(channelTask,
+ new Date(System.currentTimeMillis() + reloadInterval),
+ reloadInterval);
+ }
+
+ /**
+ * configure
+ *
+ * @param context
+ */
+ @Override
+ public void configure(Context context) {
+ }
+
+ /**
+ * getGlobalBufferQueueSizeKb
+ *
+ * @return
+ */
+ public static SizeSemaphore getGlobalBufferQueueSizeKb(Context context) {
+ if (globalBufferQueueSizeKb != null) {
+ return globalBufferQueueSizeKb;
+ }
+ synchronized (LOG) {
+ if (globalBufferQueueSizeKb != null) {
+ return globalBufferQueueSizeKb;
+ }
+ int maxBufferQueueSizeKb = context.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+ globalBufferQueueSizeKb = new SizeSemaphore(maxBufferQueueSizeKb, SizeSemaphore.ONEKB);
+ return globalBufferQueueSizeKb;
+ }
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
new file mode 100644
index 0000000..153cc25
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.Map;
+
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
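+/**
+ * Flume event that caches the inlong group id, the inlong stream id, the uid derived from both,
+ * and the raw-log, fetch and send timestamps taken from its headers at construction time.
+ */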
+public class ProfileEvent extends SimpleEvent {
+
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+    private final String uid;
+
+    private final long rawLogTime;
+    private final long fetchTime;
+    private long sendTime;
+
+    /**
+     * Builds the event from a body and its headers. The group id and stream id are read from the
+     * headers, the uid is generated from both, and the fetch time and initial send time are set to
+     * the current time. The raw log time is parsed from the message-time header and falls back to
+     * the fetch time.
+     *
+     * @param body    the event body
+     * @param headers the event headers; must not be null because they are read here
+     */
+    public ProfileEvent(byte[] body, Map<String, String> headers) {
+        super.setBody(body);
+        super.setHeaders(headers);
+        final String groupId = headers.get(Constants.INLONG_GROUP_ID);
+        final String streamId = headers.get(Constants.INLONG_STREAM_ID);
+        this.inlongGroupId = groupId;
+        this.inlongStreamId = streamId;
+        this.uid = InlongId.generateUid(groupId, streamId);
+        final long now = System.currentTimeMillis();
+        this.fetchTime = now;
+        this.sendTime = now;
+        this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), now);
+    }
+
+    /**
+     * @return the inlong group id read from the headers
+     */
+    public String getInlongGroupId() {
+        return this.inlongGroupId;
+    }
+
+    /**
+     * @return the inlong stream id read from the headers
+     */
+    public String getInlongStreamId() {
+        return this.inlongStreamId;
+    }
+
+    /**
+     * @return the uid generated from the group id and the stream id
+     */
+    public String getUid() {
+        return this.uid;
+    }
+
+    /**
+     * @return the message time parsed from the headers, or the fetch time when it could not be parsed
+     */
+    public long getRawLogTime() {
+        return this.rawLogTime;
+    }
+
+    /**
+     * @return the time at which this event object was created
+     */
+    public long getFetchTime() {
+        return this.fetchTime;
+    }
+
+    /**
+     * @return the send time; equal to the fetch time until {@link #setSendTime(long)} is called
+     */
+    public long getSendTime() {
+        return this.sendTime;
+    }
+
+    /**
+     * @param sendTime the new send time
+     */
+    public void setSendTime(long sendTime) {
+        this.sendTime = sendTime;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java
new file mode 100644
index 0000000..a885224
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
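+/**
+ * Transaction that stages put and taken {@link ProfileEvent}s in lists and applies them to the
+ * {@link BufferQueue} on commit, or undoes them on rollback.
+ */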
+public class ProfileTransaction implements Transaction {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(ProfileTransaction.class);
+
+    private BufferQueue<ProfileEvent> bufferQueue;
+    // Events polled from the queue by this transaction and not yet committed.
+    private List<ProfileEvent> takeList = new ArrayList<>();
+    // Events staged by this transaction and not yet offered to the queue.
+    private List<ProfileEvent> putList = new ArrayList<>();
+
+    /**
+     * Creates a transaction working against the given queue.
+     *
+     * @param bufferQueue the queue that commits and rollbacks are applied to
+     */
+    public ProfileTransaction(BufferQueue<ProfileEvent> bufferQueue) {
+        this.bufferQueue = bufferQueue;
+    }
+
+    /**
+     * Nothing to do when a transaction begins.
+     */
+    @Override
+    public void begin() {
+    }
+
+    /**
+     * Makes the transaction permanent: releases the permits of every taken event, then offers
+     * every staged event to the queue.
+     */
+    @Override
+    public void commit() {
+        takeList.forEach(taken -> bufferQueue.release(taken.getBody().length));
+        takeList.clear();
+        putList.forEach(staged -> bufferQueue.offer(staged));
+        putList.clear();
+    }
+
+    /**
+     * Undoes the transaction: offers every taken event back to the queue, then releases the
+     * permits of every staged event.
+     */
+    @Override
+    public void rollback() {
+        takeList.forEach(taken -> bufferQueue.offer(taken));
+        takeList.clear();
+        putList.forEach(staged -> bufferQueue.release(staged.getBody().length));
+        putList.clear();
+    }
+
+    /**
+     * Nothing to do when a transaction is closed.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Records an event that was polled from the queue within this transaction.
+     *
+     * @param event the taken event
+     */
+    public void doTake(ProfileEvent event) {
+        takeList.add(event);
+    }
+
+    /**
+     * Records an event that is to be offered to the queue when this transaction commits.
+     *
+     * @param event the staged event
+     */
+    public void doPut(ProfileEvent event) {
+        putList.add(event);
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
new file mode 100644
index 0000000..3d9a326
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.dispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
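+/**
+ * Groups incoming events by uid into {@link DispatchProfile} batches and offers a batch to the
+ * dispatch queue once it is full or older than the dispatch timeout.
+ */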
+public class DispatchManager {
+
+ public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+ public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
+ public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
+ public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
+ public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
+ public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
+ public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
+
+ private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+ private final long dispatchTimeout;
+ private final long maxPackCount;
+ private final long maxPackSize;
+ private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
+ //
+ private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
+
+ /**
+ * Constructor
+ *
+ * @param context
+ * @param dispatchQueue
+ */
+ public DispatchManager(Context context, LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+ this.dispatchQueue = dispatchQueue;
+ this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
+ this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
+ this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
+ }
+
+ /**
+ * addEvent
+ *
+ * @param event
+ */
+ public void addEvent(ProfileEvent event) {
+ if (needOutputOvertimeData.get()) {
+ this.outputOvertimeData();
+ this.needOutputOvertimeData.set(false);
+ }
+ // parse
+ String uid = event.getUid();
+ //
+ DispatchProfile dispatchProfile = this.profileCache.get(uid);
+ if (dispatchProfile == null) {
+ dispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(), event.getInlongStreamId());
+ this.profileCache.put(uid, dispatchProfile);
+ }
+ //
+ boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+ if (!addResult) {
+ DispatchProfile newDispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(),
+ event.getInlongStreamId());
+ DispatchProfile oldDispatchProfile = this.profileCache.put(uid, newDispatchProfile);
+ this.dispatchQueue.offer(oldDispatchProfile);
+ newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+ }
+ }
+
+ /**
+ * outputOvertimeData
+ *
+ * @return
+ */
+ public void outputOvertimeData() {
+ LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+ profileCache.size(), dispatchQueue.size());
+ long currentTime = System.currentTimeMillis();
+ long createThreshold = currentTime - dispatchTimeout;
+ List<String> removeKeys = new ArrayList<>();
+ long eventCount = 0;
+ for (Entry<String, DispatchProfile> entry : this.profileCache.entrySet()) {
+ DispatchProfile dispatchProfile = entry.getValue();
+ eventCount += dispatchProfile.getCount();
+ if (!dispatchProfile.isTimeout(createThreshold)) {
+ continue;
+ }
+ removeKeys.add(entry.getKey());
+ }
+ // output
+ removeKeys.forEach((key) -> {
+ dispatchQueue.offer(this.profileCache.remove(key));
+ });
+ LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
+ profileCache.size(), dispatchQueue.size(), eventCount);
+ }
+
+ /**
+ * get dispatchTimeout
+ *
+ * @return the dispatchTimeout
+ */
+ public long getDispatchTimeout() {
+ return dispatchTimeout;
+ }
+
+ /**
+ * get maxPackCount
+ *
+ * @return the maxPackCount
+ */
+ public long getMaxPackCount() {
+ return maxPackCount;
+ }
+
+ /**
+ * get maxPackSize
+ *
+ * @return the maxPackSize
+ */
+ public long getMaxPackSize() {
+ return maxPackSize;
+ }
+
+ /**
+ * setNeedOutputOvertimeData
+ */
+ public void setNeedOutputOvertimeData() {
+ this.needOutputOvertimeData.set(true);
+ }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java
new file mode 100644
index 0000000..2a5d5c6
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.dispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
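+/**
+ * A batch of {@link ProfileEvent}s that share one uid, bounded by an event count and a total body
+ * size that the caller passes in when adding an event.
+ */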
+public class DispatchProfile {
+
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+    private final String uid;
+    private List<ProfileEvent> events = new ArrayList<>();
+    // Creation time of this batch; compared against the threshold in isTimeout().
+    private long createTime = System.currentTimeMillis();
+    private long count = 0;
+    private long size = 0;
+    // Smallest raw log time of the added events, starting from the creation time of the batch.
+    private long minEventTime = System.currentTimeMillis();
+
+    /**
+     * Creates an empty batch.
+     *
+     * @param uid            the uid shared by the events of this batch
+     * @param inlongGroupId  the inlong group id of the batch
+     * @param inlongStreamId the inlong stream id of the batch
+     */
+    public DispatchProfile(String uid, String inlongGroupId, String inlongStreamId) {
+        this.uid = uid;
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * Appends an event unless the batch already holds {@code maxPackCount} events, or it is
+     * non-empty and the event would push the total body size above {@code maxPackSize}.
+     * An empty batch therefore accepts an event of any size as long as the count limit allows it.
+     *
+     * @param  event        the event to append
+     * @param  maxPackCount the maximum number of events in the batch
+     * @param  maxPackSize  the maximum total body size of the batch
+     * @return              true when the event was appended, false when the batch rejected it
+     */
+    public boolean addEvent(ProfileEvent event, long maxPackCount, long maxPackSize) {
+        final long bodyLength = event.getBody().length;
+        final boolean countExhausted = count >= maxPackCount;
+        final boolean sizeExceeded = count > 0 && size + bodyLength > maxPackSize;
+        if (countExhausted || sizeExceeded) {
+            return false;
+        }
+        events.add(event);
+        count++;
+        size += bodyLength;
+        minEventTime = Math.min(minEventTime, event.getRawLogTime());
+        return true;
+    }
+
+    /**
+     * Tells whether the batch was created at or before the given threshold.
+     *
+     * @param  createThreshold the latest creation time that still counts as timed out
+     * @return                 true when the creation time is not later than the threshold
+     */
+    public boolean isTimeout(long createThreshold) {
+        return createTime <= createThreshold;
+    }
+
+    /**
+     * @return the uid of the batch
+     */
+    public String getUid() {
+        return this.uid;
+    }
+
+    /**
+     * @return the inlong group id of the batch
+     */
+    public String getInlongGroupId() {
+        return this.inlongGroupId;
+    }
+
+    /**
+     * @return the inlong stream id of the batch
+     */
+    public String getInlongStreamId() {
+        return this.inlongStreamId;
+    }
+
+    /**
+     * @return the list holding the events of the batch (the internal list, not a copy)
+     */
+    public List<ProfileEvent> getEvents() {
+        return this.events;
+    }
+
+    /**
+     * Replaces the event list; count and size are not recalculated.
+     *
+     * @param events the new event list
+     */
+    public void setEvents(List<ProfileEvent> events) {
+        this.events = events;
+    }
+
+    /**
+     * @return the number of events counted for the batch
+     */
+    public long getCount() {
+        return this.count;
+    }
+
+    /**
+     * @param count the new event count
+     */
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    /**
+     * @return the total body size counted for the batch
+     */
+    public long getSize() {
+        return this.size;
+    }
+
+    /**
+     * @param size the new total body size
+     */
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    /**
+     * @return the smallest raw log time seen so far, capped by the creation time of the batch
+     */
+    public long getMinEventTime() {
+        return this.minEventTime;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
new file mode 100644
index 0000000..8dac973
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
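+/**
+ * Holds the settings a sink reads from its Flume context (cluster id, task name, thread count,
+ * intervals), its channel and its metric item set, and periodically reloads the
+ * {@link SortTaskConfig} of the task.
+ */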
+public class SinkContext {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class);
+
+    public static final String KEY_MAX_THREADS = "maxThreads";
+    public static final String KEY_PROCESSINTERVAL = "processInterval";
+    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+
+    protected final String clusterId;
+    protected final String taskName;
+    protected final String sinkName;
+    protected final Context sinkContext;
+
+    // Replaced by the reload timer thread; volatile so that getSortTaskConfig() called from
+    // another thread observes the most recent reload.
+    protected volatile SortTaskConfig sortTaskConfig;
+
+    protected final Channel channel;
+    //
+    protected final int maxThreads;
+    protected final long processInterval;
+    protected final long reloadInterval;
+    //
+    protected final SortMetricItemSet metricItemSet;
+    protected Timer reloadTimer;
+
+    /**
+     * Reads the sink settings from the context and registers a metric item set named after the sink.
+     *
+     * @param sinkName the sink name, also used as the name of the metric item set
+     * @param context  the sink configuration; {@code maxThreads} defaults to 10, {@code processInterval}
+     *                 to 100 and {@code reloadInterval} to 60000
+     * @param channel  the channel the sink is attached to
+     */
+    public SinkContext(String sinkName, Context context, Channel channel) {
+        this.sinkName = sinkName;
+        this.sinkContext = context;
+        this.channel = channel;
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+        this.taskName = context.getString(SortTaskConfig.KEY_TASK_NAME);
+        this.maxThreads = context.getInteger(KEY_MAX_THREADS, 10);
+        // Read as a long like reloadInterval: the field is a long, and an integer read would
+        // reject values beyond the int range.
+        this.processInterval = context.getLong(KEY_PROCESSINTERVAL, 100L);
+        this.reloadInterval = context.getLong(KEY_RELOADINTERVAL, 60000L);
+        //
+        this.metricItemSet = new SortMetricItemSet(sinkName);
+        MetricRegister.register(this.metricItemSet);
+    }
+
+    /**
+     * Loads the task configuration once and schedules the periodic reload. Failures are logged.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the periodic reload. Does nothing when no timer exists, e.g. because
+     * {@link #start()} was never called.
+     */
+    public void close() {
+        this.cancelReloadTimer();
+    }
+
+    /**
+     * Schedules a daemon timer that calls {@link #reload()} every {@code reloadInterval}
+     * milliseconds, the first time one interval from now.
+     */
+    protected void setReloadTimer() {
+        // A repeated start() must not leave the previous timer running next to the new one.
+        this.cancelReloadTimer();
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Cancels the reload timer if one exists.
+     */
+    private void cancelReloadTimer() {
+        Timer timer = this.reloadTimer;
+        if (timer != null) {
+            timer.cancel();
+            this.reloadTimer = null;
+        }
+    }
+
+    /**
+     * Fetches the current configuration of the task from {@link SortClusterConfigHolder}.
+     * The previous configuration is kept when the lookup throws.
+     */
+    public void reload() {
+        try {
+            this.sortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
+        } catch (Throwable e) {
+            // Deliberately broad: anything escaping a TimerTask terminates the timer thread,
+            // which would stop all further reloads.
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * get clusterId
+     *
+     * @return the clusterId
+     */
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * get taskName
+     *
+     * @return the taskName
+     */
+    public String getTaskName() {
+        return taskName;
+    }
+
+    /**
+     * get sinkName
+     *
+     * @return the sinkName
+     */
+    public String getSinkName() {
+        return sinkName;
+    }
+
+    /**
+     * get sinkContext
+     *
+     * @return the Flume context this object was built from
+     */
+    public Context getSinkContext() {
+        return sinkContext;
+    }
+
+    /**
+     * get sortTaskConfig
+     *
+     * @return the configuration stored by the latest successful {@link #reload()};
+     *         null before the first one
+     */
+    public SortTaskConfig getSortTaskConfig() {
+        return sortTaskConfig;
+    }
+
+    /**
+     * get channel
+     *
+     * @return the channel
+     */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /**
+     * get maxThreads
+     *
+     * @return the maxThreads
+     */
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /**
+     * get processInterval
+     *
+     * @return the processInterval
+     */
+    public long getProcessInterval() {
+        return processInterval;
+    }
+
+    /**
+     * get reloadInterval
+     *
+     * @return the reload period in milliseconds
+     */
+    public long getReloadInterval() {
+        return reloadInterval;
+    }
+
+    /**
+     * get metricItemSet
+     *
+     * @return the metricItemSet
+     */
+    public SortMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
+}