You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/12/09 04:39:59 UTC

[incubator-inlong] branch master updated: [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new cc35ea4  [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)
cc35ea4 is described below

commit cc35ea43ddbf62bc73e1049c518039e832055193
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Dec 9 12:39:50 2021 +0800

    [INLONG-1891] Inlong-Sort-Standalone add sort-standalone-source module. (#1930)
---
 inlong-sort-standalone/pom.xml                     |   5 +-
 .../sort-standalone-common/pom.xml                 |  28 ++-
 .../config/holder/SortClusterConfigHolder.java     |   1 -
 .../config/pojo/SortClusterResponse.java           | 108 ----------
 .../standalone/config/pojo/type/CacheType.java     |   4 +-
 .../sort/standalone/config/pojo/type/DataType.java |   4 +-
 .../sort/standalone/config/pojo/type/SortType.java |   4 +-
 .../sort/standalone/utils/InlongLoggerFactory.java |  22 ---
 .../sort-standalone-source/pom.xml                 |  55 ++----
 .../PropertiesConfigurationProvider.java           |  64 ++++++
 .../apache/inlong/sort/standalone/SortCluster.java | 135 +++++++++++++
 .../sort/standalone/SortStandaloneApplication.java |  23 ++-
 .../apache/inlong/sort/standalone/SortTask.java    | 212 ++++++++++++++++++++
 .../standalone/channel/BufferQueueChannel.java     | 180 +++++++++++++++++
 .../sort/standalone/channel/ProfileEvent.java      | 121 ++++++++++++
 .../standalone/channel/ProfileTransaction.java     | 110 +++++++++++
 .../sort/standalone/dispatch/DispatchManager.java  | 157 +++++++++++++++
 .../sort/standalone/dispatch/DispatchProfile.java  | 173 ++++++++++++++++
 .../inlong/sort/standalone/sink/SinkContext.java   | 220 +++++++++++++++++++++
 19 files changed, 1441 insertions(+), 185 deletions(-)

diff --git a/inlong-sort-standalone/pom.xml b/inlong-sort-standalone/pom.xml
index c329ff1..0672876 100644
--- a/inlong-sort-standalone/pom.xml
+++ b/inlong-sort-standalone/pom.xml
@@ -44,6 +44,7 @@
         <plugin.assembly.version>3.2.0</plugin.assembly.version>
         <pulsar.version>2.7.2</pulsar.version>
         <junit.version>4.13</junit.version>
+        <powermock.version>2.0.2</powermock.version>
         <guava.version>19.0</guava.version>
         <skipTests>false</skipTests>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -85,13 +86,13 @@
         <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-module-junit4</artifactId>
-            <version>2.0.2</version>
+            <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.powermock</groupId>
             <artifactId>powermock-api-mockito2</artifactId>
-            <version>2.0.2</version>
+            <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/inlong-sort-standalone/sort-standalone-common/pom.xml b/inlong-sort-standalone/sort-standalone-common/pom.xml
index 4c99a28..c4fdccc 100644
--- a/inlong-sort-standalone/sort-standalone-common/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-common/pom.xml
@@ -1,14 +1,22 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index 67b4b6a..d593db1 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -78,7 +78,6 @@ public final class SortClusterConfigHolder {
                 }
             } catch (Throwable t) {
                 LOG.error("Fail to init loader,loaderType:{},error:{}", loaderType, t.getMessage());
-                LOG.error(t.getMessage(), t);
             }
             if (instance.loader == null) {
                 instance.loader = new ClassResourceSortClusterConfigLoader();
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
index a04c24f..3d0faa6 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/SortClusterResponse.java
@@ -104,112 +104,4 @@ public class SortClusterResponse {
     public void setData(SortClusterConfig data) {
         this.data = data;
     }
-//
-//    /**
-//     * generateTdbankConfig
-//     * 
-//     * @return
-//     */
-//    public static SortClusterConfig generateTdbankConfig() {
-//        SortClusterConfig clusterConfig = new SortClusterConfig();
-//        clusterConfig.setClusterName("tdbankv3-sz-sz1");
-//        //
-//        List<SortTaskConfig> sortTasks = new ArrayList<>();
-//        clusterConfig.setSortTasks(sortTasks);
-//        SortTaskConfig taskConfig = new SortTaskConfig();
-//        sortTasks.add(taskConfig);
-//        taskConfig.setName("sid_tdbank_atta6th_v3");
-//        taskConfig.setType(SortType.TQTDBANK);
-//        //
-//        Map<String, String> sinkParams = new HashMap<>();
-//        taskConfig.setSinkParams(sinkParams);
-//        sinkParams.put("b_pcg_venus_szrecone_124_153_utf8", "10.56.15.195:46801,10.56.15.212:46801,"
-//                + "10.56.15.220:46801,10.56.15.221:46801,"
-//                + "10.56.15.230:46801,10.56.16.20:46801,10.56.16.38:46801,10.56.20.21:46801,10.56.20.80:46801,"
-//                + "10.56.20.85:46801,10.56.209.205:46801,10.56.21.17:46801,10.56.21.20:46801,10.56.21.79:46801,"
-//                + "10.56.21.85:46801,10.56.81.205:46801,10.56.81.211:46801,10.56.82.11:46801,10.56.82.12:46801,"
-//                + "10.56.82.37:46801,10.56.82.38:46801,10.56.82.40:46801,10.56.83.143:46801,10.56.83.80:46801,"
-//                + "10.56.84.17:46801");
-//        //
-//        List<Map<String, String>> idParams = new ArrayList<>();
-//        Map<String, String> idParam = new HashMap<>();
-//        idParams.add(idParam);
-//        taskConfig.setIdParams(idParams);
-//        idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
-//        idParam.put(Constants.INLONG_STREAM_ID, "");
-//        idParam.put(TdbankConfig.KEY_BID, "b_pcg_venus_szrecone_124_153_utf8");
-//        idParam.put(TdbankConfig.KEY_TID, "t_sh_atta_v2_0fc00000046");
-//        idParam.put(TdbankConfig.KEY_DATA_TYPE, TdbankConfig.DATA_TYPE_ATTA_TEXT);
-//        return clusterConfig;
-//    }
-//
-//    /**
-//     * generateCdmqConfig
-//     * 
-//     * @return
-//     */
-//    public static SortClusterConfig generateCdmqConfig() {
-//        SortClusterConfig clusterConfig = new SortClusterConfig();
-//        clusterConfig.setClusterName("cdmqv3-sz-sz1");
-//        //
-//        List<SortTaskConfig> sortTasks = new ArrayList<>();
-//        clusterConfig.setSortTasks(sortTasks);
-//        SortTaskConfig taskConfig = new SortTaskConfig();
-//        sortTasks.add(taskConfig);
-//        taskConfig.setName("sid_cdmq_kg_videorequest_v3");
-//        taskConfig.setType(SortType.CDMQ);
-//        //
-//        Map<String, String> sinkParams = new HashMap<>();
-//        taskConfig.setSinkParams(sinkParams);
-//        sinkParams.put("cdmqAccessPoint", "cdmqszentry01.data.mig:10005,cdmqszentry05.data.mig:10033");
-//        sinkParams.put("cdmqClusterId", "kg_videorequest");
-//        sinkParams.put("clientId", "p_video_atta_196");
-//        sinkParams.put("batchSize", "122880");
-//        sinkParams.put("maxRequestSize", "8388608");
-//        sinkParams.put("lingerMs", "150");
-//        //
-//        List<Map<String, String>> idParams = new ArrayList<>();
-//        Map<String, String> idParam = new HashMap<>();
-//        idParams.add(idParam);
-//        taskConfig.setIdParams(idParams);
-//        idParam.put(Constants.INLONG_GROUP_ID, "0fc00000046");
-//        idParam.put(Constants.TOPIC, "U_TOPIC_0fc00000046");
-//        return clusterConfig;
-//    }
-//
-//    /**
-//     * main
-//     * 
-//     * @param args
-//     */
-//    public static void main(String[] args) {
-//        // tdbank
-//        {
-//            SortClusterConfig config = generateTdbankConfig();
-//            String configString = JSON.toJSONString(config, false);
-//            System.out.println("tdbank:" + configString);
-//            String md5 = DigestUtils.md5Hex(configString);
-//            SortClusterResponse response = new SortClusterResponse();
-//            response.setResult(true);
-//            response.setErrCode(SUCC);
-//            response.setMd5(md5);
-//            response.setData(config);
-//            String responseString = JSON.toJSONString(response, true);
-//            System.out.println("tdbank responseString:" + responseString);
-//        }
-//        // cdmq
-//        {
-//            SortClusterConfig config = generateCdmqConfig();
-//            String configString = JSON.toJSONString(config, false);
-//            System.out.println("cdmq:" + configString);
-//            String md5 = DigestUtils.md5Hex(configString);
-//            SortClusterResponse response = new SortClusterResponse();
-//            response.setResult(true);
-//            response.setErrCode(SUCC);
-//            response.setMd5(md5);
-//            response.setData(config);
-//            String responseString = JSON.toJSONString(response, true);
-//            System.out.println("cdmq responseString:" + responseString);
-//        }
-//    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
index 963dc6e..845ccaa 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/CacheType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
  */
 public enum CacheType {
 
-    TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), N("n");
+    TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), UNKNOWN("n");
 
     private final String value;
 
@@ -65,6 +65,6 @@ public enum CacheType {
                 return v;
             }
         }
-        return N;
+        return UNKNOWN;
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
index 882e29e..d5a7d64 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/DataType.java
@@ -22,7 +22,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
  */
 public enum DataType {
 
-    TEXT("text"), PB("pb"), JCE("jce"), N("n");
+    TEXT("text"), PB("pb"), JCE("jce"), UNKNOWN("n");
 
     private final String value;
 
@@ -65,6 +65,6 @@ public enum DataType {
                 return v;
             }
         }
-        return N;
+        return UNKNOWN;
     }
 }
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
index e066928..6444821 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/type/SortType.java
@@ -24,7 +24,7 @@ package org.apache.inlong.sort.standalone.config.pojo.type;
 public enum SortType {
 
     HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), THTDBANK(
-            "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), N("n");
+            "thtdbank"), TQTDBANK("tqtdbank"), CDMQ("cdmq"), UNKNOWN("n");
 
     private final String value;
 
