You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/10 07:22:04 UTC
[incubator-inlong] branch master updated: [INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fc43f6e [INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)
fc43f6e is described below
commit fc43f6e0533cd20a4985a074c76a04cdae141312
Author: 卢春亮 <94...@qq.com>
AuthorDate: Thu Feb 10 15:20:54 2022 +0800
[INLONG-2376] add sdk-common module for supporting PB compression protocol format based on Protobuffer protocol. (#2392)
---
inlong-sdk/pom.xml | 102 +++++++++-
inlong-sdk/sdk-common/pom.xml | 74 +++++++
.../sdk/commons/protocol/EventConstants.java | 46 +++++
.../inlong/sdk/commons/protocol/EventUtils.java | 215 +++++++++++++++++++++
.../inlong/sdk/commons/protocol/InlongId.java | 50 +++++
.../inlong/sdk/commons/protocol/ProxyEvent.java | 131 +++++++++++++
.../inlong/sdk/commons/protocol/SdkEvent.java | 166 ++++++++++++++++
.../inlong/sdk/commons/protocol/SortEvent.java | 117 +++++++++++
.../apache/inlong/sdk/commons/utils/GzipUtils.java | 92 +++++++++
.../sdk-common/src/main/proto/ProxySdk.proto | 73 +++++++
.../sdk/commons/protocol/TestEventUtils.java | 104 ++++++++++
11 files changed, 1164 insertions(+), 6 deletions(-)
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 52f554e..274ce23 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -17,35 +17,125 @@
specific language governing permissions and limitations
under the License.
-->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
<parent>
- <artifactId>inlong</artifactId>
<groupId>org.apache.inlong</groupId>
+ <artifactId>inlong</artifactId>
<version>1.1.0-incubating-SNAPSHOT</version>
</parent>
- <modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<artifactId>inlong-sdk</artifactId>
<name>Apache InLong - SDK</name>
<modules>
+ <module>sdk-common</module>
<module>sort-sdk</module>
<module>dataproxy-sdk</module>
</modules>
<properties>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ <flume.version>1.9.0</flume.version>
+ <plugin.assembly.version>3.2.0</plugin.assembly.version>
+ <netty.version>3.8.0.Final</netty.version>
+ <codec.version>1.15</codec.version>
+ <junit.version>4.13</junit.version>
+ <powermock.version>2.0.2</powermock.version>
+ <guava.version>19.0</guava.version>
+ <skipTests>false</skipTests>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
+ <protobuf-version>2.5.0</protobuf-version>
+ <snappy-version>1.1.8.4</snappy-version>
+ <fastjson.version>1.2.79</fastjson.version>
+ <log4j.version>2.17.1</log4j.version>
+ <mockito.version>2.23.0</mockito.version>
</properties>
<dependencies>
<dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-node</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-configuration</artifactId>
+ <version>${flume.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>${codec.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito2</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <artifactId>mockito-core</artifactId>
+ <groupId>org.mockito</groupId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/inlong-sdk/sdk-common/pom.xml b/inlong-sdk/sdk-common/pom.xml
new file mode 100644
index 0000000..1d64468
--- /dev/null
+++ b/inlong-sdk/sdk-common/pom.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-sdk</artifactId>
+ <version>1.1.0-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sdk-common</artifactId>
+ <name>Apache InLong - SDK - Common</name>
+
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <compiler.source>1.8</compiler.source>
+ <compiler.target>1.8</compiler.target>
+ </properties>
+
+ <dependencies>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.5.0.Final</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.1</version>
+ <extensions>true</extensions>
+ <configuration>
+ <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
+ <protocArtifact>com.google.protobuf:protoc:2.5.0:exe:${os.detected.classifier}</protocArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
new file mode 100644
index 0000000..f3519b9
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+/**
+ * EventConstants
+ *
+ * String constants shared by the event classes in this package. Most of them are
+ * keys of the string-to-string header map carried by an event; the *_VERSION_1
+ * values are presumably the values stored under HEADER_KEY_VERSION (they are not
+ * referenced by the classes visible alongside this interface) - TODO confirm.
+ */
+public interface EventConstants {
+ String HEADER_KEY_VERSION = "version";
+ String HEADER_SDK_VERSION_1 = "1";
+ String HEADER_CACHE_VERSION_1 = "1";
+ // sdk: header keys written by SdkEvent (group id, stream id, message time, source IP)
+ String INLONG_GROUP_ID = "inlongGroupId";
+ String INLONG_STREAM_ID = "inlongStreamId";
+ String HEADER_KEY_MSG_TIME = "msgTime";
+ String HEADER_KEY_SOURCE_IP = "sourceIp";
+ // proxy: ProxyEvent writes sourceTime and topic; the remaining keys are not used by the event classes shown with this file
+ String HEADER_KEY_SOURCE_TIME = "sourceTime";
+ String TOPIC = "topic";
+ String HEADER_KEY_PROXY_NAME = "proxyName";
+ String HEADER_KEY_PACK_TIME = "packTime";
+ String HEADER_KEY_MSG_COUNT = "msgCount";
+ String HEADER_KEY_SRC_LENGTH = "srcLength";
+ String HEADER_KEY_COMPRESS_TYPE = "compressType";
+ // sort: presumably header keys for the sort stage (SortEvent does not write them itself) - TODO confirm
+ String HEADER_KEY_MESSAGE_KEY = "messageKey";
+ String HEADER_KEY_MSG_OFFSET = "msgOffset";
+ // other: presumably a configuration key rather than an event header - TODO confirm
+ String RELOAD_INTERVAL = "reloadInterval";
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java
new file mode 100644
index 0000000..e7c1be9
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventUtils.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePackHeader;
+import org.apache.inlong.sdk.commons.utils.GzipUtils;
+import org.xerial.snappy.Snappy;
+
+import com.google.protobuf.ByteString;
+
+/**
+ * EventUtils
+ *
+ * Static helpers that convert between InLong event objects and their protobuf
+ * representation ({@link MessagePack} / {@link MessageObjs}), optionally compressing
+ * the serialized message list with Snappy or Gzip.
+ */
+public class EventUtils {
+
+    /**
+     * Packs a list of SDK events into one {@link MessagePack}.
+     *
+     * Only the message time, source IP and body of each event are serialized;
+     * event headers are not written by this method.
+     *
+     * @param inlongGroupId  group id stored in the pack header
+     * @param inlongStreamId stream id stored in the pack header
+     * @param compressedType compression applied to the serialized message list;
+     *                       any value other than INLONG_SNAPPY or INLONG_GZ leaves the bytes uncompressed
+     * @param events         events to pack
+     * @return MessagePack holding the (possibly compressed) messages and a header describing them
+     * @throws IOException if compression fails
+     */
+    public static MessagePack encodeSdkEvents(String inlongGroupId, String inlongStreamId,
+            INLONG_COMPRESSED_TYPE compressedType, List<SdkEvent> events) throws IOException {
+        // MessageObjs
+        MessageObjs.Builder objsBuilder = MessageObjs.newBuilder();
+        for (SdkEvent event : events) {
+            MessageObj.Builder objBuilder = MessageObj.newBuilder();
+            objBuilder.setMsgTime(event.getMsgTime());
+            objBuilder.setSourceIp(event.getSourceIp());
+            objBuilder.setBody(ByteString.copyFrom(event.getBody()));
+            objsBuilder.addMsgs(objBuilder.build());
+        }
+        byte[] srcBytes = objsBuilder.build().toByteArray();
+        byte[] compressedBytes = compress(compressedType, srcBytes);
+        // MessagePack
+        MessagePack.Builder packBuilder = MessagePack.newBuilder();
+        packBuilder.setCompressBytes(ByteString.copyFrom(compressedBytes));
+        // MessagePackHeader
+        MessagePackHeader.Builder headerBuilder = MessagePackHeader.newBuilder();
+        // string inlongGroupId = 1; //inlongGroupId
+        headerBuilder.setInlongGroupId(inlongGroupId);
+        // string inlongStreamId = 2; //inlongStreamId
+        headerBuilder.setInlongStreamId(inlongStreamId);
+        // int64 packTime = 3; //pack time, milliseconds
+        headerBuilder.setPackTime(System.currentTimeMillis());
+        // int32 msgCount = 4; //message count
+        headerBuilder.setMsgCount(events.size());
+        // int32 srcLength = 5; //length of the serialized, uncompressed message list
+        headerBuilder.setSrcLength(srcBytes.length);
+        // int32 compressLen = 6; //compress length of messages
+        headerBuilder.setCompressLen(compressedBytes.length);
+        // INLONG_COMPRESSED_TYPE compressType = 7; //compress type
+        headerBuilder.setCompressType(compressedType);
+        // map<string, string> params = 8; //additional parameters (not populated here)
+        packBuilder.setHeader(headerBuilder.build());
+        return packBuilder.build();
+    }
+
+    /**
+     * Parses a serialized {@link MessagePack} and expands it into proxy events.
+     *
+     * The compression type, group id and stream id are taken from the pack header.
+     *
+     * @param packBytes buffer containing the serialized pack
+     * @param offset    index of the first pack byte in {@code packBytes}
+     * @param length    number of pack bytes
+     * @return one ProxyEvent per message in the pack
+     * @throws IOException if the pack cannot be parsed or decompressed
+     */
+    public static List<ProxyEvent> decodeSdkPack(byte[] packBytes, int offset, int length) throws IOException {
+        MessagePack packObject = MessagePack.parseFrom(new ByteArrayInputStream(packBytes, offset, length));
+        MessagePackHeader header = packObject.getHeader();
+        // decompress
+        byte[] srcBytes = decompress(header.getCompressType(), packObject.getCompressBytes().toByteArray());
+        // decode
+        MessageObjs msgObjs = MessageObjs.parseFrom(srcBytes);
+        List<ProxyEvent> events = new ArrayList<>(msgObjs.getMsgsList().size());
+        String inlongGroupId = header.getInlongGroupId();
+        String inlongStreamId = header.getInlongStreamId();
+        for (MessageObj msgObj : msgObjs.getMsgsList()) {
+            events.add(new ProxyEvent(inlongGroupId, inlongStreamId, msgObj));
+        }
+        return events;
+    }
+
+    /**
+     * Serializes proxy events, including their headers, into a message body.
+     *
+     * @param compressedType compression applied to the serialized message list;
+     *                       any value other than INLONG_SNAPPY or INLONG_GZ leaves the bytes uncompressed
+     * @param events         events to serialize
+     * @return the (possibly compressed) serialized message list
+     * @throws IOException if compression fails
+     */
+    public static byte[] encodeCacheMessageBody(INLONG_COMPRESSED_TYPE compressedType, List<ProxyEvent> events)
+            throws IOException {
+        // encode
+        MessageObjs.Builder objs = MessageObjs.newBuilder();
+        for (ProxyEvent event : events) {
+            MessageObj.Builder builder = MessageObj.newBuilder();
+            builder.setMsgTime(event.getMsgTime());
+            builder.setSourceIp(event.getSourceIp());
+            // every event header becomes one key/value entry of the message
+            event.getHeaders().forEach((key, value) -> {
+                builder.addParams(MapFieldEntry.newBuilder().setKey(key).setValue(value));
+            });
+            builder.setBody(ByteString.copyFrom(event.getBody()));
+            objs.addMsgs(builder.build());
+        }
+        // compress
+        return compress(compressedType, objs.build().toByteArray());
+    }
+
+    /**
+     * Decodes a message body produced by {@link #encodeCacheMessageBody} into sort events.
+     *
+     * @param inlongGroupId  group id assigned to every decoded event
+     * @param inlongStreamId stream id assigned to every decoded event
+     * @param compressedType compression that was applied to {@code msgBody}
+     * @param msgBody        the (possibly compressed) serialized message list
+     * @return one SortEvent per message in the body
+     * @throws IOException if the body cannot be decompressed or parsed
+     */
+    public static List<SortEvent> decodeCacheMessageBody(String inlongGroupId, String inlongStreamId,
+            INLONG_COMPRESSED_TYPE compressedType, byte[] msgBody) throws IOException {
+        // uncompress
+        byte[] srcBytes = decompress(compressedType, msgBody);
+        // decode
+        MessageObjs msgObjs = MessageObjs.parseFrom(srcBytes);
+        List<SortEvent> events = new ArrayList<>(msgObjs.getMsgsList().size());
+        for (MessageObj msgObj : msgObjs.getMsgsList()) {
+            events.add(new SortEvent(inlongGroupId, inlongStreamId, msgObj));
+        }
+        return events;
+    }
+
+    /**
+     * Compresses bytes with the given algorithm; unknown types return the input unchanged.
+     */
+    private static byte[] compress(INLONG_COMPRESSED_TYPE compressedType, byte[] srcBytes) throws IOException {
+        switch (compressedType) {
+            case INLONG_SNAPPY :
+                return Snappy.compress(srcBytes);
+            case INLONG_GZ :
+                return GzipUtils.compress(srcBytes);
+            case INLONG_NO_COMPRESS :
+            default :
+                return srcBytes;
+        }
+    }
+
+    /**
+     * Reverses {@link #compress}; unknown types return the input unchanged.
+     */
+    private static byte[] decompress(INLONG_COMPRESSED_TYPE compressedType, byte[] compressedBytes)
+            throws IOException {
+        switch (compressedType) {
+            case INLONG_SNAPPY :
+                return Snappy.uncompress(compressedBytes);
+            case INLONG_GZ :
+                return GzipUtils.decompress(compressedBytes);
+            case INLONG_NO_COMPRESS :
+            default :
+                return compressedBytes;
+        }
+    }
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java
new file mode 100644
index 0000000..57831be
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/InlongId.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ *
+ * InlongId
+ */
+public class InlongId {
+
+ /**
+ * generateUid
+ *
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @return
+ */
+ public static String generateUid(String inlongGroupId, String inlongStreamId) {
+ if (StringUtils.isBlank(inlongGroupId)) {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return "";
+ } else {
+ return inlongStreamId;
+ }
+ } else {
+ if (StringUtils.isBlank(inlongStreamId)) {
+ return inlongGroupId;
+ } else {
+ return inlongGroupId + "." + inlongStreamId;
+ }
+ }
+ }
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
new file mode 100644
index 0000000..b38b7c0
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/ProxyEvent.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.util.Map;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+
+/**
+ * ProxyEvent
+ *
+ * An {@link SdkEvent} extended with the time at which this object was created
+ * (sourceTime) and the topic it is routed to. Both values are mirrored into the
+ * event headers.
+ */
+public class ProxyEvent extends SdkEvent {
+
+    protected long sourceTime;
+    protected String topic;
+
+    /**
+     * Constructor; leaves all fields and headers unset.
+     */
+    public ProxyEvent() {
+
+    }
+
+    /**
+     * Constructor
+     *
+     * Stores the given values, records the current time as sourceTime and writes
+     * group id, stream id, message time, source IP and source time into the headers.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param body           the message body
+     * @param msgTime        the message time
+     * @param sourceIp       the IP of the message source
+     */
+    public ProxyEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        super.setBody(body);
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.msgTime = msgTime;
+        this.sourceIp = sourceIp;
+        this.sourceTime = System.currentTimeMillis();
+
+        Map<String, String> headers = super.getHeaders();
+        headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
+        headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
+        headers.put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+        headers.put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+        headers.put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
+    }
+
+    /**
+     * Constructor
+     *
+     * Takes body, message time and source IP from the protobuf message and
+     * otherwise behaves like the field-by-field constructor.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param obj            the decoded protobuf message
+     */
+    public ProxyEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
+        this(inlongGroupId, inlongStreamId, obj.getBody().toByteArray(), obj.getMsgTime(), obj.getSourceIp());
+    }
+
+    /**
+     * get sourceTime
+     *
+     * @return the sourceTime
+     */
+    public long getSourceTime() {
+        return sourceTime;
+    }
+
+    /**
+     * get topic
+     *
+     * @return the topic
+     */
+    public String getTopic() {
+        return topic;
+    }
+
+    /**
+     * set topic
+     *
+     * Also stores the topic in the event headers.
+     *
+     * @param topic the topic to set
+     */
+    public void setTopic(String topic) {
+        this.topic = topic;
+        this.getHeaders().put(EventConstants.TOPIC, topic);
+    }
+
+    /**
+     * set sourceTime
+     *
+     * Also updates the sourceTime header so that it does not keep the value
+     * written by the constructor.
+     *
+     * @param sourceTime the sourceTime to set
+     */
+    public void setSourceTime(long sourceTime) {
+        this.sourceTime = sourceTime;
+        this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_TIME, String.valueOf(sourceTime));
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java
new file mode 100644
index 0000000..416e1c8
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SdkEvent.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+import org.apache.flume.event.SimpleEvent;
+
+/**
+ * SdkEvent
+ *
+ * A Flume {@link SimpleEvent} that additionally carries the InLong group id,
+ * stream id, their combined uid, the message time and the source IP.
+ */
+public class SdkEvent extends SimpleEvent {
+
+    protected String inlongGroupId;
+    protected String inlongStreamId;
+    protected String uid;
+
+    protected long msgTime;
+    protected String sourceIp;
+
+    /**
+     * Constructor; leaves all fields and headers unset.
+     */
+    public SdkEvent() {
+    }
+
+    /**
+     * Constructor
+     *
+     * Uses the current time as message time and "127.0.0.1" as source IP, and
+     * writes group id, stream id, message time and source IP into the headers.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param body           the message body
+     */
+    public SdkEvent(String inlongGroupId, String inlongStreamId, byte[] body) {
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        super.setBody(body);
+        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+        this.msgTime = System.currentTimeMillis();
+        this.sourceIp = "127.0.0.1";
+        Map<String, String> headers = super.getHeaders();
+        headers.put(EventConstants.INLONG_GROUP_ID, inlongGroupId);
+        headers.put(EventConstants.INLONG_STREAM_ID, inlongStreamId);
+        headers.put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+        // keep the header in step with the sourceIp field, as setSourceIp does
+        headers.put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+    }
+
+    /**
+     * Constructor
+     *
+     * Encodes the text body with the platform default charset.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param body           the message body as text
+     */
+    public SdkEvent(String inlongGroupId, String inlongStreamId, String body) {
+        this(inlongGroupId, inlongStreamId, body.getBytes(Charset.defaultCharset()));
+    }
+
+    /**
+     * get sourceIp
+     *
+     * @return the sourceIp
+     */
+    public String getSourceIp() {
+        return sourceIp;
+    }
+
+    /**
+     * set sourceIp
+     *
+     * Also stores the value in the event headers.
+     *
+     * @param sourceIp the sourceIp to set
+     */
+    public void setSourceIp(String sourceIp) {
+        this.sourceIp = sourceIp;
+        this.getHeaders().put(EventConstants.HEADER_KEY_SOURCE_IP, sourceIp);
+    }
+
+    /**
+     * get inlongGroupId
+     *
+     * @return the inlongGroupId
+     */
+    public String getInlongGroupId() {
+        return inlongGroupId;
+    }
+
+    /**
+     * get inlongStreamId
+     *
+     * @return the inlongStreamId
+     */
+    public String getInlongStreamId() {
+        return inlongStreamId;
+    }
+
+    /**
+     * get uid
+     *
+     * @return the uid
+     */
+    public String getUid() {
+        return uid;
+    }
+
+    /**
+     * get msgTime
+     *
+     * @return the msgTime
+     */
+    public long getMsgTime() {
+        return msgTime;
+    }
+
+    /**
+     * set inlongGroupId
+     *
+     * Changes only the field; uid and headers are not recomputed.
+     *
+     * @param inlongGroupId the inlongGroupId to set
+     */
+    public void setInlongGroupId(String inlongGroupId) {
+        this.inlongGroupId = inlongGroupId;
+    }
+
+    /**
+     * set inlongStreamId
+     *
+     * Changes only the field; uid and headers are not recomputed.
+     *
+     * @param inlongStreamId the inlongStreamId to set
+     */
+    public void setInlongStreamId(String inlongStreamId) {
+        this.inlongStreamId = inlongStreamId;
+    }
+
+    /**
+     * set uid
+     *
+     * @param uid the uid to set
+     */
+    public void setUid(String uid) {
+        this.uid = uid;
+    }
+
+    /**
+     * set msgTime
+     *
+     * Also updates the msgTime header so that it does not keep the value
+     * written by the constructor.
+     *
+     * @param msgTime the msgTime to set
+     */
+    public void setMsgTime(long msgTime) {
+        this.msgTime = msgTime;
+        this.getHeaders().put(EventConstants.HEADER_KEY_MSG_TIME, String.valueOf(msgTime));
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java
new file mode 100644
index 0000000..5128035
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/SortEvent.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
+
+/**
+ * SortEvent
+ *
+ * A {@link ProxyEvent} that additionally holds a message key, a message offset
+ * and a send time. These three values are plain fields; this class does not
+ * copy them into the event headers.
+ */
+public class SortEvent extends ProxyEvent {
+
+    protected String messageKey;
+    protected String messageOffset;
+    protected long sendTime;
+
+    /**
+     * Creates an event with no values set.
+     */
+    public SortEvent() {
+
+    }
+
+    /**
+     * Creates an event from individual values; see the matching ProxyEvent constructor.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param body           the message body
+     * @param msgTime        the message time
+     * @param sourceIp       the IP of the message source
+     */
+    public SortEvent(String inlongGroupId, String inlongStreamId, byte[] body, long msgTime, String sourceIp) {
+        super(inlongGroupId, inlongStreamId, body, msgTime, sourceIp);
+    }
+
+    /**
+     * Creates an event from a decoded protobuf message; see the matching ProxyEvent constructor.
+     *
+     * @param inlongGroupId  the group id
+     * @param inlongStreamId the stream id
+     * @param obj            the decoded protobuf message
+     */
+    public SortEvent(String inlongGroupId, String inlongStreamId, MessageObj obj) {
+        super(inlongGroupId, inlongStreamId, obj);
+    }
+
+    // ---- messageKey ----
+
+    /**
+     * @return the messageKey
+     */
+    public String getMessageKey() {
+        return this.messageKey;
+    }
+
+    /**
+     * @param messageKey the messageKey to set
+     */
+    public void setMessageKey(String messageKey) {
+        this.messageKey = messageKey;
+    }
+
+    // ---- messageOffset ----
+
+    /**
+     * @return the messageOffset
+     */
+    public String getMessageOffset() {
+        return this.messageOffset;
+    }
+
+    /**
+     * @param messageOffset the messageOffset to set
+     */
+    public void setMessageOffset(String messageOffset) {
+        this.messageOffset = messageOffset;
+    }
+
+    // ---- sendTime ----
+
+    /**
+     * @return the sendTime
+     */
+    public long getSendTime() {
+        return this.sendTime;
+    }
+
+    /**
+     * @param sendTime the sendTime to set
+     */
+    public void setSendTime(long sendTime) {
+        this.sendTime = sendTime;
+    }
+
+}
diff --git a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java
new file mode 100644
index 0000000..42ad935
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/utils/GzipUtils.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.utils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers that gzip-compress and gzip-decompress byte arrays held entirely in memory.
+ *
+ * <p>Both methods are best-effort: an {@link IOException} is logged through {@link #LOG} and the
+ * bytes produced up to that point are returned, so a failure is not signalled to the caller.
+ */
+public class GzipUtils {
+
+    // Shared logger for compression and decompression failures.
+    public static final Logger LOG = LoggerFactory.getLogger(GzipUtils.class);
+
+    /**
+     * Compresses {@code data} with gzip.
+     *
+     * @param data the bytes to compress
+     * @return the gzip bytes; {@code data} itself, without compression, when it is {@code null}
+     *         or empty
+     */
+    public static byte[] compress(byte[] data) {
+        if (data == null || data.length == 0) {
+            return data;
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // The stream is closed before out is read; closing finishes the gzip output.
+        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
+            gzip.write(data);
+        } catch (IOException e) {
+            LOG.error("compress data error", e);
+        }
+
+        return out.toByteArray();
+    }
+
+    /**
+     * Decompresses gzip {@code bytes}.
+     *
+     * <p>Note the asymmetry with {@link #compress(byte[])}: an empty input yields {@code null} here,
+     * while {@code compress} returns the empty array unchanged.
+     *
+     * @param bytes the gzip bytes to decompress
+     * @return the decompressed bytes; {@code null} when {@code bytes} is {@code null} or empty
+     */
+    public static byte[] decompress(byte[] bytes) {
+        if (bytes == null || bytes.length == 0) {
+            return null;
+        }
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        // try-with-resources closes the stream, releasing the inflater's native memory right away
+        // instead of leaving it to garbage collection.
+        try (GZIPInputStream ungzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
+            byte[] buffer = new byte[256];
+            int n;
+            while ((n = ungzip.read(buffer)) >= 0) {
+                out.write(buffer, 0, n);
+            }
+        } catch (IOException e) {
+            LOG.error("Decompress data error", e);
+        }
+
+        return out.toByteArray();
+    }
+}
diff --git a/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto b/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto
new file mode 100644
index 0000000..4842948
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/main/proto/ProxySdk.proto
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package org.apache.inlong.sdk.commons.protocol;
+
+enum INLONG_COMPRESSED_TYPE // how MessagePack.compressBytes is compressed; see MessagePackHeader.compressType
+{
+ INLONG_NO_COMPRESS = 0; // not compressed: the bytes are the plain MessageObjs serialization
+ INLONG_GZ = 1; // NOTE(review): presumably gzip; confirm
+ INLONG_SNAPPY = 2; // NOTE(review): presumably Snappy; confirm
+};
+
+message MapFieldEntry { // one string key/value pair; the element type of the repeated "params" fields
+ optional string key = 1; // entry key
+ optional string value = 2; // entry value
+}
+
+message MessagePackHeader { // header of a MessagePack
+ required string inlongGroupId = 1; // inlong group id
+ required string inlongStreamId = 2; // inlong stream id
+ required int64 packTime = 3; // pack time, in milliseconds
+ required int32 msgCount = 4; // message count
+ required int32 srcLength = 5; // total length of the raw message bodies
+ required int32 compressLen = 6; // length of the messages after compression
+ required INLONG_COMPRESSED_TYPE compressType = 7; // compression type
+ repeated MapFieldEntry params = 8; // additional parameters
+}
+
+message MessageObj { // a single message
+ required int64 msgTime = 1; // message generation time, in milliseconds
+ required string sourceIp = 2; // IP of the agent that generated the message
+ required bytes body = 3; // message body
+ repeated MapFieldEntry params = 4; // additional parameters
+}
+
+message MessageObjs { // list wrapper whose serialization is carried in MessagePack.compressBytes
+ repeated MessageObj msgs = 1; // message list
+}
+
+message MessagePack { // a header plus the serialized (and optionally compressed) messages
+ required MessagePackHeader header = 1; // message pack header
+ required bytes compressBytes = 2; // compressed bytes of the MessageObjs serialization, or the MessageObjs serialization itself when compressType=INLONG_NO_COMPRESS
+}
+
+enum ResultCode{ // result codes carried in ResponseInfo.result
+ SUCCUSS = 0; // success; the identifier is misspelled, but renaming it would rename the generated constant
+ FAIL = -1; // general error code
+ ERR_VERSION_ERROR = -101; // unsupported version
+ ERR_LENGTH_ERROR = -102; // length error: the length is negative or too large
+ ERR_REJECT = -103; // proxy rejects the request because it cannot process more data or cannot connect to the cache cluster
+ ERR_PACKAGE_ERROR = -104; // request data cannot be decoded with the PB protocol
+ ERR_ID_ERROR = -105; // inlongGroupId or inlongStreamId is bad: the length is wrong or the CRC check fails
+}
+
+message ResponseInfo { // response message carrying a single ResultCode
+ required ResultCode result = 1; // response result
+}
diff --git a/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java b/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java
new file mode 100644
index 0000000..4952d1d
--- /dev/null
+++ b/inlong-sdk/sdk-common/src/test/java/org/apache/inlong/sdk/commons/protocol/TestEventUtils.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.commons.protocol;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
+import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessagePack;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link EventUtils} encode and decode helpers, each run with a single event and
+ * the {@code INLONG_SNAPPY} compression type.
+ *
+ * <p>The tests declare {@code throws Exception} rather than catching it, so an exception raised
+ * while encoding or decoding fails the test instead of being printed and ignored.
+ */
+public class TestEventUtils {
+
+    public static final String INLONG_GROUP_ID = "inlongGroupId";
+    public static final String INLONG_STREAM_ID = "inlongStreamId";
+    public static final String BODY = "body";
+    public static final String SOURCE_IP = "127.0.0.1";
+
+    /**
+     * Encoding a single {@link SdkEvent} must yield a non-null {@link MessagePack}.
+     */
+    @Test
+    public void testEncodeSdkEvents() throws Exception {
+        SdkEvent event = new SdkEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY);
+        List<SdkEvent> eventList = new ArrayList<>();
+        eventList.add(event);
+        INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+        MessagePack packObj = EventUtils.encodeSdkEvents(INLONG_GROUP_ID, INLONG_STREAM_ID, compressedType,
+                eventList);
+        assertEquals(true, (packObj != null));
+    }
+
+    /**
+     * A pack encoded from one {@link SdkEvent} must decode back to exactly one {@link ProxyEvent}.
+     */
+    @Test
+    public void testDecodeSdkPack() throws Exception {
+        SdkEvent event = new SdkEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY);
+        event.setSourceIp(SOURCE_IP);
+        List<SdkEvent> eventList = new ArrayList<>();
+        eventList.add(event);
+        INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+        MessagePack packObj = EventUtils.encodeSdkEvents(INLONG_GROUP_ID, INLONG_STREAM_ID, compressedType,
+                eventList);
+        // Serialize the pack, then decode it from the raw bytes.
+        byte[] packBytes = packObj.toByteArray();
+        List<ProxyEvent> proxyEventList = EventUtils.decodeSdkPack(packBytes, 0, packBytes.length);
+        assertEquals(1, proxyEventList.size());
+    }
+
+    /**
+     * Encoding a single {@link ProxyEvent} as a cache message body must yield a non-empty byte array.
+     */
+    @Test
+    public void testEncodeCacheMessageBody() throws Exception {
+        ProxyEvent event = new ProxyEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY.getBytes(),
+                System.currentTimeMillis(), SOURCE_IP);
+        List<ProxyEvent> eventList = new ArrayList<>();
+        eventList.add(event);
+        INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+        byte[] bodyBytes = EventUtils.encodeCacheMessageBody(compressedType, eventList);
+        assertEquals(true, (bodyBytes.length > 0));
+    }
+
+    /**
+     * A cache message body encoded from one {@link ProxyEvent} must decode to one {@link SortEvent}.
+     */
+    @Test
+    public void testDecodeCacheMessageBody() throws Exception {
+        ProxyEvent event = new ProxyEvent(INLONG_GROUP_ID, INLONG_STREAM_ID, BODY.getBytes(),
+                System.currentTimeMillis(), SOURCE_IP);
+        List<ProxyEvent> eventList = new ArrayList<>();
+        eventList.add(event);
+        INLONG_COMPRESSED_TYPE compressedType = INLONG_COMPRESSED_TYPE.INLONG_SNAPPY;
+        byte[] bodyBytes = EventUtils.encodeCacheMessageBody(compressedType, eventList);
+        List<SortEvent> sortEventList = EventUtils.decodeCacheMessageBody(INLONG_GROUP_ID, INLONG_STREAM_ID,
+                compressedType, bodyBytes);
+        assertEquals(1, sortEventList.size());
+    }
+}