You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/10 07:22:04 UTC

[incubator-inlong] branch master updated: [INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fc43f6e  [INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)
fc43f6e is described below

commit fc43f6e0533cd20a4985a074c76a04cdae141312
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Feb 10 15:20:54 2022 +0800

    [INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)
---
 inlong-sdk/pom.xml                                 | 102 +++++++++-
 inlong-sdk/sdk-common/pom.xml                      |  74 +++++++
 .../sdk/commons/protocol/EventConstants.java       |  46 +++++
 .../inlong/sdk/commons/protocol/EventUtils.java    | 215 +++++++++++++++++++++
 .../inlong/sdk/commons/protocol/InlongId.java      |  50 +++++
 .../inlong/sdk/commons/protocol/ProxyEvent.java    | 131 +++++++++++++
 .../inlong/sdk/commons/protocol/SdkEvent.java      | 166 ++++++++++++++++
 .../inlong/sdk/commons/protocol/SortEvent.java     | 117 +++++++++++
 .../apache/inlong/sdk/commons/utils/GzipUtils.java |  92 +++++++++
 .../sdk-common/src/main/proto/ProxySdk.proto       |  73 +++++++
 .../sdk/commons/protocol/TestEventUtils.java       | 104 ++++++++++
 11 files changed, 1164 insertions(+), 6 deletions(-)

diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 52f554e..274ce23 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -17,35 +17,125 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xmlns="http://maven.apache.org/POM/4.0.0"
-        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
-        <artifactId>inlong</artifactId>
         <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong</artifactId>
         <version>1.1.0-incubating-SNAPSHOT</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
     <packaging>pom</packaging>
     <artifactId>inlong-sdk</artifactId>
     <name>Apache InLong - SDK</name>
 
     <modules>
+        <module>sdk-common</module>
         <module>sort-sdk</module>
         <module>dataproxy-sdk</module>
     </modules>
 
     <properties>
+        <compiler.source>1.8</compiler.source>
+        <compiler.target>1.8</compiler.target>
+        <flume.version>1.9.0</flume.version>
+        <plugin.assembly.version>3.2.0</plugin.assembly.version>
+        <netty.version>3.8.0.Final</netty.version>
+        <codec.version>1.15</codec.version>
+        <junit.version>4.13</junit.version>
+        <powermock.version>2.0.2</powermock.version>
+        <guava.version>19.0</guava.version>
+        <skipTests>false</skipTests>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
+        <protobuf-version>2.5.0</protobuf-version>
+        <snappy-version>1.1.8.4</snappy-version>
+        <fastjson.version>1.2.79</fastjson.version>
+        <log4j.version>2.17.1</log4j.version>
+        <mockito.version>2.23.0</mockito.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-core</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-node</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-sdk</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-configuration</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>${codec.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <artifactId>mockito-core</artifactId>
+            <groupId>org.mockito</groupId>
+            <scope>test</scope>
+            <version>${mockito.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.11</version>
+            <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/inlong-sdk/sdk-common/pom.xml b/inlong-sdk/sdk-common/pom.xml
new file mode 100644
index 0000000..1d64468
--- /dev/null
+++ b/inlong-sdk/sdk-common/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<groupId>org.apache.inlong</groupId>
+		<artifactId>inlong-sdk</artifactId>
+		<version>1.1.0-incubating-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<groupId>org.apache.inlong</groupId>
+	<artifactId>sdk-common</artifactId>
+	<name>Apache InLong - SDK - Common</name>
+
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<maven.compiler.source>8</maven.compiler.source>
+		<maven.compiler.target>8</maven.compiler.target>
+		<compiler.source>1.8</compiler.source>
+		<compiler.target>1.8</compiler.target>
+	</properties>
+
+	<dependencies>
+	</dependencies>
+
+	<build>
+		<extensions>
+			<extension>
+				<groupId>kr.motd.maven</groupId>
+				<artifactId>os-maven-plugin</artifactId>
+				<version>1.5.0.Final</version>
+			</extension>
+		</extensions>
+		<plugins>
+			<plugin>
+				<groupId>org.xolstice.maven.plugins</groupId>
+				<artifactId>protobuf-maven-plugin</artifactId>
+				<version>0.5.1</version>
+				<extensions>true</extensions>
+				<configuration>
+					<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
+					<protocArtifact>com.google.protobuf:protoc:2.5.0:exe:${os.detected.classifier}</protocArtifact>
+				</configuration>
+				<executions>
+					<execution>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
new file mode 100644
index 0000000..f3519b9
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+/**
+ * 
+ * EventConstants
+ */
+public interface EventConstants {
+    String HEADER_KEY_VERSION = "version";
+    String HEADER_SDK_VERSION_1 = "1";
+    String HEADER_CACHE_VERSION_1 = "1";
+    // sdk
+    String INLONG_GROUP_ID = "inlongGroupId";
+    String INLONG_STREAM_ID = "inlongStreamId";
+    String HEADER_KEY_MSG_TIME = "msgTime";
+    String HEADER_KEY_SOURCE_IP = "sourceIp";
+    // proxy
+    String HEADER_KEY_SOURCE_TIME = "sourceTime";
+    String TOPIC = "topic";
+    String HEADER_KEY_PROXY_NAME = "proxyName";
+    String HEADER_KEY_PACK_TIME = "packTime";
+    String HEADER_KEY_MSG_COUNT = "msgCount";
+    String HEADER_KEY_SRC_LENGTH = "srcLength";
+    String HEADER_KEY_COMPRESS_TYPE = "compressType";
+    // sort
+    String HEADER_KEY_MESSAGE_KEY = "messageKey";
+    String HEADER_KEY_MSG_OFFSET = "msgOffset";
+    // other
+    String RELOAD_INTERVAL = "reloadInterval";
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java
new file mode 100644
index 0000000..e7c1be9
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
+import org.apache.inlong.sdk.commons.utils.GzipUtils;
+import org.xerial.snappy.Snappy;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * EventUtils
+ */
+public class EventUtils {
+
+    /**
+     * encode
+     * 
+     * @param  inlongGroupId
+     * @param  inlongStreamId
+     * @param  compressedType
+     * @param  events
+     * @return MessagePack
+     * @throws IOException
+     */
+    public static MessagePack encodeSdkEvents(String inlongGroupId, String inlongStreamId,
+            INLONG_COMPRESSED_TYPE compressedType, List<SdkEvent> events) throws IOException {
+        // MessageObjs
+        MessageObjs.Builder objsBuilder = MessageObjs.newBuilder();
+        for (SdkEvent event : events) {
+            MessageObj.Builder objBuilder = MessageObj.newBuilder();
+            objBuilder.setMsgTime(event.getMsgTime());
+            objBuilder.setSourceIp(event.getSourceIp());
+            objBuilder.setBody(ByteString.copyFrom(event.getBody()));
+            objsBuilder.addMsgs(objBuilder.build());
+        }
+        MessageObjs objs = objsBuilder.build();
+        byte[] srcBytes = objs.toByteArray();
+        byte[] compressedBytes = null;
+        switch (compressedType) {
+            case INLONG_SNAPPY :
+                compressedBytes = Snappy.compress(srcBytes);
+                break;
+            case INLONG_GZ :
+                compressedBytes = GzipUtils.compress(srcBytes);
+                break;
+            case INLONG_NO_COMPRESS :
+            default :
+                compressedBytes = srcBytes;
+                break;
+        }
+        // MessagePack
+        MessagePack.Builder packBuilder = MessagePack.newBuilder();
+        packBuilder.setCompressBytes(ByteString.copyFrom(compressedBytes));
+        // MessagePackHeader
+        MessagePackHeader.Builder headerBuilder = MessagePackHeader.newBuilder();
+        // string inlongGroupId = 1; //inlongGroupId
+        headerBuilder.setInlongGroupId(inlongGroupId);
+        // string inlongStreamId = 2; //inlongStreamId
+        headerBuilder.setInlongStreamId(inlongStreamId);
+        // int64 packTime = 3; //pack time, milliseconds
+        headerBuilder.setPackTime(System.currentTimeMillis());
+        // int32 msgCount = 4; //message count
+        headerBuilder.setMsgCount(events.size());
+        // int32 srcLength = 5; //total length of raw messages body
+        headerBuilder.setSrcLength(srcBytes.length);
+        // int32 compressLen = 6; //compress length of messages
+        headerBuilder.setCompressLen(compressedBytes.length);
+        // INLONG_COMPRESSED_TYPE compressType = 7; //compress type
+        headerBuilder.setCompressType(compressedType);
+        // map<string, string> params = 8; //additional parameters
+        packBuilder.setHeader(headerBuilder.build());
+        return packBuilder.build();
+    }
+
+    /**
+     * decodeSdkPack
+     * 
+     * @param  packBytes
+     * @param  offset
+     * @param  length
+     * @return List,ProxyEvent
+     * @throws IOException
+     */
+    public static List<ProxyEvent> decodeSdkPack(byte[] packBytes, int offset, int length) throws IOException {
+        MessagePack packObject = MessagePack.parseFrom(new ByteArrayInputStream(packBytes, offset, length));
+        MessagePackHeader header = packObject.getHeader();
+        // decompress
+        byte[] compressBytes = packObject.getCompressBytes().toByteArray();
+        byte[] srcBytes = null;
+        switch (header.getCompressType()) {
+            case INLONG_SNAPPY :
+                srcBytes = Snappy.uncompress(compressBytes);
+                break;
+            case INLONG_GZ :
+                srcBytes = GzipUtils.decompress(compressBytes);
+                break;
+            case INLONG_NO_COMPRESS :
+            default :
+                srcBytes = compressBytes;
+                break;
+        }
+        // decode
+        MessageObjs msgObjs = MessageObjs.parseFrom(srcBytes);
+        List<ProxyEvent> events = new ArrayList<>(msgObjs.getMsgsList().size());
+        String inlongGroupId = header.getInlongGroupId();
+        String inlongStreamId = header.getInlongStreamId();
+        for (MessageObj msgObj : msgObjs.getMsgsList()) {
+            ProxyEvent event = new ProxyEvent(inlongGroupId, inlongStreamId, msgObj);
+            events.add(event);
+        }
+        return events;
+    }
+
+    /**
+     * encodeCacheMessageBody
+     * 
+     * @param  compressedType
+     * @param  events
+     * @return byte array
+     * @throws IOException
+     */
+    public static byte[] encodeCacheMessageBody(INLONG_COMPRESSED_TYPE compressedType, List<ProxyEvent> events)
+            throws IOException {
+        // encode
+        MessageObjs.Builder objs = MessageObjs.newBuilder();
+        for (ProxyEvent event : events) {
+            MessageObj.Builder builder = MessageObj.newBuilder();
+            builder.setMsgTime(event.getMsgTime());
+            builder.setSourceIp(event.getSourceIp());
+            event.getHeaders().forEach((key, value) -> {
+                builder.addParams(MapFieldEntry.newBuilder().setKey(key).setValue(value));
+            });
+            builder.setBody(ByteString.copyFrom(event.getBody()));
+            objs.addMsgs(builder.build());
+        }
+        byte[] srcBytes = objs.build().toByteArray();
+        // compress
+        byte[] compressBytes = null;
+        switch (compressedType) {
+            case INLONG_SNAPPY :
+                compressBytes = Snappy.compress(srcBytes);
+                break;
+            case INLONG_GZ :
+                compressBytes = GzipUtils.compress(srcBytes);
+                break;
+            case INLONG_NO_COMPRESS :
+            default :
+                compressBytes = srcBytes;
+                break;
+        }
+        return compressBytes;
+    }
+
+    /**
+     * decodeCacheMessageBody
+     * 
+     * @param  inlongGroupId
+     * @param  inlongStreamId
+     * @param  compressedType
+     * @param  msgBody
+     * @return List,SortEvent
+     * @throws IOException
+     */
+    public static List<SortEvent> decodeCacheMessageBody(String inlongGroupId, String inlongStreamId,
+            INLONG_COMPRESSED_TYPE compressedType, byte[] msgBody) throws IOException {
+        // uncompress
+        byte[] srcBytes = null;
+        switch (compressedType) {
+            case INLONG_SNAPPY :
+                srcBytes = Snappy.uncompress(msgBody);
+                break;
+            case INLONG_GZ :
+                srcBytes = GzipUtils.decompress(msgBody);
+                break;
+            case INLONG_NO_COMPRESS :
+            default :
+                srcBytes = msgBody;
+                break;
+        }
+        // decode
+        MessageObjs msgObjs = MessageObjs.parseFrom(srcBytes);
+        List<SortEvent> events = new ArrayList<>(msgObjs.getMsgsList().size());
+        for (MessageObj msgObj : msgObjs.getMsgsList()) {
+            SortEvent event = new SortEvent(inlongGroupId, inlongStreamId, msgObj);
+            events.add(event);
+        }
+        return events;
+    }
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java
new file mode 100644
index 0000000..57831be
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * 
+ * InlongId
+ */
+public class InlongId {
+
+    /**
+     * generateUid
+     * 
+     * @param  inlongGroupId
+     * @param  inlongStreamId
+     * @return
+     */
+    public static String generateUid(String inlongGroupId, String inlongStreamId) {
+        if (StringUtils.isBlank(inlongGroupId)) {
+            if (StringUtils.isBlank(inlongStreamId)) {
+                return "";
+            } else {
+                return inlongStreamId;
+            }
+        } else {
+            if (StringUtils.isBlank(inlongStreamId)) {
+                return inlongGroupId;
+            } else {
+                return inlongGroupId + "." + inlongStreamId;
+            }
+        }
+    }
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
new file mode 100644
index 0000000..b38b7c0
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.util.Map;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+
+/**
+ * 
+ * ProxyEvent
+ */
+/**
+ * 
+ * ProxyEvent
+ */
+public class ProxyEvent extends SdkEvent {
+
+    protected long sourceTime;
+    protected String topic;
+
+    /**
+     * Constructor
+     */
+    public ProxyEvent() {
+
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param body
+     * @param msgTime
+     * @param sourceIp
+     */
+    public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        super.setBody(body);
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.msgTime = msgTime;
+        this.sourceIp = sourceIp;
+        Map<String, String> headers = super.getHeaders();
+        headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
+        headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
+        headers.put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+        headers.put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+
+        this.sourceTime = System.currentTimeMillis();
+        this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param obj
+     */
+    public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        super.setBody(obj.getBody().toByteArray());
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.msgTime = obj.getMsgTime();
+        this.sourceIp = obj.getSourceIp();
+        Map<String, String> headers = super.getHeaders();
+        headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
+        headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
+        headers.put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+        headers.put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+
+        this.sourceTime = System.currentTimeMillis();
+        this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
+    }
+
+    /**
+     * get sourceTime
+     * 
+     * @return the sourceTime
+     */
+    public long getSourceTime() {
+        return sourceTime;
+    }
+
+    /**
+     * get topic
+     * 
+     * @return the topic
+     */
+    public String getTopic() {
+        return topic;
+    }
+
+    /**
+     * set topic
+     * 
+     * @param topic the topic to set
+     */
+    public void setTopic(String topic) {
+        this.topic = topic;
+        this.getHeaders().put(EventConstants.TOPIC, topic);
+    }
+
+    /**
+     * set sourceTime
+     * 
+     * @param sourceTime the sourceTime to set
+     */
+    public void setSourceTime(long sourceTime) {
+        this.sourceTime = sourceTime;
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java
new file mode 100644
index 0000000..416e1c8
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.flume.event.SimpleEvent;
+
+/**
+ * 
+ * SdkEvent
+ */
+public class SdkEvent extends SimpleEvent {
+
+    protected String inlongGroupId;
+    protected String inlongStreamId;
+    protected String uid;
+
+    protected long msgTime;
+    protected String sourceIp;
+
+    /**
+     * Constructor
+     */
+    public SdkEvent(){
+    }
+    
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param body
+     */
+    public SdkEvent(String inlongGroupId, String inlongStreamId, byte[] body) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        super.setBody(body);
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.msgTime = System.currentTimeMillis();
+        this.sourceIp = "127.0.0.1";
+        Map<String, String> headers = super.getHeaders();
+        headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
+        headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
+        headers.put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param body
+     */
+    public SdkEvent(String inlongGroupId, String inlongStreamId, String body) {
+        this(inlongGroupId, inlongStreamId, body.getBytes(Charset.defaultCharset()));
+    }
+
+    /**
+     * get sourceIp
+     * 
+     * @return the sourceIp
+     */
+    public String getSourceIp() {
+        return sourceIp;
+    }
+
+    /**
+     * set sourceIp
+     * 
+     * @param sourceIp the sourceIp to set
+     */
+    public void setSourceIp(String sourceIp) {
+        this.sourceIp = sourceIp;
+        this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+    }
+
+    /**
+     * get inlongGroupId
+     * 
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     * 
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * get uid
+     * 
+     * @return the uid
+     */
+    public String getUid() {
+        return uid;
+    }
+
+    /**
+     * get msgTime
+     * 
+     * @return the msgTime
+     */
+    public long getMsgTime() {
+        return msgTime;
+    }
+
+    /**
+     * set inlongGroupId
+     * 
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * set inlongStreamId
+     * 
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * set uid
+     * 
+     * @param uid the uid to set
+     */
+    public void setUid(String uid) {
+        this.uid = uid;
+    }
+
+    /**
+     * set msgTime
+     * 
+     * @param msgTime the msgTime to set
+     */
+    public void setMsgTime(long msgTime) {
+        this.msgTime = msgTime;
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java
new file mode 100644
index 0000000..5128035
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+
+/**
+ * 
+ * SortEvent
+ */
+public class SortEvent extends ProxyEvent {
+
+    protected String messageKey;
+    protected String messageOffset;
+    protected long sendTime;
+
+    /**
+     * Constructor
+     */
+    public SortEvent() {
+
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param body
+     * @param msgTime
+     * @param sourceIp
+     */
+    public SortEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
+        super(inlongGroupId, inlongStreamId, body, msgTime, sourceIp);
+    }
+
+    /**
+     * Constructor
+     * 
+     * @param inlongGroupId
+     * @param inlongStreamId
+     * @param obj
+     */
+    public SortEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
+        super(inlongGroupId, inlongStreamId, obj);
+    }
+
+    /**
+     * get messageKey
+     * 
+     * @return the messageKey
+     */
+    public String getMessageKey() {
+        return messageKey;
+    }
+
+    /**
+     * set messageKey
+     * 
+     * @param messageKey the messageKey to set
+     */
+    public void setMessageKey(String messageKey) {
+        this.messageKey = messageKey;
+    }
+
+    /**
+     * get messageOffset
+     * 
+     * @return the messageOffset
+     */
+    public String getMessageOffset() {
+        return messageOffset;
+    }
+
+    /**
+     * set messageOffset
+     * 
+     * @param messageOffset the messageOffset to set
+     */
+    public void setMessageOffset(String messageOffset) {
+        this.messageOffset = messageOffset;
+    }
+
+    /**
+     * get sendTime
+     * 
+     * @return the sendTime
+     */
+    public long getSendTime() {
+        return sendTime;
+    }
+
+    /**
+     * set sendTime
+     * 
+     * @param sendTime the sendTime to set
+     */
+    public void setSendTime(long sendTime) {
+        this.sendTime = sendTime;
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java
new file mode 100644
index 0000000..42ad935
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GzipUtils
+ */
+public class GzipUtils {
+
+    public static final Logger LOG = LoggerFactory.getLogger(GzipUtils.class);
+
+    /**
+     * compress
+     * 
+     * @param  data
+     * @return      byte array
+     */
+    public static byte[] compress(byte[] data) {
+        if (data == null || data.length == 0) {
+            return data;
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        GZIPOutputStream gzip = null;
+        try {
+            gzip = new GZIPOutputStream(out);
+            gzip.write(data);
+        } catch (IOException e) {
+            LOG.error("compress data error", e);
+        } finally {
+            if (gzip != null) {
+                try {
+                    gzip.close();
+                } catch (IOException e) {
+                    LOG.error(GzipUtils.class.getSimpleName(), e);
+                }
+            }
+        }
+
+        return out.toByteArray();
+    }
+
+    /**
+     * decompress
+     * 
+     * @param  bytes
+     * @return       byte array
+     */
+    public static byte[] decompress(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+        try {
+            GZIPInputStream ungzip = new GZIPInputStream(in);
+            byte[] buffer = new byte[256];
+            int n;
+            while ((n = ungzip.read(buffer)) >= 0) {
+                out.write(buffer, 0, n);
+            }
+        } catch (IOException e) {
+            LOG.error("Decompress data error", e);
+        }
+
+        return out.toByteArray();
+    }
+}
diff --git a/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto b/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto
new file mode 100644
index 0000000..4842948
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package org.apache.inlong.sdk.commons.protocol;
+
+enum INLONG_COMPRESSED_TYPE
+{
+  INLONG_NO_COMPRESS = 0;
+  INLONG_GZ = 1;
+  INLONG_SNAPPY = 2;
+};
+
+message MapFieldEntry {
+  optional string key = 1;
+  optional string value = 2;
+}
+
+message MessagePackHeader {
+  required string inlongGroupId = 1;  //inlongGroupId 
+  required string inlongStreamId = 2;  //inlongStreamId 
+  required int64 packTime = 3; //pack time, milliseconds
+  required int32 msgCount = 4;	//message count
+  required int32 srcLength = 5;	//total length of raw messages body
+  required int32 compressLen = 6;	//compress length of messages
+  required INLONG_COMPRESSED_TYPE compressType = 7;  //compress type
+  repeated MapFieldEntry params = 8;     //additional parameters
+}
+
+message MessageObj {
+  required int64 msgTime = 1; //message generation time, milliseconds
+  required string sourceIp = 2; // agent ip of message generation
+  required bytes body = 3; //message body
+  repeated MapFieldEntry params = 4;	//additional parameters
+}
+
+message MessageObjs {
+  repeated MessageObj msgs = 1;      //message list
+}
+
+message MessagePack {
+  required MessagePackHeader header = 1;  //message pack header
+  required bytes compressBytes = 2;     //compressed bytes of MessageObjs serialization bytes, or MessageObjs serialization bytes(compressType=INLONG_NO_COMPRESS)
+}
+
+enum ResultCode{
+  SUCCUSS = 0;				// success
+  FAIL = -1;				// general error code
+  ERR_VERSION_ERROR = -101;	// not support version
+  ERR_LENGTH_ERROR = -102;	// error length, the length is negitive or over.
+  ERR_REJECT = -103;        // Proxy rejects the request, because proxy can not process more data, or the cache cluster can not be connected.
+  ERR_PACKAGE_ERROR = -104;	// request data can not be decoded with PB protocol.
+  ERR_ID_ERROR = -105;		// inlongGroupId or inlongStreamId is bad; The length is wrong, or the CRC check is wrong.
+}
+
+message ResponseInfo {
+  required ResultCode result = 1;	//response result
+}
diff --git a/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java b/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java
new file mode 100644
index 0000000..4952d1d
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
+import org.junit.Test;
+
+/**
+ * 
+ * TestEventUtils
+ */
+public class TestEventUtils {
+
+    public static final String INLONG_GROUP_ID = "inlongGroupId";
+    public static final String INLONG_STREAM_ID = "inlongStreamId";
+    public static final String BODY = "body";
+    public static final String SOURCE_IP = "127.0.0.1";
+
+    @Test
+    public void testEncodeSdkEvents() {
+        try {
+            SdkEvent event = new SdkEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY);
+            List<SdkEvent> eventList = new ArrayList<>();
+            eventList.add(event);
+            INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+            MessagePack packObj = EventUtils.encodeSdkEvents(INLONG_GROUP_ID, INLONG_STREAM_ID, compressedType,
+                    eventList);
+            assertEquals(true, (packObj != null));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testDecodeSdkPack() {
+        try {
+            SdkEvent event = new SdkEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY);
+            event.setSourceIp(SOURCE_IP);
+            List<SdkEvent> eventList = new ArrayList<>();
+            eventList.add(event);
+            INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+            MessagePack packObj = EventUtils.encodeSdkEvents(INLONG_GROUP_ID, INLONG_STREAM_ID, compressedType,
+                    eventList);
+            byte[] packBytes = packObj.toByteArray();
+            List<ProxyEvent> proxyEventList = EventUtils.decodeSdkPack(packBytes, 0, packBytes.length);
+            assertEquals(1, proxyEventList.size());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testEncodeCacheMessageBody() {
+        try {
+            ProxyEvent event = new ProxyEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY.getBytes(),
+                    System.currentTimeMillis(), SOURCE_IP);
+            List<ProxyEvent> eventList = new ArrayList<>();
+            eventList.add(event);
+            INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(compressedType, eventList);
+            assertEquals(true, (bodyBytes.length > 0));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testDecodeCacheMessageBody() {
+        try {
+            ProxyEvent event = new ProxyEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY.getBytes(),
+                    System.currentTimeMillis(), SOURCE_IP);
+            List<ProxyEvent> eventList = new ArrayList<>();
+            eventList.add(event);
+            INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+            byte[] bodyBytes = EventUtils.encodeCacheMessageBody(compressedType, eventList);
+            List<SortEvent> sortEventList = EventUtils.decodeCacheMessageBody(INLONG_GROUP_ID, INLONG_STREAM_ID,
+                    compressedType, bodyBytes);
+            assertEquals(1, sortEventList.size());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}