@@ -67,6 +67,6 @@ public enum SortType {
                 return v;
             }
         }
-        return N;
+        return UNKNOWN;
     }
 }
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
index d4bdea7..274483a 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/InlongLoggerFactory.java
@@ -60,26 +60,4 @@ public class InlongLoggerFactory {
         String namePrefix = className.substring(0, index);
         return namePrefix;
     }
-
-//    /**
-//     * main
-//     * @param args
-//     */
-//    public static void main(String[] args) {
-//        int layer = 3;
-//        String className = "";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "org.ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "org.apache.ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "org.apache.inlong.ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "org.apache.inlong.sort.ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//        className = "org.apache.inlong.sort.standalone.ccc";
-//        System.out.println(className + ":" + getClassNamePrefix(className, layer));
-//    }
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/pom.xml b/inlong-sort-standalone/sort-standalone-source/pom.xml
index 630592a..3abc5a5 100644
--- a/inlong-sort-standalone/sort-standalone-source/pom.xml
+++ b/inlong-sort-standalone/sort-standalone-source/pom.xml
@@ -1,14 +1,22 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
-    license agreements. See the NOTICE file distributed with this work for additional 
-    information regarding copyright ownership. The ASF licenses this file to 
-    you under the Apache License, Version 2.0 (the "License"); you may not use 
-    this file except in compliance with the License. You may obtain a copy of 
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
-    by applicable law or agreed to in writing, software distributed under the 
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
-    OF ANY KIND, either express or implied. See the License for the specific 
-    language governing permissions and limitations under the License. -->
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
 
 <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -26,34 +34,13 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <compiler.source>1.8</compiler.source>
         <compiler.target>1.8</compiler.target>
-        <junit.version>4.13</junit.version>
-        <guava.version>19.0</guava.version>
-        <skipTests>false</skipTests>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-module-junit4</artifactId>
-            <version>2.0.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.powermock</groupId>
-            <artifactId>powermock-api-mockito2</artifactId>
-            <version>2.0.2</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-standalone-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java
new file mode 100644
index 0000000..853bfc2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/PropertiesConfigurationProvider.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.node.AbstractConfigurationProvider;
+import org.slf4j.Logger;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+
+/**
+ * Configuration provider that builds its {@link FlumeConfiguration} from a
+ * property map supplied at construction time.
+ */
+public class PropertiesConfigurationProvider extends
+        AbstractConfigurationProvider {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(PropertiesConfigurationProvider.class);
+
+    private final Map<String, String> flumeConf;
+
+    /**
+     * Creates a provider for the given agent, backed by the given property map.
+     * 
+     * @param agentName name passed to the {@link AbstractConfigurationProvider} constructor
+     * @param flumeConf property map used to build the configuration; stored by reference, not copied
+     */
+    public PropertiesConfigurationProvider(String agentName, Map<String, String> flumeConf) {
+        super(agentName);
+        this.flumeConf = flumeConf;
+    }
+
+    /**
+     * Builds a {@link FlumeConfiguration} from the stored property map.
+     * 
+     * @return configuration built from the map, or an empty configuration if building it throws
+     */
+    @Override
+    public FlumeConfiguration getFlumeConfiguration() {
+        try {
+            return new FlumeConfiguration(flumeConf);
+        } catch (Exception e) {
+            LOG.error("exception catch:" + e.getMessage(), e);
+        }
+        return new FlumeConfiguration(new HashMap<String, String>());
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
new file mode 100644
index 0000000..c91f985
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import static org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortClusterConfig;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Keeps one started {@link SortTask} per task name listed in the cluster configuration,
+ * re-reading that configuration on a timer to start new tasks and stop removed ones.
+ */
+public class SortCluster {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortCluster.class);
+
+    private Timer reloadTimer;
+    private Map<String, SortTask> taskMap = new ConcurrentHashMap<>();
+    private List<SortTask> deletingTasks = new ArrayList<>();
+
+    /**
+     * Performs an initial reload and then schedules periodic reloads; exceptions are logged, not rethrown.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer; tasks are not stopped here. Any exception is logged, not rethrown.
+     */
+    public void close() {
+        try {
+            this.reloadTimer.cancel();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Schedules reload() on a daemon timer; delay and period both use RELOAD_INTERVAL (fallback 60000L).
+     */
+    private void setReloadTimer() {
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+
+            /**
+             * Runs one reload pass on the timer thread.
+             */
+            public void run() {
+                reload();
+            }
+        };
+        long reloadInterval = CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Starts tasks that are new in the cluster config and stops tasks it no longer lists; errors are logged.
+     */
+    public void reload() {
+        try {
+            // fetch the current cluster config; nothing to do when none is available
+            SortClusterConfig newConfig = SortClusterConfigHolder.getClusterConfig();
+            if (newConfig == null) {
+                return;
+            }
+            // start a task for every configured name that is not in the task map yet
+            for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
+                String newTaskName = taskConfig.getName();
+                if (taskMap.containsKey(newTaskName)) {
+                    continue;
+                }
+                SortTask newTask = new SortTask(newTaskName);
+                newTask.start();
+                this.taskMap.put(newTaskName, newTask);
+            }
+            // collect tracked tasks whose names are missing from the config
+            for (Entry<String, SortTask> entry : taskMap.entrySet()) {
+                String taskName = entry.getKey();
+                boolean isFound = false;
+                for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
+                    if (taskName.equals(taskConfig.getName())) {
+                        isFound = true;
+                        break;
+                    }
+                }
+                if (!isFound) {
+                    this.deletingTasks.add(entry.getValue());
+                }
+            }
+            // stop the collected tasks and drop them from the task map
+            for (SortTask task : deletingTasks) {
+                task.stop();
+                taskMap.remove(task.getTaskName());
+            }
+            deletingTasks.clear();
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
index c0dc143..48c0470 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortStandaloneApplication.java
@@ -17,17 +17,36 @@
 
 package org.apache.inlong.sort.standalone;
 
+import org.apache.flume.node.Application;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.metrics.MetricObserver;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
 /**
  * 
- * Application
+ * SortStandaloneApplication: process entry point that starts a SortCluster and initializes metrics.
  */
 public class SortStandaloneApplication {
 
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortStandaloneApplication.class);
+
     /**
      * main
      * 
      * @param args
      */
     public static void main(String[] args) {
+        LOG.info("start to sort-standalone");
+        try {
+            SortCluster cluster = new SortCluster();
+            // start the cluster: initial config load plus the periodic reload timer
+            cluster.start();
+            // initialize the metric observer with the common properties
+            MetricObserver.init(CommonPropertiesHolder.get());
+            Thread.sleep(5000);
+        } catch (Exception e) {
+            LOG.error("A fatal error occurred while running. Exception follows.", e);
+        }
     }
-}
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
new file mode 100644
index 0000000..7748ae6
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.flume.Channel;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+import com.google.common.eventbus.Subscribe;
+
+/**
+ * 
+ * SortTask
+ */
+public class SortTask {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SortTask.class);
+
+    private final String taskName;
+    private final LifecycleSupervisor supervisor;
+    // configuration whose components are currently supervised; null until the first start
+    private MaterializedConfiguration materializedConfiguration;
+    // serializes reconfiguration and stop of the supervised components
+    private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+    /**
+     * Creates a task with its own lifecycle supervisor.
+     * @param taskName name used to look up the task configuration
+     */
+    public SortTask(String taskName) {
+        this.taskName = taskName;
+        this.supervisor = new LifecycleSupervisor();
+    }
+
+    /**
+     * @return the name of this sort task
+     */
+    public String getTaskName() {
+        return taskName;
+    }
+
+    /**
+     * Builds the Flume configuration of this task and starts its components.
+     */
+    public void start() {
+        SortTaskConfig config = SortClusterConfigHolder.getTaskConfig(taskName);
+        if (config == null) {
+            LOG.warn("Can not start sort task:{}, no task config is found", taskName);
+            return;
+        }
+        // materialize the generated properties into Flume sources, channels and sinks
+        Map<String, String> flumeConfiguration = config.generateFlumeConfiguration();
+        LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
+        PropertiesConfigurationProvider configurationProvider = new PropertiesConfigurationProvider(
+                config.getName(), flumeConfiguration);
+        this.handleConfigurationEvent(configurationProvider.getConfiguration());
+    }
+
+    /**
+     * Stops the running components and starts those of the given configuration under the lifecycle lock.
+     *
+     * @param conf the configuration to start
+     */
+    @Subscribe
+    public void handleConfigurationEvent(MaterializedConfiguration conf) {
+        try {
+            lifecycleLock.lockInterruptibly();
+            stopAllComponents();
+            startAllComponents(conf);
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted while trying to handle configuration event");
+            // keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+        } finally {
+            // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
+            if (lifecycleLock.isHeldByCurrentThread()) {
+                lifecycleLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Stops all supervised components and then the supervisor; the lock is released even if stopping fails.
+     */
+    public void stop() {
+        lifecycleLock.lock();
+        try {
+            stopAllComponents();
+            supervisor.stop();
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    /**
+     * Unsupervises the sources, then the sinks, then the channels of the current configuration, if any.
+     */
+    private void stopAllComponents() {
+        if (this.materializedConfiguration != null) {
+            LOG.info("Shutting down configuration: {}", this.materializedConfiguration);
+            for (Entry<String, SourceRunner> entry : this.materializedConfiguration.getSourceRunners().entrySet()) {
+                try {
+                    LOG.info("Stopping Source {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, SinkRunner> entry : this.materializedConfiguration.getSinkRunners().entrySet()) {
+                try {
+                    LOG.info("Stopping Sink {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+
+            for (Entry<String, Channel> entry : this.materializedConfiguration.getChannels().entrySet()) {
+                try {
+                    LOG.info("Stopping Channel {}", entry.getKey());
+                    supervisor.unsupervise(entry.getValue());
+                } catch (Exception e) {
+                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Starts the channels, waits until each is started or in error state, then starts sinks and sources.
+     *
+     * @param materializedConfiguration the configuration whose components are started
+     */
+    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+        LOG.info("Starting new configuration:{}", materializedConfiguration);
+        this.materializedConfiguration = materializedConfiguration;
+
+        for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
+            try {
+                LOG.info("Starting Channel {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        // Wait for all channels to start.
+        for (Channel ch : materializedConfiguration.getChannels().values()) {
+            while (ch.getLifecycleState() != LifecycleState.START
+                    && !supervisor.isComponentInErrorState(ch)) {
+                try {
+                    LOG.info("Waiting for channel: {} to start. Sleeping for 500 ms", ch.getName());
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    LOG.error("Interrupted while waiting for channel to start.", e);
+                    // restore the flag and give up: a retried sleep() would fail at once and spin
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners().entrySet()) {
+            try {
+                LOG.info("Starting Sink {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners().entrySet()) {
+            try {
+                LOG.info("Starting Source {}", entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                LOG.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
new file mode 100644
index 0000000..b15a396
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.AbstractChannel;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.sort.standalone.utils.SizeSemaphore;
+import org.slf4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * 
+ * BufferQueueChannel
+ */
+public class BufferQueueChannel extends AbstractChannel {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+
+    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
+    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
+
+    // global buffer size, shared by every channel instance and created lazily
+    private static SizeSemaphore globalBufferQueueSizeKb;
+    private BufferQueue<ProfileEvent> bufferQueue;
+    // transaction most recently handed out to the calling thread by getTransaction()
+    private ThreadLocal<ProfileTransaction> currentTransaction = new ThreadLocal<ProfileTransaction>();
+    protected Timer channelTimer;
+    private AtomicLong takeCounter = new AtomicLong(0);
+    private AtomicLong putCounter = new AtomicLong(0);
+
+    /**
+     * Constructor; the queue is created with a third of the global max size and the shared global semaphore.
+     */
+    public BufferQueueChannel() {
+        Context context = CommonPropertiesHolder.getContext();
+        SizeSemaphore globalSize = getGlobalBufferQueueSizeKb(context);
+        this.bufferQueue = new BufferQueue<>(globalSize.maxSize() / 3, globalSize);
+    }
+
+    /**
+     * Acquires buffer permits for the event body and adds the event to the current thread's transaction.
+     *
+     * @param event the event to put; wrapped into a ProfileEvent when it is not one already
+     */
+    @Override
+    public void put(Event event) throws ChannelException {
+        // check the transaction first: permits acquired without one would never be released
+        ProfileTransaction transaction = currentTransaction.get();
+        Preconditions.checkState(transaction != null, "No transaction exists for this thread");
+        putCounter.incrementAndGet();
+        int eventSize = event.getBody().length;
+        this.bufferQueue.acquire(eventSize);
+        ProfileEvent profile = (event instanceof ProfileEvent)
+                ? (ProfileEvent) event
+                : new ProfileEvent(event.getBody(), event.getHeaders());
+        transaction.doPut(profile);
+    }
+
+    /**
+     * Polls one event from the queue and records it in the current thread's transaction.
+     *
+     * @return the polled event, or null when nothing was polled
+     */
+    @Override
+    public Event take() throws ChannelException {
+        // check the transaction before polling so an event is never removed without being tracked
+        ProfileTransaction transaction = currentTransaction.get();
+        Preconditions.checkState(transaction != null, "No transaction exists for this thread");
+        ProfileEvent event = this.bufferQueue.pollRecord();
+        if (event != null) {
+            transaction.doTake(event);
+            takeCounter.incrementAndGet();
+        }
+        return event;
+    }
+
+    /**
+     * @return a new transaction on the buffer queue, also remembered for the calling thread
+     */
+    @Override
+    public Transaction getTransaction() {
+        ProfileTransaction newTransaction = new ProfileTransaction(this.bufferQueue);
+        this.currentTransaction.set(newTransaction);
+        return newTransaction;
+    }
+
+    /**
+     * Starts the channel and schedules the periodic statistics log.
+     */
+    @Override
+    public void start() {
+        super.start();
+        try {
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the statistics timer, which would otherwise outlive the stopped channel, then stops the channel.
+     */
+    @Override
+    public void stop() {
+        if (channelTimer != null) {
+            channelTimer.cancel();
+            channelTimer = null;
+        }
+        super.stop();
+    }
+
+    /**
+     * Schedules a daemon timer that logs and resets the channel statistics every reloadInterval milliseconds.
+     */
+    protected void setReloadTimer() {
+        channelTimer = new Timer(true);
+        long reloadInterval = CommonPropertiesHolder.getLong(KEY_RELOADINTERVAL, 60000L);
+        TimerTask channelTask = new TimerTask() {
+            @Override
+            public void run() {
+                LOG.info("queueSize:{},availablePermits:{},put:{},take:{}",
+                        bufferQueue.size(),
+                        bufferQueue.availablePermits(),
+                        putCounter.getAndSet(0),
+                        takeCounter.getAndSet(0));
+            }
+        };
+        channelTimer.schedule(channelTask,
+                new Date(System.currentTimeMillis() + reloadInterval),
+                reloadInterval);
+    }
+
+    /**
+     * Does nothing; the constructor takes its context from CommonPropertiesHolder instead.
+     */
+    @Override
+    public void configure(Context context) {
+    }
+
+    /**
+     * @param  context read for maxBufferQueueSizeKb when the shared semaphore is first created
+     * @return         the buffer-size semaphore shared by all channel instances
+     */
+    public static synchronized SizeSemaphore getGlobalBufferQueueSizeKb(Context context) {
+        // synchronized on the class: the field is not volatile, so an unlocked fast path would be unsafe
+        if (globalBufferQueueSizeKb == null) {
+            int maxBufferQueueSizeKb = context.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB, DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
+            globalBufferQueueSizeKb = new SizeSemaphore(maxBufferQueueSizeKb, SizeSemaphore.ONEKB);
+        }
+        return globalBufferQueueSizeKb;
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
new file mode 100644
index 0000000..153cc25
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.Map;
+
+import org.apache.commons.lang.math.NumberUtils;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
+import org.apache.inlong.sort.standalone.utils.Constants;
+
+/**
+ * 
+ * ProfileEvent
+ */
+public class ProfileEvent extends SimpleEvent {
+
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+    private final String uid;
+
+    private final long rawLogTime;
+    private final long fetchTime;
+    private long sendTime;
+
+    /**
+     * Wraps a body and its headers; the inlong ids, the uid and the raw log time are derived from the headers.
+     * @param body    the event payload
+     * @param headers the event headers, must not be null
+     */
+    public ProfileEvent(byte[] body, Map<String, String> headers) {
+        super.setBody(body);
+        super.setHeaders(headers);
+        this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
+        this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
+        this.uid = InlongId.generateUid(this.inlongGroupId, this.inlongStreamId);
+        final long now = System.currentTimeMillis();
+        this.fetchTime = now;
+        this.sendTime = now;
+        this.rawLogTime = NumberUtils.toLong(headers.get(Constants.HEADER_KEY_MSG_TIME), now);
+    }
+
+    /**
+     * Gets the send time.
+     *
+     * @return the send time; equal to the fetch time until {@link #setSendTime(long)} is called
+     */
+    public long getSendTime() {
+        return this.sendTime;
+    }
+
+    /**
+     * Overwrites the send time.
+     *
+     * @param sendTime the new send time
+     */
+    public void setSendTime(long sendTime) {
+        this.sendTime = sendTime;
+    }
+
+    /**
+     * Gets the inlong group id.
+     *
+     * @return the value of the INLONG_GROUP_ID header, as read in the constructor
+     */
+    public String getInlongGroupId() {
+        return this.inlongGroupId;
+    }
+
+    /**
+     * Gets the inlong stream id.
+     *
+     * @return the value of the INLONG_STREAM_ID header, as read in the constructor
+     */
+    public String getInlongStreamId() {
+        return this.inlongStreamId;
+    }
+
+    /**
+     * Gets the raw log time.
+     *
+     * @return the HEADER_KEY_MSG_TIME header parsed as long, or the fetch time when it can not be parsed
+     */
+    public long getRawLogTime() {
+        return this.rawLogTime;
+    }
+
+    /**
+     * Gets the fetch time.
+     *
+     * @return the System.currentTimeMillis() value taken in the constructor
+     */
+    public long getFetchTime() {
+        return this.fetchTime;
+    }
+
+    /**
+     * Gets the uid.
+     *
+     * @return the uid generated from the inlong group id and the inlong stream id
+     */
+    public String getUid() {
+        return this.uid;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java
new file mode 100644
index 0000000..a885224
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileTransaction.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.channel;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flume.Transaction;
+import org.apache.inlong.sort.standalone.utils.BufferQueue;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * 
+ * ProfileTransaction
+ */
+public class ProfileTransaction implements Transaction {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(ProfileTransaction.class);
+
+    private final BufferQueue<ProfileEvent> bufferQueue;
+    private final List<ProfileEvent> takeList = new ArrayList<>();
+    private final List<ProfileEvent> putList = new ArrayList<>();
+
+    /**
+     * Creates a transaction that works on the given queue.
+     *
+     * @param bufferQueue the queue that events are offered to and released on
+     */
+    public ProfileTransaction(BufferQueue<ProfileEvent> bufferQueue) {
+        this.bufferQueue = bufferQueue;
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void begin() {
+    }
+
+    /**
+     * Releases the taken events on the queue, then offers the put events to it.
+     */
+    @Override
+    public void commit() {
+        releaseAll(this.takeList);
+        offerAll(this.putList);
+    }
+
+    /**
+     * Offers the taken events back to the queue, then releases the put events on it.
+     */
+    @Override
+    public void rollback() {
+        offerAll(this.takeList);
+        releaseAll(this.putList);
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Remembers an event taken in this transaction.
+     *
+     * @param event the taken event
+     */
+    public void doTake(ProfileEvent event) {
+        this.takeList.add(event);
+    }
+
+    /**
+     * Remembers an event put in this transaction.
+     *
+     * @param event the put event
+     */
+    public void doPut(ProfileEvent event) {
+        this.putList.add(event);
+    }
+
+    // Calls release on the queue with the body length of each event, then empties the list.
+    private void releaseAll(List<ProfileEvent> events) {
+        events.forEach(event -> bufferQueue.release(event.getBody().length));
+        events.clear();
+    }
+
+    // Offers each event to the queue, then empties the list.
+    private void offerAll(List<ProfileEvent> events) {
+        events.forEach(event -> bufferQueue.offer(event));
+        events.clear();
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
new file mode 100644
index 0000000..3d9a326
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchManager.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.dispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flume.Context;
+import org.apache.inlong.sort.standalone.channel.BufferQueueChannel;
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * DispatchManager
+ */
+public class DispatchManager {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(BufferQueueChannel.class);
+    public static final String KEY_DISPATCH_TIMEOUT = "dispatchTimeout";
+    public static final String KEY_DISPATCH_MAX_PACKCOUNT = "dispatchMaxPackCount";
+    public static final String KEY_DISPATCH_MAX_PACKSIZE = "dispatchMaxPackSize";
+    public static final long DEFAULT_DISPATCH_TIMEOUT = 2000;
+    public static final long DEFAULT_DISPATCH_MAX_PACKCOUNT = 256;
+    public static final long DEFAULT_DISPATCH_MAX_PACKSIZE = 327680;
+
+    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
+    private final long dispatchTimeout;
+    private final long maxPackCount;
+    private final long maxPackSize;
+    private ConcurrentHashMap<String, DispatchProfile> profileCache = new ConcurrentHashMap<>();
+    //
+    private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
+
+    /**
+     * Constructor
+     * 
+     * @param context
+     * @param dispatchQueue
+     */
+    public DispatchManager(Context context, LinkedBlockingQueue<DispatchProfile> dispatchQueue) {
+        this.dispatchQueue = dispatchQueue;
+        this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT, DEFAULT_DISPATCH_TIMEOUT);
+        this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT, DEFAULT_DISPATCH_MAX_PACKCOUNT);
+        this.maxPackSize = context.getLong(KEY_DISPATCH_MAX_PACKSIZE, DEFAULT_DISPATCH_MAX_PACKSIZE);
+    }
+
+    /**
+     * addEvent
+     * 
+     * @param event
+     */
+    public void addEvent(ProfileEvent event) {
+        if (needOutputOvertimeData.get()) {
+            this.outputOvertimeData();
+            this.needOutputOvertimeData.set(false);
+        }
+        // parse
+        String uid = event.getUid();
+        //
+        DispatchProfile dispatchProfile = this.profileCache.get(uid);
+        if (dispatchProfile == null) {
+            dispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(), event.getInlongStreamId());
+            this.profileCache.put(uid, dispatchProfile);
+        }
+        //
+        boolean addResult = dispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+        if (!addResult) {
+            DispatchProfile newDispatchProfile = new DispatchProfile(uid, event.getInlongGroupId(),
+                    event.getInlongStreamId());
+            DispatchProfile oldDispatchProfile = this.profileCache.put(uid, newDispatchProfile);
+            this.dispatchQueue.offer(oldDispatchProfile);
+            newDispatchProfile.addEvent(event, maxPackCount, maxPackSize);
+        }
+    }
+
+    /**
+     * outputOvertimeData
+     * 
+     * @return
+     */
+    public void outputOvertimeData() {
+        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+                profileCache.size(), dispatchQueue.size());
+        long currentTime = System.currentTimeMillis();
+        long createThreshold = currentTime - dispatchTimeout;
+        List<String> removeKeys = new ArrayList<>();
+        long eventCount = 0;
+        for (Entry<String, DispatchProfile> entry : this.profileCache.entrySet()) {
+            DispatchProfile dispatchProfile = entry.getValue();
+            eventCount += dispatchProfile.getCount();
+            if (!dispatchProfile.isTimeout(createThreshold)) {
+                continue;
+            }
+            removeKeys.add(entry.getKey());
+        }
+        // output
+        removeKeys.forEach((key) -> {
+            dispatchQueue.offer(this.profileCache.remove(key));
+        });
+        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{}",
+                profileCache.size(), dispatchQueue.size(), eventCount);
+    }
+
+    /**
+     * get dispatchTimeout
+     * 
+     * @return the dispatchTimeout
+     */
+    public long getDispatchTimeout() {
+        return dispatchTimeout;
+    }
+
+    /**
+     * get maxPackCount
+     * 
+     * @return the maxPackCount
+     */
+    public long getMaxPackCount() {
+        return maxPackCount;
+    }
+
+    /**
+     * get maxPackSize
+     * 
+     * @return the maxPackSize
+     */
+    public long getMaxPackSize() {
+        return maxPackSize;
+    }
+
+    /**
+     * setNeedOutputOvertimeData
+     */
+    public void setNeedOutputOvertimeData() {
+        this.needOutputOvertimeData.set(true);
+    }
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java
new file mode 100644
index 0000000..2a5d5c6
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/dispatch/DispatchProfile.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.dispatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+
+/**
+ * 
+ * DispatchProfile
+ */
+public class DispatchProfile {
+
+    private final String inlongGroupId;
+    private final String inlongStreamId;
+    private final String uid;
+    private List<ProfileEvent> events = new ArrayList<>();
+    private long createTime = System.currentTimeMillis();
+    private long count = 0;
+    private long size = 0;
+    private long minEventTime = System.currentTimeMillis();
+
+    /**
+     * Creates an empty profile; create time and minimum event time start at the current time.
+     *
+     * @param uid            the uid of the profile
+     * @param inlongGroupId  the inlong group id of the profile
+     * @param inlongStreamId the inlong stream id of the profile
+     */
+    public DispatchProfile(String uid, String inlongGroupId, String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+        this.inlongGroupId = inlongGroupId;
+        this.uid = uid;
+    }
+
+    /**
+     * Appends the event unless the pack is full or, for a non-empty pack, the size limit would be exceeded.
+     * @param  event        the event to append
+     * @param  maxPackCount maximum number of events in this profile
+     * @param  maxPackSize  maximum total body length; not applied to the first event
+     * @return              true if the event was appended
+     */
+    public boolean addEvent(ProfileEvent event, long maxPackCount, long maxPackSize) {
+        final long bodyLength = event.getBody().length;
+        final boolean packIsFull = count >= maxPackCount;
+        final boolean wouldOverflow = count > 0 && size + bodyLength > maxPackSize;
+        if (packIsFull || wouldOverflow) {
+            return false;
+        }
+        events.add(event);
+        count++;
+        size += bodyLength;
+        minEventTime = Math.min(event.getRawLogTime(), minEventTime);
+        return true;
+    }
+
+    /**
+     * @param  createThreshold the time to compare the create time with
+     * @return                 true if this profile was created at or before the threshold
+     */
+    public boolean isTimeout(long createThreshold) {
+        return createTime <= createThreshold;
+    }
+
+    /**
+     * Gets the uid.
+     *
+     * @return the uid given to the constructor
+     */
+    public String getUid() {
+        return this.uid;
+    }
+
+    /**
+     * Gets the events.
+     *
+     * @return the internal event list, not a copy
+     */
+    public List<ProfileEvent> getEvents() {
+        return this.events;
+    }
+
+    /**
+     * Replaces the event list; count and size are not recalculated.
+     *
+     * @param events the new event list
+     */
+    public void setEvents(List<ProfileEvent> events) {
+        this.events = events;
+    }
+
+    /**
+     * Gets the event count.
+     *
+     * @return the number of events counted by this profile
+     */
+    public long getCount() {
+        return this.count;
+    }
+
+    /**
+     * Overwrites the event count.
+     *
+     * @param count the new count
+     */
+    public void setCount(long count) {
+        this.count = count;
+    }
+
+    /**
+     * Gets the size.
+     *
+     * @return the sum of the body lengths counted by this profile
+     */
+    public long getSize() {
+        return this.size;
+    }
+
+    /**
+     * Overwrites the size.
+     *
+     * @param size the new size
+     */
+    public void setSize(long size) {
+        this.size = size;
+    }
+
+    /**
+     * Gets the minimum event time.
+     *
+     * @return the smallest raw log time of the added events, never later than the creation of this profile
+     */
+    public long getMinEventTime() {
+        return this.minEventTime;
+    }
+
+    /**
+     * Gets the inlong group id.
+     *
+     * @return the inlong group id given to the constructor
+     */
+    public String getInlongGroupId() {
+        return this.inlongGroupId;
+    }
+
+    /**
+     * Gets the inlong stream id.
+     *
+     * @return the inlong stream id given to the constructor
+     */
+    public String getInlongStreamId() {
+        return this.inlongStreamId;
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
new file mode 100644
index 0000000..8dac973
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink;
+
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.inlong.commons.config.metrics.MetricRegister;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
+import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
+import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Runtime context of a sort-standalone sink: identifiers and tuning parameters read from the Flume sink
+ * {@link Context}, the sink's metric item set, and the task configuration reloaded by a daemon timer.
+ */
+public class SinkContext {
+
+    public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class);
+
+    public static final String KEY_MAX_THREADS = "maxThreads";
+    public static final String KEY_PROCESSINTERVAL = "processInterval";
+    public static final String KEY_RELOADINTERVAL = "reloadInterval";
+
+    protected final String clusterId;
+    protected final String taskName;
+    protected final String sinkName;
+    protected final Context sinkContext;
+
+    // replaced by the reload timer thread and handed out by a public getter, hence volatile
+    protected volatile SortTaskConfig sortTaskConfig;
+
+    protected final Channel channel;
+    protected final int maxThreads;
+    protected final long processInterval;
+    protected final long reloadInterval;
+    protected final SortMetricItemSet metricItemSet;
+    protected Timer reloadTimer;
+
+    /**
+     * Reads the identifiers and tuning parameters from the context and registers the sink's metric item set.
+     *
+     * @param sinkName the sink name, also used to name the metric item set
+     * @param context  the sink context; defaults are maxThreads=10, processInterval=100, reloadInterval=60000
+     * @param channel  the channel to keep in this context
+     */
+    public SinkContext(String sinkName, Context context, Channel channel) {
+        this.sinkName = sinkName;
+        this.sinkContext = context;
+        this.channel = channel;
+        this.clusterId = context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+        this.taskName = context.getString(SortTaskConfig.KEY_TASK_NAME);
+        this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
+        this.processInterval = sinkContext.getLong(KEY_PROCESSINTERVAL, 100L); // long, like reloadInterval
+        this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
+        this.metricItemSet = new SortMetricItemSet(sinkName);
+        MetricRegister.register(this.metricItemSet);
+    }
+
+    /**
+     * Loads the task configuration once and schedules its periodic reload; failures are logged, not thrown.
+     */
+    public void start() {
+        try {
+            this.reload();
+            this.setReloadTimer();
+        } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancels the reload timer if one was created; does nothing when no timer exists yet.
+     */
+    public void close() {
+        Timer timer = this.reloadTimer;
+        if (timer != null) {
+            timer.cancel();
+        }
+    }
+
+    /**
+     * Runs {@link #reload()} on a daemon timer every reloadInterval milliseconds, replacing any earlier timer.
+     */
+    protected void setReloadTimer() {
+        if (reloadTimer != null) {
+            reloadTimer.cancel();
+        }
+        reloadTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                reload();
+            }
+        };
+        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + reloadInterval), reloadInterval);
+    }
+
+    /**
+     * Fetches the configuration of this task from {@link SortClusterConfigHolder}; any failure is logged.
+     */
+    public void reload() {
+        try {
+            this.sortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
+        } catch (Throwable e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Gets the cluster id read from the context at construction.
+     *
+     * @return the clusterId
+     */
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * Gets the task name read from the context at construction.
+     *
+     * @return the taskName
+     */
+    public String getTaskName() {
+        return taskName;
+    }
+
+    /**
+     * Gets the sink name given to the constructor.
+     *
+     * @return the sinkName
+     */
+    public String getSinkName() {
+        return sinkName;
+    }
+
+    /**
+     * Gets the Flume context given to the constructor.
+     *
+     * @return the sinkContext
+     */
+    public Context getSinkContext() {
+        return sinkContext;
+    }
+
+    /**
+     * Gets the task configuration stored by the most recent successful {@link #reload()}.
+     *
+     * @return the sortTaskConfig; null while nothing has been loaded
+     */
+    public SortTaskConfig getSortTaskConfig() {
+        return sortTaskConfig;
+    }
+
+    /**
+     * Gets the channel given to the constructor.
+     *
+     * @return the channel
+     */
+    public Channel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Gets the value configured under {@link #KEY_MAX_THREADS}.
+     *
+     * @return the maxThreads
+     */
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    /**
+     * Gets the value configured under {@link #KEY_PROCESSINTERVAL}.
+     *
+     * @return the processInterval
+     */
+    public long getProcessInterval() {
+        return processInterval;
+    }
+
+    /**
+     * Gets the value configured under {@link #KEY_RELOADINTERVAL}, used as the reload period in milliseconds.
+     *
+     * @return the reloadInterval
+     */
+    public long getReloadInterval() {
+        return reloadInterval;
+    }
+
+    /**
+     * Gets the metric item set created and registered by the constructor.
+     *
+     * @return the metricItemSet
+     */
+    public SortMetricItemSet getMetricItemSet() {
+        return metricItemSet;
+    }
+}