Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/08 14:04:42 UTC

[inlong] branch branch-1.4 updated (c0546e829 -> f30dec91e)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git


    from c0546e829 [INLONG-6435][Release] Change the tag of Docker images to 1.4.0 (#6458)
     new ce8f194f7 [INLONG-6417][SDK] Support proxy-send mode (#6437)
     new 19ba51d4a [INLONG-6409][Sort] Unspported Time and Timestamp iceberg auto create table in spark session query (#6424)
     new 1e3212cdb [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)
     new 077a7e08c [INLONG-6456][Dashboard] Supports management of Oracle sources (#6469)
     new 7644e71d5 [INLONG-6451][Manager] Optimize the resource process of Kafka MQ (#6455)
     new b04d4472e [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
     new f30dec91e [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6461)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/common/msg/AttributeConstants.java      |   5 +
 inlong-dashboard/src/locales/cn.json               |  10 +
 inlong-dashboard/src/locales/en.json               |  10 +
 .../sources/defaults/{MySQLBinlog.ts => Oracle.ts} |  71 ++--
 .../src/metas/sources/defaults/index.ts            |   5 +
 .../resources/mappers/InlongStreamEntityMapper.xml |   3 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |   1 +
 .../manager/pojo/group/kafka/InlongKafkaDTO.java   |   4 -
 .../manager/pojo/group/kafka/InlongKafkaInfo.java  |   4 -
 .../pojo/group/kafka/InlongKafkaRequest.java       |   4 -
 .../pojo/sort/standalone/SortSourceStreamInfo.java |  23 ++
 .../sort/standalone/SortSourceStreamSinkInfo.java  |   1 +
 .../service/core/impl/SortSourceServiceImpl.java   |  96 +++--
 .../resource/queue/kafka/KafkaOperator.java        |  31 +-
 .../queue/kafka/KafkaResourceOperators.java        |  18 +-
 .../service/resource/queue/kafka/KafkaUtils.java   |  27 +-
 .../service/source/kafka/KafkaSourceOperator.java  |  39 ++
 .../manager/service/sort/SortServiceImplTest.java  |  24 +-
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++----
 .../apache/inlong/sdk/dataproxy/SendResult.java    |  11 +-
 .../inlong/sdk/dataproxy/codec/EncodeObject.java   | 154 +++++---
 .../inlong/sdk/dataproxy/codec/ErrorCode.java      |  52 ---
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |  38 +-
 .../inlong/sdk/dataproxy/network/Sender.java       |  92 +++--
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |   3 +-
 .../org/apache/inlong/sort/base/Constants.java     |  14 +
 .../base/format/AbstractDynamicSchemaFormat.java   |   7 -
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  25 +-
 .../format/DebeziumJsonDynamicSchemaFormat.java    |  24 +-
 .../base/format/DynamicSchemaFormatFactory.java    |  39 +-
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  50 ++-
 .../sort/base/format/JsonToRowDataConverters.java  | 414 +++++++++++++++++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |  48 ++-
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  12 +-
 .../DebeziumJsonDynamicSchemaFormatTest.java       |   3 +-
 ...eziumJsonDynamicSchemaFormatWithSchemaTest.java |   3 +-
 .../sort/doris/table/DorisDynamicTableFactory.java |   6 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |   4 +
 .../inlong/sort/iceberg/IcebergTableSink.java      |   4 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |   3 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java |   4 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java |   8 +-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |   5 +-
 43 files changed, 1274 insertions(+), 501 deletions(-)
 copy inlong-dashboard/src/metas/sources/defaults/{MySQLBinlog.ts => Oracle.ts} (74%)
 delete mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
 create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java


[inlong] 01/07: [INLONG-6417][SDK] Support proxy-send mode (#6437)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ce8f194f72d67d7218bf7734bf832d9df3e093f5
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 15:47:47 2022 +0800

    [INLONG-6417][SDK] Support proxy-send mode (#6437)
---
 .../inlong/sdk/dataproxy/DefaultMessageSender.java | 376 +++++++++++++++++----
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |   3 +-
 2 files changed, 306 insertions(+), 73 deletions(-)

diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 2dc1c2ca5..4cba1a3ea 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,6 +18,7 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.common.msg.AttributeConstants;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -95,12 +96,12 @@ public class DefaultMessageSender implements MessageSender {
     /**
      * generate by cluster id
      *
-     * @param configure         - sender
+     * @param configure - sender
      * @param selfDefineFactory - sender factory
      * @return - sender
      */
     public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
-                                                                 ThreadFactory selfDefineFactory) throws Exception {
+            ThreadFactory selfDefineFactory) throws Exception {
         ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
                 Utils.getLocalIp(), null);
         proxyConfigManager.setGroupId(configure.getGroupId());
@@ -188,35 +189,61 @@ public class DefaultMessageSender implements MessageSender {
 
     @Deprecated
     public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sender.syncSendMessage(new EncodeObject(body, attributes,
                 idGenerator.getNextId()), msgUUID, timeout, timeUnit);
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * sync send single message
+     *
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, 1);
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
+
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, msgtype, isCompressEnd, isReport,
-                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, "");
+                    isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompressEnd) {
                 return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                        + streamId + "&dt=" + dt + "&cp=snappy", idGenerator.getNextId(), this.getMsgtype(),
-                        true, groupId), msgUUID, timeout, timeUnit);
+                        + streamId + "&dt=" + dt + "&cp=snappy" + proxySend, idGenerator.getNextId(),
+                        this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
-                return sender.syncSendMessage(new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                                + streamId + "&dt=" + dt, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
-                        msgUUID, timeout, timeUnit);
+                return sender.syncSendMessage(new EncodeObject(body,
+                        "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + proxySend,
+                        idGenerator.getNextId(), this.getMsgtype(), false, groupId), msgUUID, timeout, timeUnit);
             }
         }
 
@@ -224,7 +251,26 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+        return sendMessage(body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * sync send single message
+     *
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
 
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -232,6 +278,9 @@ public class DefaultMessageSender implements MessageSender {
         }
         addIndexCnt(groupId, streamId, 1);
 
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -260,27 +309,54 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * sync send a batch of messages
+     *
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, bodyList.size());
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
+
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, isCompress, isReport,
                     isGroupIdTransfer, dt / 1000,
-                    idGenerator.getNextInt(), groupId, streamId, "");
+                    idGenerator.getNextInt(), groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             return sender.syncSendMessage(encodeObject, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompress) {
                 return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
-                        this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
+                        + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+                        idGenerator.getNextId(), this.getMsgtype(), true, groupId), msgUUID, timeout, timeUnit);
             } else {
                 return sender.syncSendMessage(new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                        + "&dt=" + dt + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
+                        + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend, idGenerator.getNextId(),
+                        this.getMsgtype(),
                         false, groupId), msgUUID, timeout, timeUnit);
             }
         }
@@ -288,13 +364,35 @@ public class DefaultMessageSender implements MessageSender {
     }
 
     public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
-                                  String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+        return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * sync send a batch of messages
+     *
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @return SendResult.OK means success
+     */
+    public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend) {
         dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+                extraAttrMap)) {
             return SendResult.INVALID_ATTRIBUTES;
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
@@ -322,37 +420,62 @@ public class DefaultMessageSender implements MessageSender {
 
     @Deprecated
     public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes,
-                                 String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()),
                 callback, msgUUID, timeout, timeUnit);
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * async send single message
+     *
+     * @param callback callback can be null
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, 1);
 
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(body, this.getMsgtype(), isCompressEnd, isReport,
                     isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
-                    groupId, streamId, "");
+                    groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
             if (isCompressEnd) {
+                if (isProxySend) {
+                    proxySend = "&" + proxySend;
+                }
                 sender.asyncSendMessage(new EncodeObject(body, "groupId="
-                                + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+                                + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy" + proxySend,
                                 idGenerator.getNextId(), this.getMsgtype(), true, groupId),
                         callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
                         new EncodeObject(body, "groupId=" + groupId + "&streamId="
-                                + streamId + "&dt=" + dt, idGenerator.getNextId(),
+                                + streamId + "&dt=" + dt + proxySend, idGenerator.getNextId(),
                                 this.getMsgtype(), false, groupId), callback,
                         msgUUID, timeout, timeUnit);
             }
@@ -360,14 +483,38 @@ public class DefaultMessageSender implements MessageSender {
 
     }
 
-    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId,
-                                 long dt, String msgUUID, long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
+            throws ProxysdkException {
+        asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * async send single message
+     *
+     * @param callback callback can be null
+     * @param body message data
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report timestamp
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback, byte[] body, String groupId, String streamId, long dt,
+            String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap, boolean isProxySend)
+            throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, 1);
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -392,45 +539,97 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
+    public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+            long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, false);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param callback callback can be null
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report time
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
     public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
-                                 String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        String proxySend = "";
+        if (isProxySend) {
+            proxySend = AttributeConstants.MESSAGE_PROXY_SEND + "=true";
+        }
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, this.getMsgtype(), isCompress,
                     isReport, isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
-                    groupId, streamId, "");
+                    groupId, streamId, proxySend);
             encodeObject.setSupportLF(isSupportLF);
             sender.asyncSendMessage(encodeObject, callback, msgUUID, timeout, timeUnit);
         } else if (msgtype == 3 || msgtype == 5) {
+            if (isProxySend) {
+                proxySend = "&" + proxySend;
+            }
             if (isCompress) {
                 sender.asyncSendMessage(
                         new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId
-                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size(), idGenerator.getNextId(),
+                                + "&dt=" + dt + "&cp=snappy" + "&cnt=" + bodyList.size() + proxySend,
+                                idGenerator.getNextId(),
                                 this.getMsgtype(), true, groupId), callback, msgUUID, timeout, timeUnit);
             } else {
                 sender.asyncSendMessage(
-                        new EncodeObject(bodyList, "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt
-                                + "&cnt=" + bodyList.size(), idGenerator.getNextId(), this.getMsgtype(),
-                                false, groupId), callback, msgUUID, timeout, timeUnit);
+                        new EncodeObject(bodyList,
+                                "groupId=" + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cnt=" + bodyList.size()
+                                        + proxySend, idGenerator.getNextId(), this.getMsgtype(), false, groupId),
+                        callback, msgUUID, timeout, timeUnit);
             }
         }
     }
 
     public void asyncSendMessage(SendMessageCallback callback,
-                                 List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
-                                 long timeout, TimeUnit timeUnit,
-                                 Map<String, String> extraAttrMap) throws ProxysdkException {
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) throws ProxysdkException {
+        asyncSendMessage(callback, bodyList, groupId, streamId, dt, msgUUID, timeout, timeUnit, extraAttrMap, false);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param callback callback can be null
+     * @param bodyList list of messages
+     * @param groupId groupId
+     * @param streamId streamId
+     * @param dt data report time
+     * @param msgUUID msg uuid
+     * @param timeout send timeout
+     * @param timeUnit time unit of the send timeout
+     * @param extraAttrMap extra attributes
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(SendMessageCallback callback,
+            List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+            long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap, boolean isProxySend) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
-        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
-                || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+        if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(
+                extraAttrMap)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
         addIndexCnt(groupId, streamId, bodyList.size());
+        if (isProxySend) {
+            extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
+        }
         StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
@@ -465,28 +664,60 @@ public class DefaultMessageSender implements MessageSender {
      * @throws ProxysdkException
      */
     @Override
-    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body,
-                                 SendMessageCallback callback) throws ProxysdkException {
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback)
+            throws ProxysdkException {
         this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
     }
 
     /**
-     * asyncSendMessage
+     * async send single message
      *
-     * @param inlongGroupId
-     * @param inlongStreamId
-     * @param bodyList
-     * @param callback
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param body a single message
+     * @param callback callback can be null
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, byte[] body, SendMessageCallback callback,
+            boolean isProxySend) throws ProxysdkException {
+        this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+    }
+
+    /**
+     * async send a batch of messages
+     *
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param bodyList list of messages
+     * @param callback callback can be null
      * @throws ProxysdkException
      */
     @Override
     public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
-                                 SendMessageCallback callback) throws ProxysdkException {
+            SendMessageCallback callback) throws ProxysdkException {
         this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
                 idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
     }
 
+    /**
+     * async send a batch of messages
+     *
+     * @param inlongGroupId groupId
+     * @param inlongStreamId streamId
+     * @param bodyList list of messages
+     * @param callback callback can be null
+     * @param isProxySend true: dataproxy doesn't return response message until data is sent to MQ
+     * @throws ProxysdkException
+     */
+    public void asyncSendMessage(String inlongGroupId, String inlongStreamId, List<byte[]> bodyList,
+            SendMessageCallback callback, boolean isProxySend) throws ProxysdkException {
+        this.asyncSendMessage(callback, bodyList, inlongGroupId, inlongStreamId, System.currentTimeMillis(),
+                idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT, isProxySend);
+    }
+
     private void addIndexCnt(String groupId, String streamId, long cnt) {
         try {
             String key = groupId + "|" + streamId;
@@ -501,10 +732,10 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
-                                     String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                     long timeout, TimeUnit timeUnit,
-                                     Map<String, String> extraAttrMap) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId, String streamId,
+            long dt, int sid, boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit,
+            Map<String, String> extraAttrMap) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -523,9 +754,9 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
-                                 String streamId, long dt, int sid, String ip, String msgUUID,
-                                 long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
+    @Deprecated
+    private void asyncSendMetric(FileCallback callback, byte[] body, String groupId, String streamId, long dt, int sid,
+            String ip, String msgUUID, long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -538,23 +769,23 @@ public class DefaultMessageSender implements MessageSender {
         }
     }
 
-    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
-                                      long dt, int sid, String ip, String msgUUID,
-                                      long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+            int sid, String ip, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout,
                 timeUnit, "minute");
     }
 
-    public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
-                                     String streamId, long dt, int sid, String msgUUID,
-                                     long timeout, TimeUnit timeUnit) throws ProxysdkException {
+    @Deprecated
+    public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId, String streamId, long dt,
+            int sid, String msgUUID, long timeout, TimeUnit timeUnit) throws ProxysdkException {
         asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit,
                 "file");
     }
 
-    public String sendMessageData(List<byte[]> bodyList, String groupId,
-                                  String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
-                                  long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+    @Deprecated
+    public String sendMessageData(List<byte[]> bodyList, String groupId, String streamId, long dt, int sid,
+            boolean isSupportLF, String msgUUID, long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
                 || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -574,9 +805,9 @@ public class DefaultMessageSender implements MessageSender {
         return null;
     }
 
-    private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
-                              String msgUUID,
-                              long timeout, TimeUnit timeUnit, String messageKey) {
+    @Deprecated
+    private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
+            long timeout, TimeUnit timeUnit, String messageKey) {
         dt = ProxyUtils.covertZeroDt(dt);
         if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
             return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,14 +820,15 @@ public class DefaultMessageSender implements MessageSender {
         return null;
     }
 
-    public String sendMessageProxy(byte[] body, String groupId, String streamId,
-                                   long dt, int sid, String ip, String msgUUID,
-                                   long timeout, TimeUnit timeUnit) {
+    @Deprecated
+    public String sendMessageProxy(byte[] body, String groupId, String streamId, long dt, int sid, String ip,
+            String msgUUID, long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
     }
 
+    @Deprecated
     public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
-                                  long timeout, TimeUnit timeUnit) {
+            long timeout, TimeUnit timeUnit) {
         return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
     }
 
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index df1c4939a..fd6b0f275 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -39,7 +39,8 @@ public class ProxyUtils {
     static {
         Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp",
                 "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId",
-                "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey");
+                "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey",
+                "proxySend", "errMsg", "errCode");
     }
 
     public static boolean isAttrKeysValid(Map<String, String> attrsMap) {
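
For reviewers, a minimal usage sketch of the new proxy-send overload added in this commit.
The group/stream IDs and the empty msgUUID below are placeholders, and passing a null
ThreadFactory to generateSenderByClusterId is an assumption of this sketch, not something
the patch itself demonstrates:

    import java.util.concurrent.TimeUnit;

    import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
    import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
    import org.apache.inlong.sdk.dataproxy.SendResult;

    public class ProxySendExample {

        // building the ProxyClientConfig (manager address, auth, ...) is outside this patch
        public static void send(ProxyClientConfig config, byte[] body) throws Exception {
            DefaultMessageSender sender =
                    DefaultMessageSender.generateSenderByClusterId(config, null);
            // isProxySend = true: DataProxy returns its response only after
            // the data has been sent to MQ (see the javadoc added above)
            SendResult result = sender.sendMessage(body, "test_group", "test_stream",
                    System.currentTimeMillis(), "", 30, TimeUnit.SECONDS, true);
            if (result != SendResult.OK) {
                // SendResult.OK means success; anything else should be retried or logged
            }
        }
    }

With isProxySend left at its default of false, the reworked overloads delegate to the
existing behavior, so current callers are unaffected.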


[inlong] 04/07: [INLONG-6456][Dashboard] Supports management of Oracle sources (#6469)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 077a7e08c92cae66f61b8885b71680bbff2b01d6
Author: Lizhen <88...@users.noreply.github.com>
AuthorDate: Tue Nov 8 18:23:00 2022 +0800

    [INLONG-6456][Dashboard] Supports management of Oracle sources (#6469)
---
 inlong-dashboard/src/locales/cn.json               |  10 ++
 inlong-dashboard/src/locales/en.json               |  10 ++
 .../src/metas/sources/defaults/Oracle.ts           | 161 +++++++++++++++++++++
 .../src/metas/sources/defaults/index.ts            |   5 +
 4 files changed, 186 insertions(+)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 3d74445f9..c5ad9c67a 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -39,6 +39,16 @@
   "meta.Sources.Mongodb.Database": "数据库名",
   "meta.Sources.Mongodb.Collection": "集合名称",
   "meta.Sources.Mongodb.PrimaryKey": "主键",
+  "meta.Sources.Oracle.Hostname": "服务器主机",
+  "meta.Sources.Oracle.Port": "端口",
+  "meta.Sources.Oracle.Username": "用户",
+  "meta.Sources.Oracle.Password": "密码",
+  "meta.Sources.Oracle.Database": "数据库名",
+  "meta.Sources.Oracle.SchemaName": "集合名称",
+  "meta.Sources.Oracle.TableName": "表格名称",
+  "meta.Sources.Oracle.AllMigration": "是否整库迁移",
+  "meta.Sources.Oracle.ScanStartupMode": "扫描启动模式",
+  "meta.Sources.Oracle.PrimaryKey": "主键",
   "meta.Sinks.SinkName": "名称",
   "meta.Sinks.SinkNameRule": "以英文字母开头,只能包含英文字母、数字、中划线、下划线",
   "meta.Sinks.SinkType": "类型",
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 5c7ba9880..dc7f103a1 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -39,6 +39,16 @@
   "meta.Sources.Mongodb.Database": "Database",
   "meta.Sources.Mongodb.Collection": "Collection",
   "meta.Sources.Mongodb.PrimaryKey": "PrimaryKey",
+  "meta.Sources.Oracle.Hostname": "Hostname",
+  "meta.Sources.Oracle.Port": "Port",
+  "meta.Sources.Oracle.Username": "Username",
+  "meta.Sources.Oracle.Password": "Password",
+  "meta.Sources.Oracle.Database": "Database",
+  "meta.Sources.Oracle.SchemaName": "SchemaName",
+  "meta.Sources.Oracle.TableName": "TableName",
+  "meta.Sources.Oracle.AllMigration": "AllMigration",
+  "meta.Sources.Oracle.ScanStartupMode": "Scan startup mode",
+  "meta.Sources.Oracle.PrimaryKey": "PrimaryKey",
   "meta.Sinks.SinkName": "Name",
   "meta.Sinks.SinkNameRule": "At the beginning of English letters, only English letters, numbers, minus, and underscores",
   "meta.Sinks.SinkType": "Type",
diff --git a/inlong-dashboard/src/metas/sources/defaults/Oracle.ts b/inlong-dashboard/src/metas/sources/defaults/Oracle.ts
new file mode 100644
index 000000000..2cc458e90
--- /dev/null
+++ b/inlong-dashboard/src/metas/sources/defaults/Oracle.ts
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { SourceInfo } from '../common/SourceInfo';
+import i18n from '@/i18n';
+
+const { I18n } = DataWithBackend;
+const { FieldDecorator } = RenderRow;
+const { ColumnDecorator } = RenderList;
+
+export default class OracleSource
+  extends SourceInfo
+  implements DataWithBackend, RenderRow, RenderList
+{
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.Oracle.Hostname')
+  hostname: string;
+
+  @FieldDecorator({
+    type: 'inputnumber',
+    rules: [{ required: true }],
+    initialValue: 1521,
+    props: values => ({
+      min: 1,
+      max: 65535,
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.Oracle.Port')
+  port: number;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @ColumnDecorator()
+  @I18n('meta.Sources.Oracle.Username')
+  username: string;
+
+  @FieldDecorator({
+    type: 'password',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Oracle.Password')
+  password: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Oracle.Database')
+  database: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Oracle.SchemaName')
+  schemaName: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Oracle.TableName')
+  tableName: string;
+
+  @FieldDecorator({
+    type: 'radio',
+    rules: [{ required: true }],
+    initialValue: false,
+    props: values => ({
+      disabled: values?.status === 101,
+      options: [
+        {
+          label: i18n.t('basic.Yes'),
+          value: true,
+        },
+        {
+          label: i18n.t('basic.No'),
+          value: false,
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Sources.Oracle.AllMigration')
+  allMigration: boolean;
+
+  @FieldDecorator({
+    type: 'radio',
+    initialValue: 'initial',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+      options: [
+        {
+          label: 'initial',
+          value: 'initial',
+        },
+        {
+          label: 'latest-offset',
+          value: 'latest-offset',
+        },
+      ],
+    }),
+  })
+  @I18n('meta.Sources.Oracle.ScanStartupMode')
+  scanStartupMode: string;
+
+  @FieldDecorator({
+    type: 'input',
+    rules: [{ required: true }],
+    props: values => ({
+      disabled: values?.status === 101,
+    }),
+  })
+  @I18n('meta.Sources.Oracle.PrimaryKey')
+  primaryKey: string;
+}
diff --git a/inlong-dashboard/src/metas/sources/defaults/index.ts b/inlong-dashboard/src/metas/sources/defaults/index.ts
index 8f2843153..881a7ff9a 100644
--- a/inlong-dashboard/src/metas/sources/defaults/index.ts
+++ b/inlong-dashboard/src/metas/sources/defaults/index.ts
@@ -46,4 +46,9 @@ export const allDefaultSources: MetaExportWithBackendList<SourceMetaType> = [
     value: 'MONGODB',
     LoadEntity: () => import('./Mongodb'),
   },
+  {
+    label: 'Oracle',
+    value: 'ORACLE',
+    LoadEntity: () => import('./Oracle'),
+  },
 ];


[inlong] 02/07: [INLONG-6409][Sort] Unspported Time and Timestamp iceberg auto create table in spark session query (#6424)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 19ba51d4a9d609b520c4b18f7cab14024c74894f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Nov 8 16:00:05 2022 +0800

    [INLONG-6409][Sort] Unspported Time and Timestamp iceberg auto create table in spark session query (#6424)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../org/apache/inlong/sort/base/Constants.java     |   7 +
 .../base/format/AbstractDynamicSchemaFormat.java   |   7 -
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  25 +-
 .../format/DebeziumJsonDynamicSchemaFormat.java    |  24 +-
 .../base/format/DynamicSchemaFormatFactory.java    |  39 +-
 .../sort/base/format/JsonDynamicSchemaFormat.java  |  50 ++-
 .../sort/base/format/JsonToRowDataConverters.java  | 414 +++++++++++++++++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |  24 +-
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  12 +-
 .../DebeziumJsonDynamicSchemaFormatTest.java       |   3 +-
 ...eziumJsonDynamicSchemaFormatWithSchemaTest.java |   3 +-
 .../sort/doris/table/DorisDynamicTableFactory.java |   6 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |   2 +
 .../inlong/sort/iceberg/IcebergTableSink.java      |   2 +
 .../sink/multiple/DynamicSchemaHandleOperator.java |   4 +-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |   5 +-
 16 files changed, 539 insertions(+), 88 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 19c58e9c1..9cc3c979d 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -163,4 +163,11 @@ public final class Constants {
                     .booleanType()
                     .defaultValue(true)
                     .withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
+
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
+            ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+                            + "`time`, so type conversions must be mapped to types supported by spark.");
 }
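
A short sketch of how this new option can be read through Flink's standard Configuration
API (the resolve() helper below is illustrative, not part of the patch):

    import org.apache.flink.configuration.Configuration;

    import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;

    public class SparkCompatOptionExample {

        public static boolean resolve(Configuration conf) {
            // returns false unless the user set
            // 'sink.multiple.typemap-compatible-with-spark' = 'true'
            return conf.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
        }
    }
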
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
index 2def7bb4b..f32b0086e 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
@@ -187,11 +187,4 @@ public abstract class AbstractDynamicSchemaFormat<T> {
      * @throws IOException The exception will throws
      */
     public abstract String parse(T data, String pattern) throws IOException;
-
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    public abstract String identifier();
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
index 88e51df7b..81256eda1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
@@ -17,22 +17,22 @@
 
 package org.apache.inlong.sort.base.format;
 
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Canal json dynamic format
  */
 public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
-    private static final String IDENTIFIER = "canal-json";
     private static final String DDL_FLAG = "ddl";
     private static final String DATA = "data";
     private static final String OLD = "old";
@@ -43,15 +43,8 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
     private static final String OP_UPDATE = "UPDATE";
     private static final String OP_DELETE = "DELETE";
 
-    private static final CanalJsonDynamicSchemaFormat FORMAT = new CanalJsonDynamicSchemaFormat();
-
-    private CanalJsonDynamicSchemaFormat() {
-
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getInstance() {
-        return FORMAT;
+    protected CanalJsonDynamicSchemaFormat(Map<String, String> props) {
+        super(props);
     }
 
     @Override
@@ -170,14 +163,4 @@ public class CanalJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
         return rowDataList;
     }
-
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
index 9841d01f6..d535b87d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.base.format;
 
-import org.apache.flink.formats.json.JsonToRowDataConverters.JsonToRowDataConverter;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
@@ -33,6 +32,7 @@ import org.apache.flink.table.types.logical.TinyIntType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.JsonToRowDataConverters.JsonToRowDataConverter;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,7 +44,6 @@ import java.util.Map;
  */
 public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
 
-    private static final String IDENTIFIER = "debezium-json";
     private static final String DDL_FLAG = "ddl";
     private static final String SCHEMA = "schema";
     private static final String SQL_TYPE = "sqlType";
@@ -87,15 +86,8 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
                     .put("BYTES", new VarBinaryType())
                     .build();
 
-    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
-
-    private DebeziumJsonDynamicSchemaFormat() {
-
-    }
-
-    @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getInstance() {
-        return FORMAT;
+    protected DebeziumJsonDynamicSchemaFormat(Map<String, String> props) {
+        super(props);
     }
 
     @Override
@@ -289,16 +281,6 @@ public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
         return extractRowData(payload, rowType);
     }
 
-    /**
-     * Get the identifier of this dynamic schema format
-     *
-     * @return The identifier of this dynamic schema format
-     */
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
-
     private LogicalType debeziumType2FlinkType(String debeziumType) {
         if (DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.containsKey(debeziumType.toUpperCase())) {
             return DEBEZIUM_TYPE_2_FLINK_TYPE_MAPPING.get(debeziumType.toUpperCase());
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
index 65a5b5e78..c260bdeaa 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
@@ -17,26 +17,36 @@
 
 package org.apache.inlong.sort.base.format;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.flink.util.Preconditions;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
 
 /**
  * Dynamic schema format factory
  */
 public class DynamicSchemaFormatFactory {
 
-    public static final List<AbstractDynamicSchemaFormat<?>> SUPPORT_FORMATS =
-            new ArrayList<AbstractDynamicSchemaFormat<?>>() {
+    public static Map<String, Function<Map<String, String>, AbstractDynamicSchemaFormat>> SUPPORT_FORMATS =
+            ImmutableMap.of(
+                    "canal-json", props -> new CanalJsonDynamicSchemaFormat(props),
+                    "debezium-json", props -> new DebeziumJsonDynamicSchemaFormat(props)
+            );
 
-                private static final long serialVersionUID = 1L;
+    /**
+     * Get the format by its name; only [canal-json|debezium-json] are supported for now
+     *
+     * @param identifier The identifier of this format
+     * @return The dynamic format
+     */
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+        return getFormat(identifier, new HashMap<>());
+    }
 
-                {
-                    add(CanalJsonDynamicSchemaFormat.getInstance());
-                    add(DebeziumJsonDynamicSchemaFormat.getInstance());
-                }
-            };
 
     /**
      * Get the format by its name; only [canal-json|debezium-json] are supported for now
@@ -45,10 +55,11 @@ public class DynamicSchemaFormatFactory {
      * @return The dynamic format
      */
     @SuppressWarnings("rawtypes")
-    public static AbstractDynamicSchemaFormat getFormat(String identifier) {
+    public static AbstractDynamicSchemaFormat getFormat(String identifier, Map<String, String> properties) {
         Preconditions.checkNotNull(identifier, "The identifier is null");
-        return Preconditions.checkNotNull(SUPPORT_FORMATS.stream().filter(s -> s.identifier().equals(identifier))
-                .findFirst().orElse(null), "Unsupport dynamic schema format for:" + identifier);
+        return Optional.ofNullable(SUPPORT_FORMATS.get(identifier))
+                .orElseThrow(() ->
+                        new UnsupportedOperationException("Unsupported dynamic schema format for: " + identifier))
+                .apply(properties);
     }
-
 }
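
A minimal sketch of calling the refactored factory (the literal property key below
is an assumption used only for illustration; in practice it would come from
Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK.key()):

    Map<String, String> props = new HashMap<>();
    // hypothetical key string, spelled out only for this sketch
    props.put("sink.multiple.typemap-compatible-with-spark", "true");
    AbstractDynamicSchemaFormat format =
            DynamicSchemaFormatFactory.getFormat("canal-json", props);
    // an unknown identifier now fails fast with UnsupportedOperationException
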
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
index a0d564d0c..e6fc75528 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
@@ -17,8 +17,9 @@
 
 package org.apache.inlong.sort.base.format;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonToRowDataConverters;
 import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -51,6 +52,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.regex.Matcher;
 
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+
 /**
  * Json dynamic format class
  * This class mainly handles:
@@ -64,6 +67,7 @@ import java.util.regex.Matcher;
  * 3) given a pattern "prefix_${a}_{b}_{c}_suffix" and a root node that contains the keys (a: '1', b: '2', c: '3'),
  * the parsed result will be 'prefix_1_2_3_suffix'
  */
+@SuppressWarnings("LanguageDetectionInspection")
 public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat<JsonNode> {
 
     /**
@@ -71,7 +75,7 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
      */
     private static final Integer FIRST = 0;
 
-    private static final Map<Integer, LogicalType> SQL_TYPE_2_ICEBERG_TYPE_MAPPING =
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_FLINK_TYPE_MAPPING =
             ImmutableMap.<Integer, LogicalType>builder()
                     .put(java.sql.Types.CHAR, new CharType())
                     .put(java.sql.Types.VARCHAR, new VarCharType())
@@ -94,12 +98,44 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
                     .put(java.sql.Types.BOOLEAN, new BooleanType())
                     .put(java.sql.Types.OTHER, new VarCharType())
                     .build();
+
+    private static final Map<Integer, LogicalType> SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING =
+            ImmutableMap.<Integer, LogicalType>builder()
+                    .put(java.sql.Types.CHAR, new CharType())
+                    .put(java.sql.Types.VARCHAR, new VarCharType())
+                    .put(java.sql.Types.SMALLINT, new SmallIntType())
+                    .put(java.sql.Types.INTEGER, new IntType())
+                    .put(java.sql.Types.BIGINT, new BigIntType())
+                    .put(java.sql.Types.REAL, new FloatType())
+                    .put(java.sql.Types.DOUBLE, new DoubleType())
+                    .put(java.sql.Types.FLOAT, new FloatType())
+                    .put(java.sql.Types.DECIMAL, new DecimalType())
+                    .put(java.sql.Types.NUMERIC, new DecimalType())
+                    .put(java.sql.Types.BIT, new BooleanType())
+                    .put(java.sql.Types.TIME, new VarCharType())
+                    .put(java.sql.Types.TIMESTAMP_WITH_TIMEZONE, new LocalZonedTimestampType())
+                    .put(java.sql.Types.TIMESTAMP, new LocalZonedTimestampType())
+                    .put(java.sql.Types.BINARY, new BinaryType())
+                    .put(java.sql.Types.VARBINARY, new VarBinaryType())
+                    .put(java.sql.Types.BLOB, new VarBinaryType())
+                    .put(java.sql.Types.DATE, new DateType())
+                    .put(java.sql.Types.BOOLEAN, new BooleanType())
+                    .put(java.sql.Types.OTHER, new VarCharType())
+                    .build();
+
     public final ObjectMapper objectMapper = new ObjectMapper();
     protected final JsonToRowDataConverters rowDataConverters;
+    protected final boolean adaptSparkEngine;
 
-    protected JsonDynamicSchemaFormat() {
+    public JsonDynamicSchemaFormat(Map<String, String> properties) {
+        ReadableConfig config = Configuration.fromMap(properties);
+        this.adaptSparkEngine = config.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         this.rowDataConverters =
-                new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601);
+                new JsonToRowDataConverters(
+                        false,
+                        false,
+                        TimestampFormat.ISO_8601,
+                        adaptSparkEngine);
     }
 
     /**
@@ -286,8 +322,10 @@ public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaForma
     }
 
     private LogicalType sqlType2FlinkType(int jdbcType) {
-        if (SQL_TYPE_2_ICEBERG_TYPE_MAPPING.containsKey(jdbcType)) {
-            return SQL_TYPE_2_ICEBERG_TYPE_MAPPING.get(jdbcType);
+        Map<Integer, LogicalType> typeMap = adaptSparkEngine
+                ? SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING : SQL_TYPE_2_FLINK_TYPE_MAPPING;
+        if (typeMap.containsKey(jdbcType)) {
+            return typeMap.get(jdbcType);
         } else {
             throw new IllegalArgumentException("Unsupported jdbcType: " + jdbcType);
         }
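
A fragment illustrating the effect of the flag, written as if inside
JsonDynamicSchemaFormat (the two lookups below match the Spark-compatible map
entries added above):

    // with adaptSparkEngine = true, time-like JDBC types resolve to types that
    // Spark SQL can read back from an auto-created Iceberg table
    LogicalType timeType = SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING
            .get(java.sql.Types.TIME);      // VarCharType
    LogicalType tsType = SQL_TYPE_2_SPARK_SUPPORTED_FLINK_TYPE_MAPPING
            .get(java.sql.Types.TIMESTAMP); // LocalZonedTimestampType
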
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
new file mode 100644
index 000000000..da6128942
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonToRowDataConverters.java
@@ -0,0 +1,414 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.TemporalAccessor;
+import java.time.temporal.TemporalQueries;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
+import static org.apache.flink.formats.common.TimeFormats.SQL_TIME_FORMAT;
+
+/** Tool class used to convert from {@link JsonNode} to {@link RowData}. */
+@Internal
+public class JsonToRowDataConverters implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Flag indicating whether to fail if a field is missing. */
+    private final boolean failOnMissingField;
+
+    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
+    private final boolean ignoreParseErrors;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    /** Whether to adapt conversion to the Spark SQL engine. */
+    private final boolean adaptSpark;
+
+    public JsonToRowDataConverters(
+            boolean failOnMissingField,
+            boolean ignoreParseErrors,
+            TimestampFormat timestampFormat,
+            boolean adaptSpark) {
+        this.failOnMissingField = failOnMissingField;
+        this.ignoreParseErrors = ignoreParseErrors;
+        this.timestampFormat = timestampFormat;
+        this.adaptSpark = adaptSpark;
+    }
+
+    /**
+     * Runtime converter that converts {@link JsonNode}s into objects of Flink Table & SQL internal
+     * data structures.
+     */
+    @FunctionalInterface
+    public interface JsonToRowDataConverter extends Serializable {
+        Object convert(JsonNode jsonNode);
+    }
+
+    /** Creates a runtime converter which is null safe. */
+    public JsonToRowDataConverter createConverter(LogicalType type) {
+        return wrapIntoNullableConverter(createNotNullConverter(type));
+    }
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    private JsonToRowDataConverter createNotNullConverter(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return jsonNode -> null;
+            case BOOLEAN:
+                return this::convertToBoolean;
+            case TINYINT:
+                return jsonNode -> Byte.parseByte(jsonNode.asText().trim());
+            case SMALLINT:
+                return jsonNode -> Short.parseShort(jsonNode.asText().trim());
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return this::convertToInt;
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return this::convertToLong;
+            case DATE:
+                return this::convertToDate;
+            case TIME_WITHOUT_TIME_ZONE:
+                return this::convertToTime;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return this::convertToTimestamp;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                if (adaptSpark) {
+                    return jsonNode -> {
+                        try {
+                            return convertToTimestampWithLocalZone(jsonNode);
+                        } catch (DateTimeParseException e) {
+                            return convertToTimestamp(jsonNode);
+                        }
+                    };
+                }
+                return this::convertToTimestampWithLocalZone;
+            case FLOAT:
+                return this::convertToFloat;
+            case DOUBLE:
+                return this::convertToDouble;
+            case CHAR:
+            case VARCHAR:
+                return this::convertToString;
+            case BINARY:
+            case VARBINARY:
+                return this::convertToBytes;
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ARRAY:
+                return createArrayConverter((ArrayType) type);
+            case MAP:
+                MapType mapType = (MapType) type;
+                return createMapConverter(
+                        mapType.asSummaryString(), mapType.getKeyType(), mapType.getValueType());
+            case MULTISET:
+                MultisetType multisetType = (MultisetType) type;
+                return createMapConverter(
+                        multisetType.asSummaryString(),
+                        multisetType.getElementType(),
+                        new IntType());
+            case ROW:
+                return createRowConverter((RowType) type);
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private boolean convertToBoolean(JsonNode jsonNode) {
+        if (jsonNode.isBoolean()) {
+            // avoid redundant toString and parseBoolean, for better performance
+            return jsonNode.asBoolean();
+        } else {
+            return Boolean.parseBoolean(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToInt(JsonNode jsonNode) {
+        if (jsonNode.canConvertToInt()) {
+            // avoid redundant toString and parseInt, for better performance
+            return jsonNode.asInt();
+        } else {
+            return Integer.parseInt(jsonNode.asText().trim());
+        }
+    }
+
+    private long convertToLong(JsonNode jsonNode) {
+        if (jsonNode.canConvertToLong()) {
+            // avoid redundant toString and parseLong, for better performance
+            return jsonNode.asLong();
+        } else {
+            return Long.parseLong(jsonNode.asText().trim());
+        }
+    }
+
+    private double convertToDouble(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return jsonNode.asDouble();
+        } else {
+            return Double.parseDouble(jsonNode.asText().trim());
+        }
+    }
+
+    private float convertToFloat(JsonNode jsonNode) {
+        if (jsonNode.isDouble()) {
+            // avoid redundant toString and parseDouble, for better performance
+            return (float) jsonNode.asDouble();
+        } else {
+            return Float.parseFloat(jsonNode.asText().trim());
+        }
+    }
+
+    private int convertToDate(JsonNode jsonNode) {
+        LocalDate date = ISO_LOCAL_DATE.parse(jsonNode.asText()).query(TemporalQueries.localDate());
+        return (int) date.toEpochDay();
+    }
+
+    private int convertToTime(JsonNode jsonNode) {
+        TemporalAccessor parsedTime = SQL_TIME_FORMAT.parse(jsonNode.asText());
+        LocalTime localTime = parsedTime.query(TemporalQueries.localTime());
+
+        // get number of milliseconds of the day
+        return localTime.toSecondOfDay() * 1000;
+    }
+
+    private TimestampData convertToTimestamp(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestamp;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestamp = SQL_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                break;
+            case ISO_8601:
+                parsedTimestamp = ISO8601_TIMESTAMP_FORMAT.parse(jsonNode.asText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = parsedTimestamp.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestamp.query(TemporalQueries.localDate());
+
+        return TimestampData.fromLocalDateTime(LocalDateTime.of(localDate, localTime));
+    }
+
+    private TimestampData convertToTimestampWithLocalZone(JsonNode jsonNode) {
+        TemporalAccessor parsedTimestampWithLocalZone;
+        switch (timestampFormat) {
+            case SQL:
+                parsedTimestampWithLocalZone =
+                        SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                break;
+            case ISO_8601:
+                parsedTimestampWithLocalZone =
+                        ISO8601_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT.parse(jsonNode.asText());
+                break;
+            default:
+                throw new TableException(
+                        String.format(
+                                "Unsupported timestamp format '%s'. Validator should have checked that.",
+                                timestampFormat));
+        }
+        LocalTime localTime = parsedTimestampWithLocalZone.query(TemporalQueries.localTime());
+        LocalDate localDate = parsedTimestampWithLocalZone.query(TemporalQueries.localDate());
+
+        return TimestampData.fromInstant(
+                LocalDateTime.of(localDate, localTime).toInstant(ZoneOffset.UTC));
+    }
+
+    private StringData convertToString(JsonNode jsonNode) {
+        if (jsonNode.isContainerNode()) {
+            return StringData.fromString(jsonNode.toString());
+        } else {
+            return StringData.fromString(jsonNode.asText());
+        }
+    }
+
+    private byte[] convertToBytes(JsonNode jsonNode) {
+        try {
+            return jsonNode.binaryValue();
+        } catch (IOException e) {
+            throw new JsonParseException("Unable to deserialize byte array.", e);
+        }
+    }
+
+    private JsonToRowDataConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return jsonNode -> {
+            BigDecimal bigDecimal;
+            if (jsonNode.isBigDecimal()) {
+                bigDecimal = jsonNode.decimalValue();
+            } else {
+                bigDecimal = new BigDecimal(jsonNode.asText());
+            }
+            return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+        };
+    }
+
+    private JsonToRowDataConverter createArrayConverter(ArrayType arrayType) {
+        JsonToRowDataConverter elementConverter = createConverter(arrayType.getElementType());
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+        return jsonNode -> {
+            final ArrayNode node = (ArrayNode) jsonNode;
+            final Object[] array = (Object[]) Array.newInstance(elementClass, node.size());
+            for (int i = 0; i < node.size(); i++) {
+                final JsonNode innerNode = node.get(i);
+                array[i] = elementConverter.convert(innerNode);
+            }
+            return new GenericArrayData(array);
+        };
+    }
+
+    private JsonToRowDataConverter createMapConverter(
+            String typeSummary, LogicalType keyType, LogicalType valueType) {
+        if (!LogicalTypeChecks.hasFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+            throw new UnsupportedOperationException(
+                    "JSON format doesn't support non-string as key type of map. "
+                            + "The type is: "
+                            + typeSummary);
+        }
+        final JsonToRowDataConverter keyConverter = createConverter(keyType);
+        final JsonToRowDataConverter valueConverter = createConverter(valueType);
+
+        return jsonNode -> {
+            Iterator<Entry<String, JsonNode>> fields = jsonNode.fields();
+            Map<Object, Object> result = new HashMap<>();
+            while (fields.hasNext()) {
+                Map.Entry<String, JsonNode> entry = fields.next();
+                Object key = keyConverter.convert(TextNode.valueOf(entry.getKey()));
+                Object value = valueConverter.convert(entry.getValue());
+                result.put(key, value);
+            }
+            return new GenericMapData(result);
+        };
+    }
+
+    public JsonToRowDataConverter createRowConverter(RowType rowType) {
+        final JsonToRowDataConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(this::createConverter)
+                        .toArray(JsonToRowDataConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return jsonNode -> {
+            ObjectNode node = (ObjectNode) jsonNode;
+            int arity = fieldNames.length;
+            GenericRowData row = new GenericRowData(arity);
+            for (int i = 0; i < arity; i++) {
+                String fieldName = fieldNames[i];
+                JsonNode field = node.get(fieldName);
+                Object convertedField = convertField(fieldConverters[i], fieldName, field);
+                row.setField(i, convertedField);
+            }
+            return row;
+        };
+    }
+
+    private Object convertField(
+            JsonToRowDataConverter fieldConverter, String fieldName, JsonNode field) {
+        if (field == null) {
+            if (failOnMissingField) {
+                throw new JsonParseException("Could not find field with name '" + fieldName + "'.");
+            } else {
+                return null;
+            }
+        } else {
+            return fieldConverter.convert(field);
+        }
+    }
+
+    private JsonToRowDataConverter wrapIntoNullableConverter(
+            JsonToRowDataConverter converter) {
+        return jsonNode -> {
+            if (jsonNode == null || jsonNode.isNull() || jsonNode.isMissingNode()) {
+                return null;
+            }
+            try {
+                return converter.convert(jsonNode);
+            } catch (Throwable t) {
+                if (!ignoreParseErrors) {
+                    throw t;
+                }
+                return null;
+            }
+        };
+    }
+
+    /** Exception which refers to parse errors in converters. */
+    private static final class JsonParseException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public JsonParseException(String message) {
+            super(message);
+        }
+
+        public JsonParseException(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+}
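
A minimal, self-contained sketch of the new converter in use (the row schema and
the JSON payload are illustrative only):

    import org.apache.flink.formats.common.TimestampFormat;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.inlong.sort.base.format.JsonToRowDataConverters;

    public class ConverterSketch {
        public static void main(String[] args) throws Exception {
            JsonToRowDataConverters converters =
                    new JsonToRowDataConverters(false, false, TimestampFormat.ISO_8601, true);
            RowType rowType = RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                    new String[] {"id", "name"});
            // convert() returns a GenericRowData holding (1, "inlong")
            RowData row = (RowData) converters.createRowConverter(rowType)
                    .convert(new ObjectMapper().readTree("{\"id\": 1, \"name\": \"inlong\"}"));
            System.out.println(row);
        }
    }
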
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 77c924b95..10bc3f3d1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -18,11 +18,14 @@
 
 package org.apache.inlong.sort.base.sink;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Map;
 
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE;
 import static org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy.TRY_IT_BEST;
@@ -37,6 +40,8 @@ public class MultipleSinkOption implements Serializable {
 
     private String format;
 
+    private boolean sparkEngineEnable;
+
     private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
 
     private String databasePattern;
@@ -44,10 +49,12 @@ public class MultipleSinkOption implements Serializable {
     private String tablePattern;
 
     public MultipleSinkOption(String format,
+            boolean sparkEngineEnable,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
             String tablePattern) {
         this.format = format;
+        this.sparkEngineEnable = sparkEngineEnable;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
@@ -57,6 +64,15 @@ public class MultipleSinkOption implements Serializable {
         return format;
     }
 
+    public boolean isSparkEngineEnable() {
+        return sparkEngineEnable;
+    }
+
+    public Map<String, String> getFormatOption() {
+        return ImmutableMap.of(
+                SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK.key(), String.valueOf(isSparkEngineEnable()));
+    }
+
     public SchemaUpdateExceptionPolicy getSchemaUpdatePolicy() {
         return schemaUpdatePolicy;
     }
@@ -75,6 +91,7 @@ public class MultipleSinkOption implements Serializable {
 
     public static class Builder {
         private String format;
+        private boolean sparkEngineEnable;
         private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
@@ -84,6 +101,11 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
+        public MultipleSinkOption.Builder withSparkEngineEnable(boolean sparkEngineEnable) {
+            this.sparkEngineEnable = sparkEngineEnable;
+            return this;
+        }
+
         public MultipleSinkOption.Builder withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
             this.schemaUpdatePolicy = schemaUpdatePolicy;
             return this;
@@ -100,7 +122,7 @@ public class MultipleSinkOption implements Serializable {
         }
 
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, schemaUpdatePolicy, databasePattern, tablePattern);
+            return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern);
         }
     }
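
For context, a sketch of how the new flag travels from the sink options to the
format (the pattern strings and the policy value are illustrative):

    MultipleSinkOption option = MultipleSinkOption.builder()
            .withFormat("canal-json")
            .withSparkEngineEnable(true)
            .withDatabasePattern("${database}")
            .withTablePattern("${table}")
            .withSchemaUpdatePolicy(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
            .build();
    // getFormatOption() exposes the flag as a one-entry properties map, which the
    // iceberg DynamicSchemaHandleOperator (below) hands to the format factory
    AbstractDynamicSchemaFormat format = DynamicSchemaFormatFactory.getFormat(
            option.getFormat(), option.getFormatOption());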
 
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
index dc7ce5ec8..29dbcb6a1 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
@@ -37,6 +37,7 @@ import java.util.Map;
  * Test for {@link CanalJsonDynamicSchemaFormat}
  */
 public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("canal-json");
 
     @Override
     protected String getSource() {
@@ -91,6 +92,11 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
         return expectedValues;
     }
 
+    @Override
+    protected AbstractDynamicSchemaFormat<JsonNode> getDynamicSchemaFormat() {
+        return schemaFormat;
+    }
+
     @Test
     @SuppressWarnings({"unchecked"})
     public void testExtractPrimaryKey() throws IOException {
@@ -119,10 +125,4 @@ public class CanalJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTes
                 5.18f));
         Assert.assertEquals(values, rowDataList);
     }
-
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    @Override
-    protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return CanalJsonDynamicSchemaFormat.getInstance();
-    }
 }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
index 5e555a657..c0eaface9 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
  * Test for {@link DebeziumJsonDynamicSchemaFormat}
  */
 public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
 
     @Override
     protected String getSource() {
@@ -98,6 +99,6 @@ public class DebeziumJsonDynamicSchemaFormatTest extends DynamicSchemaFormatBase
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return DebeziumJsonDynamicSchemaFormat.getInstance();
+        return schemaFormat;
     }
 }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
index fb268b9a7..d4a3dd188 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatWithSchemaTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
  * Test for {@link DebeziumJsonDynamicSchemaFormat}
  */
 public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchemaFormatBaseTest<JsonNode> {
+    private AbstractDynamicSchemaFormat schemaFormat = DynamicSchemaFormatFactory.getFormat("debezium-json");
 
     @Override
     protected String getSource() {
@@ -270,6 +271,6 @@ public class DebeziumJsonDynamicSchemaFormatWithSchemaTest extends DynamicSchema
     @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     protected AbstractDynamicSchemaFormat getDynamicSchemaFormat() {
-        return DebeziumJsonDynamicSchemaFormat.getInstance();
+        return schemaFormat;
     }
 }
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index f3c263f93..30ef92b69 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -35,16 +35,13 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.utils.TableSchemaUtils;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 
 import java.time.Duration;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.stream.Collectors;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT;
@@ -332,8 +329,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
                                 + "is not allowed blank when the option 'sink.multiple.enable' is 'true'");
             }
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
-            List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
-                    AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+            Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
                 throw new ValidationException(String.format(
                         "Unsupported value '%s' for '%s'. "
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 4852bca33..040e73df6 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -55,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -241,6 +242,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ee7cf2c89..67ca5fdf2 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -49,6 +49,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
 import static org.apache.inlong.sort.iceberg.FlinkConfigOptions.ICEBERG_IGNORE_ALL_CHANGELOG;
 
 /**
@@ -104,6 +105,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                     .multipleSink(tableOptions.get(SINK_MULTIPLE_ENABLE))
                     .multipleSinkOption(MultipleSinkOption.builder()
                             .withFormat(tableOptions.get(SINK_MULTIPLE_FORMAT))
+                            .withSparkEngineEnable(tableOptions.get(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK))
                             .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
                             .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index a0a9092c6..378f040d9 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -94,7 +94,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         this.catalog = catalogLoader.loadCatalog();
         this.asNamespaceCatalog =
                 catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
-        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(multipleSinkOption.getFormat());
+        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
+                multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption());
+
         this.processingTimeService = getRuntimeContext().getProcessingTimeService();
         processingTimeService.registerTimer(
                 processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 480d55948..69716dfb6 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,7 +51,6 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.types.RowKind;
-import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -66,7 +65,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FIELDS_PREFIX;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.KEY_FORMAT;
@@ -440,8 +438,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         if (valueEncodingFormat instanceof RawFormatSerializationSchema
                 && StringUtils.isNotBlank(sinkMultipleFormat)) {
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
-            List<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.stream().map(
-                    AbstractDynamicSchemaFormat::identifier).collect(Collectors.toList());
+            Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
                 throw new ValidationException(String.format(
                         "Unsupported value '%s' for '%s'. "


[inlong] 03/07: [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 1e3212cdb7fe27bdf3140766f3834b80028dacde
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Tue Nov 8 17:06:39 2022 +0800

    [INLONG-6438][SDK] Parses and handles ack response from DataProxy (#6448)
---
 .../inlong/common/msg/AttributeConstants.java      |   5 +
 .../apache/inlong/sdk/dataproxy/SendResult.java    |  11 +-
 .../inlong/sdk/dataproxy/codec/EncodeObject.java   | 154 ++++++++++++++-------
 .../inlong/sdk/dataproxy/codec/ErrorCode.java      |  52 -------
 .../sdk/dataproxy/codec/ProtocolDecoder.java       |  38 ++---
 .../inlong/sdk/dataproxy/network/Sender.java       |  92 ++++++------
 6 files changed, 177 insertions(+), 175 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 645de6d4f..6cd635ea7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -89,4 +89,9 @@ public interface AttributeConstants {
 
     // error message, add by receiver
     String MESSAGE_PROCESS_ERRMSG = "errMsg";
+
+    String MESSAGE_ID = "messageId";
+
+    // dataproxy IP from dp response ack
+    String MESSAGE_DP_IP = "dpIP";
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
index 10c89af38..75f057b8f 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/SendResult.java
@@ -19,14 +19,19 @@
 package org.apache.inlong.sdk.dataproxy;
 
 public enum SendResult {
-    INVALID_ATTRIBUTES,
+    INVALID_ATTRIBUTES, // including DataProxyErrCode(100,101,102,112)
     OK,
     TIMEOUT,
     CONNECTION_BREAK,
     THREAD_INTERRUPT,
     ASYNC_CALLBACK_BUFFER_FULL,
     NO_CONNECTION,
-    INVALID_DATA,
-    UNKOWN_ERROR
+    INVALID_DATA, // including DataProxyErrCode(103, 111)
+    BODY_EXCEED_MAX_LEN, // DataProxyErrCode(104)
+    SINK_SERVICE_UNREADY, // DataProxyErrCode(1)
+    UNCONFIGURED_GROUPID_OR_STREAMID, // DataProxyErrCode(113)
+    TOPIC_IS_BLANK, // DataProxyErrCode(115)
+    DATAPROXY_FAIL_TO_RECEIVE, // DataProxyErrCode(114,116,117,118,119,120)
 
+    UNKOWN_ERROR
 }
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
index bf50507e2..8634621cf 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/EncodeObject.java
@@ -18,12 +18,21 @@
 
 package org.apache.inlong.sdk.dataproxy.codec;
 
-import java.util.List;
-
+import com.google.common.base.Splitter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.SendResult;
 import org.apache.inlong.sdk.dataproxy.config.EncryptConfigEntry;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class EncodeObject {
-    private static final String MESSAGE_ID_PREFIX = "messageId=";
+
+    private static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR).trimResults()
+            .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
 
     private byte[] bodyBytes;
     private String attributes;
@@ -54,25 +63,24 @@ public class EncodeObject {
     private String msgUUID = null;
     private EncryptConfigEntry encryptEntry = null;
 
-    private boolean isException = false;
-    private ErrorCode exceptionError = null;
+    private SendResult sendResult = SendResult.OK;
+    private String errMsg;
+    private String dpIp;
 
-    /* Used by de_serialization. msgtype=7/8*/
+    /* Used by deserialization. msgtype=8 */
     public EncodeObject() {
     }
 
+    /* Used by deserialization. msgtype=7 */
+    public EncodeObject(String attributes) {
+        handleAttr(attributes);
+    }
+
     /* Used by deserialization. */
     public EncodeObject(byte[] bodyBytes, String attributes) {
         this.bodyBytes = bodyBytes;
         this.attributes = attributes;
-        this.messageId = "";
-        String[] tokens = attributes.split("&");
-        for (int i = 0; i < tokens.length; i++) {
-            if (tokens[i].startsWith("messageId=")) {
-                this.messageId = tokens[i].substring(MESSAGE_ID_PREFIX.length(), tokens[i].length());
-                break;
-            }
-        }
+        handleAttr(attributes);
     }
 
     /* Used by serialization, but never used */
@@ -85,7 +93,7 @@ public class EncodeObject {
 
     // used for bytes initialization, msgtype=3/5
     public EncodeObject(byte[] bodyBytes, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String groupId) {
+            int msgtype, boolean isCompress, final String groupId) {
         this.bodyBytes = bodyBytes;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
@@ -96,7 +104,7 @@ public class EncodeObject {
 
     // used for bodylist initialization, msgtype=3/5
     public EncodeObject(List<byte[]> bodyList, String attributes, String messageId,
-                        int msgtype, boolean isCompress, final String groupId) {
+            int msgtype, boolean isCompress, final String groupId) {
         this.bodylist = bodyList;
         this.messageId = messageId;
         this.attributes = attributes + "&messageId=" + messageId;
@@ -107,8 +115,8 @@ public class EncodeObject {
 
     // used for bytes initialization, msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress, boolean isReport,
-                        boolean isGroupIdTransfer, long dt, long seqId, String groupId,
-                        String streamId, String commonattr) {
+            boolean isGroupIdTransfer, long dt, long seqId, String groupId,
+            String streamId, String commonattr) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -123,8 +131,8 @@ public class EncodeObject {
 
     // used for bodylist initialization, msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -139,9 +147,9 @@ public class EncodeObject {
 
     // file agent, used for bytes initialization, msgtype=7/8
     public EncodeObject(byte[] bodyBytes, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr,
-                        String messageKey, String proxyIp) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr,
+            String messageKey, String proxyIp) {
         this.bodyBytes = bodyBytes;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -158,9 +166,9 @@ public class EncodeObject {
 
     // file agent, used for bodylist initialization, msgtype=7/8
     public EncodeObject(List<byte[]> bodyList, int msgtype, boolean isCompress,
-                        boolean isReport, boolean isGroupIdTransfer, long dt,
-                        long seqId, String groupId, String streamId, String commonattr,
-                        String messageKey, String proxyIp) {
+            boolean isReport, boolean isGroupIdTransfer, long dt,
+            long seqId, String groupId, String streamId, String commonattr,
+            String messageKey, String proxyIp) {
         this.bodylist = bodyList;
         this.msgtype = msgtype;
         this.isCompress = isCompress;
@@ -175,6 +183,60 @@ public class EncodeObject {
         this.proxyIp = proxyIp;
     }
 
+    private void handleAttr(String attributes) {
+        if (StringUtils.isBlank(attributes)) {
+            return;
+        }
+        Map<String, String> backAttrs = new HashMap<>(MAP_SPLITTER.split(attributes));
+        if (backAttrs.containsKey(AttributeConstants.MESSAGE_ID)) {
+            this.messageId = backAttrs.get(AttributeConstants.MESSAGE_ID);
+        }
+        dpIp = backAttrs.get(AttributeConstants.MESSAGE_DP_IP);
+
+        String errCode = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRCODE);
+        // errCode is empty or equals 0 -> success
+        if (StringUtils.isBlank(errCode) || Integer.parseInt(errCode) == 0) {
+            this.sendResult = SendResult.OK;
+        } else {
+            // get errMsg
+            this.errMsg = backAttrs.get(AttributeConstants.MESSAGE_PROCESS_ERRMSG);
+            if (StringUtils.isBlank(errMsg)) {
+                this.errMsg = DataProxyErrCode.valueOf(Integer.parseInt(errCode)).getErrMsg();
+            }
+            // map the error code to a SendResult
+            this.sendResult = convertToSendResult(Integer.parseInt(errCode));
+        }
+    }
+
+    private SendResult convertToSendResult(int errCode) {
+        DataProxyErrCode dpErrCode = DataProxyErrCode.valueOf(errCode);
+        switch (dpErrCode) {
+            case SINK_SERVICE_UNREADY:
+                return SendResult.SINK_SERVICE_UNREADY;
+            case MISS_REQUIRED_GROUPID_ARGUMENT:
+            case MISS_REQUIRED_STREAMID_ARGUMENT:
+            case MISS_REQUIRED_DT_ARGUMENT:
+            case UNSUPPORTED_EXTEND_FIELD_VALUE:
+                return SendResult.INVALID_ATTRIBUTES;
+            case MISS_REQUIRED_BODY_ARGUMENT:
+            case EMPTY_MSG:
+                return SendResult.INVALID_DATA;
+            case BODY_EXCEED_MAX_LEN:
+                return SendResult.BODY_EXCEED_MAX_LEN;
+            case UNCONFIGURED_GROUPID_OR_STREAMID:
+                return SendResult.UNCONFIGURED_GROUPID_OR_STREAMID;
+            case PUT_EVENT_TO_CHANNEL_FAILURE:
+            case NO_AVAILABLE_PRODUCER:
+            case PRODUCER_IS_NULL:
+            case SEND_REQUEST_TO_MQ_FAILURE:
+            case MQ_RETURN_ERROR:
+            case DUPLICATED_MESSAGE:
+                return SendResult.DATAPROXY_FAIL_TO_RECEIVE;
+            default:
+                return SendResult.UNKOWN_ERROR;
+        }
+    }
+
     public String getMsgUUID() {
         return msgUUID;
     }
@@ -215,14 +277,6 @@ public class EncodeObject {
         this.streamId = streamId;
     }
 
-    public void setMsgtype(int msgtype) {
-        this.msgtype = msgtype;
-    }
-
-    public void setBodyBytes(byte[] bodyBytes) {
-        this.bodyBytes = bodyBytes;
-    }
-
     public boolean isReport() {
         return isReport;
     }
@@ -317,10 +371,18 @@ public class EncodeObject {
         return msgtype;
     }
 
+    public void setMsgtype(int msgtype) {
+        this.msgtype = msgtype;
+    }
+
     public byte[] getBodyBytes() {
         return bodyBytes;
     }
 
+    public void setBodyBytes(byte[] bodyBytes) {
+        this.bodyBytes = bodyBytes;
+    }
+
     public String getAttributes() {
         return attributes;
     }
@@ -361,6 +423,10 @@ public class EncodeObject {
         return cnt;
     }
 
+    public void setCnt(int cnt) {
+        this.cnt = cnt;
+    }
+
     public int getRealCnt() {
         if (bodylist != null) {
             return bodylist.size();
@@ -368,23 +434,15 @@ public class EncodeObject {
         return 1;
     }
 
-    public void setCnt(int cnt) {
-        this.cnt = cnt;
-    }
-
-    public boolean isException() {
-        return isException;
-    }
-
-    public void setException(boolean exception) {
-        isException = exception;
+    public String getDpIp() {
+        return dpIp;
     }
 
-    public ErrorCode getExceptionError() {
-        return exceptionError;
+    public String getErrMsg() {
+        return errMsg;
     }
 
-    public void setExceptionError(ErrorCode exceptionError) {
-        this.exceptionError = exceptionError;
+    public SendResult getSendResult() {
+        return sendResult;
     }
 }
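
A sketch of the ack attribute string this constructor consumes; "messageId" and
"dpIP" are the constants added to AttributeConstants above, while the error-code
key name below is an assumption (the real key is the value of
AttributeConstants.MESSAGE_PROCESS_ERRCODE):

    // hypothetical ack attributes returned by DataProxy for a msgtype=7 response
    String attrs = "messageId=12345&dpIP=127.0.0.1&errCode=0";
    EncodeObject ack = new EncodeObject(attrs);
    SendResult result = ack.getSendResult(); // OK, since errCode is 0 (or absent)
    String dpIp = ack.getDpIp();             // "127.0.0.1"
    // a non-zero errCode is mapped through convertToSendResult(), and errMsg falls
    // back to DataProxyErrCode's default message when the ack does not carry one
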
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
deleted file mode 100644
index 3598860a4..000000000
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ErrorCode.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dataproxy.codec;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public enum ErrorCode {
-
-    ATTR_ERROR(1),
-
-    DT_ERROR(2),
-
-    COMPRESS_ERROR(3),
-
-    OTHER_ERROR(4),
-
-    LONG_LENGTH_ERROR(5);
-    private final int value;
-    private static final Map<Integer, ErrorCode> map = new HashMap<>();
-
-    static {
-        for (ErrorCode errorCode : ErrorCode.values()) {
-            map.put(errorCode.value, errorCode);
-        }
-    }
-
-    ErrorCode(int value) {
-        this.value = value;
-    }
-
-    public static ErrorCode valueOf(int value) {
-        return map.get(value);
-    }
-
-}
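
With ErrorCode deleted, ack failures no longer arrive as a numeric code to be mapped through an enum; the Sender changes below read a SendResult, plus an error message, straight off the decoded EncodeObject. A sketch of what caller-side handling looks like after this change (handleAck is a hypothetical helper, not an SDK method):

    import org.apache.inlong.sdk.dataproxy.SendResult;
    import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;

    public class AckHandlingSketch {
        // hypothetical helper: branch on the SendResult carried by the ack
        static void handleAck(EncodeObject response) {
            SendResult result = response.getSendResult();
            if (result == SendResult.OK) {
                // success: record the ack for this messageId
            } else {
                // failure: the DataProxy error text travels with the ack
                System.err.println("ack failed with " + result + ": " + response.getErrMsg());
            }
        }
    }
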
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
index 579b07b1e..7bf9bd883 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/codec/ProtocolDecoder.java
@@ -21,15 +21,15 @@ package org.apache.inlong.sdk.dataproxy.codec;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToMessageDecoder;
-import java.nio.charset.StandardCharsets;
-
-import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
 public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProtocolDecoder.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoder.class);
 
     @Override
     protected void decode(ChannelHandlerContext ctx,
@@ -37,9 +37,9 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
         buffer.markReaderIndex();
         // totallen
         int totalLen = buffer.readInt();
-        logger.debug("decode totalLen : {}", totalLen);
+        LOGGER.debug("decode totalLen : {}", totalLen);
         if (totalLen != buffer.readableBytes()) {
-            logger.error("totalLen is not equal readableBytes.total:" + totalLen
+            LOGGER.error("totalLen is not equal readableBytes.total:" + totalLen
                     + ";readableBytes:" + buffer.readableBytes());
             buffer.resetReaderIndex();
             throw new Exception("totalLen is not equal readableBytes.total");
@@ -48,13 +48,13 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
         int msgType = buffer.readByte() & 0x1f;
 
         if (msgType == 4) {
-            logger.info("debug decode");
+            LOGGER.info("debug decode");
         }
         if (msgType == 3 | msgType == 5) {
             // bodylen
             int bodyLength = buffer.readInt();
             if (bodyLength >= totalLen) {
-                logger.error("bodyLen is greater than totalLen.totalLen:" + totalLen
+                LOGGER.error("bodyLen is greater than totalLen.totalLen:" + totalLen
                         + ";bodyLen:" + bodyLength);
                 buffer.resetReaderIndex();
                 throw new Exception("bodyLen is greater than totalLen.totalLen");
@@ -72,28 +72,20 @@ public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
                 attrBytes = new byte[attrLength];
                 buffer.readBytes(attrBytes);
             }
-            EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes,
-                    StandardCharsets.UTF_8));
+            EncodeObject object = new EncodeObject(bodyBytes, new String(attrBytes, StandardCharsets.UTF_8));
             object.setMsgtype(5);
             out.add(object);
         } else if (msgType == 7) {
 
             int seqId = buffer.readInt();
             int attrLen = buffer.readShort();
-            EncodeObject object = new EncodeObject();
-            object.setMessageId(String.valueOf(seqId));
-
-            if (attrLen == 4) {
-                int errorValue = buffer.readInt();
-                ErrorCode errorCode = ErrorCode.valueOf(errorValue);
-                if (errorCode != null) {
-                    object.setException(true);
-                    object.setExceptionError(errorCode);
-                }
-            } else {
-                byte[] attrContent = new byte[attrLen];
-                buffer.readBytes(attrContent);
+            byte[] attrBytes = null;
+            if (attrLen > 0) {
+                attrBytes = new byte[attrLen];
+                buffer.readBytes(attrBytes);
             }
+            EncodeObject object = new EncodeObject(new String(attrBytes, StandardCharsets.UTF_8));
+            object.setMessageId(String.valueOf(seqId));
 
             buffer.readShort();
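
Two observations on the decoder as committed: the msgType == 3 | msgType == 5 test uses a bitwise |, which evaluates both operands but behaves like || on booleans, and in the msgType == 7 branch above attrBytes stays null whenever attrLen is not positive, so new String(attrBytes, ...) would throw a NullPointerException on an ack with an empty attribute block. A defensive sketch of that read (a standalone recreation, not the SDK class):

    import io.netty.buffer.ByteBuf;
    import java.nio.charset.StandardCharsets;

    public class AttrBlockSketch {
        // read a short-length-prefixed attribute block, tolerating attrLen <= 0
        static String readAttrBlock(ByteBuf buffer) {
            int attrLen = buffer.readShort();
            if (attrLen <= 0) {
                return "";                            // avoids new String(null, ...)
            }
            byte[] attrBytes = new byte[attrLen];
            buffer.readBytes(attrBytes);
            return new String(attrBytes, StandardCharsets.UTF_8);
        }
    }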
 
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 49be1ce54..f1eef9978 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -22,10 +22,10 @@ import io.netty.channel.Channel;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.FileCallback;
-import org.apache.inlong.sdk.dataproxy.SendResult;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.SendResult;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.threads.MetricWorkerThread;
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +48,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class Sender {
 
-    private static final Logger logger = LoggerFactory.getLogger(Sender.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(Sender.class);
 
     /* Store the callback used by asynchronously message sending. */
     private final ConcurrentHashMap<Channel, ConcurrentHashMap<String, QueueObject>> callbacks =
@@ -104,12 +104,12 @@ public class Sender {
 
         metricWorker = new MetricWorkerThread(configure, this);
         metricWorker.start();
-        logger.info("proxy sdk is starting!");
+        LOGGER.info("proxy sdk is starting!");
     }
 
     private void checkCallbackList() {
         // max wait for 1 min
-        logger.info("checking call back list before close, current size is {}",
+        LOGGER.info("checking call back list before close, current size is {}",
                 currentBufferSize.get());
         int count = 0;
         try {
@@ -118,10 +118,10 @@ public class Sender {
                 count += 1;
             }
             if (currentBufferSize.get() > 0) {
-                logger.warn("callback not empty {}, please check it", currentBufferSize.get());
+                LOGGER.warn("callback not empty {}, please check it", currentBufferSize.get());
             }
         } catch (Exception ex) {
-            logger.error("exception while checking callback list", ex);
+            LOGGER.error("exception while checking callback list", ex);
         }
     }
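
checkCallbackList is a bounded drain: poll the in-flight counter once per second and give up after roughly a minute. The same pattern in isolation (an AtomicInteger stands in for the sender's currentBufferSize; the timings mirror the method above, the rest is illustrative):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class DrainSketch {
        static void drainOrWarn(AtomicInteger inFlight) throws InterruptedException {
            int count = 0;
            // wait at most ~60s for pending callbacks to clear
            while (inFlight.get() > 0 && count < 60) {
                TimeUnit.SECONDS.sleep(1);
                count++;
            }
            if (inFlight.get() > 0) {
                System.err.println("callbacks still pending: " + inFlight.get());
            }
        }
    }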
 
@@ -141,13 +141,13 @@ public class Sender {
             e.printStackTrace(pw);
             exceptStr = sw.toString();
         } catch (Exception ex) {
-            logger.error(getExceptionStack(ex));
+            LOGGER.error(getExceptionStack(ex));
         } finally {
             try {
                 pw.close();
                 sw.close();
             } catch (Exception ex) {
-                logger.error(getExceptionStack(ex));
+                LOGGER.error(getExceptionStack(ex));
             }
         }
         return exceptStr;
@@ -155,7 +155,7 @@ public class Sender {
 
     /*Used for asynchronously message sending.*/
     public void notifyCallback(Channel channel, String messageId, SendResult result) {
-        logger.debug("Channel = {} , ack messageId = {}", channel, messageId);
+        LOGGER.debug("Channel = {} , ack messageId = {}", channel, messageId);
         if (channel == null) {
             return;
         }
@@ -178,16 +178,13 @@ public class Sender {
         }
     }
 
-    private SendResult syncSendInternalMessage(NettyClient client,
-                                               EncodeObject encodeObject, String msgUUID,
-                                               long timeout, TimeUnit timeUnit)
-            throws ExecutionException, InterruptedException, TimeoutException {
-
+    private SendResult syncSendInternalMessage(NettyClient client, EncodeObject encodeObject, String msgUUID,
+            long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null) {
             return SendResult.NO_CONNECTION;
         }
         if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-            logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+            LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                     encodeObject.getAttributes());
             return SendResult.INVALID_ATTRIBUTES;
         }
@@ -237,17 +234,17 @@ public class Sender {
             message = syncSendInternalMessage(client, encodeObject, msgUUID, timeout, timeUnit);
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
-            logger.error("send message error {} ", getExceptionStack(e));
+            LOGGER.error("send message error {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.THREAD_INTERRUPT;
         } catch (ExecutionException e) {
             // TODO Auto-generated catch block
-            logger.error("ExecutionException {} ", getExceptionStack(e));
+            LOGGER.error("ExecutionException {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR;
         } catch (TimeoutException e) {
             // TODO Auto-generated catch block
-            logger.error("TimeoutException {} ", getExceptionStack(e));
+            LOGGER.error("TimeoutException {} ", getExceptionStack(e));
             //e.printStackTrace();
             SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
             if (syncMessageCallable != null) {
@@ -255,14 +252,14 @@ public class Sender {
                 if (tmpClient != null) {
                     Channel curChannel = tmpClient.getChannel();
                     if (curChannel != null) {
-                        logger.error("channel maybe busy {}", curChannel);
+                        LOGGER.error("channel maybe busy {}", curChannel);
                         scanThread.addTimeoutChannel(curChannel);
                     }
                 }
             }
             return SendResult.TIMEOUT;
         } catch (Throwable e) {
-            logger.error("syncSendMessage exception {} ", getExceptionStack(e));
+            LOGGER.error("syncSendMessage exception {} ", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR;
         }
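
The catch ladder above is the whole sync-send contract: block on a Future with a timeout and fold each failure mode into a single SendResult. Reduced to its skeleton (names are illustrative; the string constants follow the SDK enum, including its UNKOWN_ERROR spelling):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class SyncSendSketch {
        static String awaitResult(Future<String> pending, long timeout, TimeUnit unit) {
            try {
                return pending.get(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // keep the interrupt status
                return "THREAD_INTERRUPT";
            } catch (ExecutionException e) {
                return "UNKOWN_ERROR";
            } catch (TimeoutException e) {
                return "TIMEOUT";                     // the real code also flags the channel as busy
            }
        }
    }
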
@@ -280,8 +277,7 @@ public class Sender {
     }
 
     private SendResult syncSendMessageIndexInternal(NettyClient client, EncodeObject encodeObject, String msgUUID,
-                                                    long timeout, TimeUnit timeUnit)
-            throws ExecutionException, InterruptedException, TimeoutException {
+            long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
         if (client == null || !client.isActive()) {
             chooseProxy.remove(encodeObject.getMessageId());
             client = clientMgr.getClientByRoundRobin();
@@ -336,7 +332,7 @@ public class Sender {
                 client = clientMgr.getContainProxy(proxyip);
             }
             if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-                logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+                LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                         encodeObject.getAttributes());
                 return SendResult.INVALID_ATTRIBUTES.toString();
             }
@@ -345,17 +341,17 @@ public class Sender {
                         msgUUID, timeout, timeUnit);
             } catch (InterruptedException e) {
                 // TODO Auto-generated catch block
-                logger.error("send message error {}", getExceptionStack(e));
+                LOGGER.error("send message error {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.THREAD_INTERRUPT.toString();
             } catch (ExecutionException e) {
                 // TODO Auto-generated catch block
-                logger.error("ExecutionException {}", getExceptionStack(e));
+                LOGGER.error("ExecutionException {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.UNKOWN_ERROR.toString();
             } catch (TimeoutException e) {
                 // TODO Auto-generated catch block
-                logger.error("TimeoutException {}", getExceptionStack(e));
+                LOGGER.error("TimeoutException {}", getExceptionStack(e));
                 //e.printStackTrace();
                 SyncMessageCallable syncMessageCallable = syncCallables.remove(encodeObject.getMessageId());
                 if (syncMessageCallable != null) {
@@ -363,21 +359,21 @@ public class Sender {
                     if (tmpClient != null) {
                         Channel curChannel = tmpClient.getChannel();
                         if (curChannel != null) {
-                            logger.error("channel maybe busy {}", curChannel);
+                            LOGGER.error("channel maybe busy {}", curChannel);
                             scanThread.addTimeoutChannel(curChannel);
                         }
                     }
                 }
                 return SendResult.TIMEOUT.toString();
             } catch (Throwable e) {
-                logger.error("syncSendMessage exception {}", getExceptionStack(e));
+                LOGGER.error("syncSendMessage exception {}", getExceptionStack(e));
                 syncCallables.remove(encodeObject.getMessageId());
                 return SendResult.UNKOWN_ERROR.toString();
             }
             scanThread.resetTimeoutChannel(client.getChannel());
             return message.toString() + "=" + client.getServerIP();
         } catch (Exception e) {
-            logger.error("agent send error {}", getExceptionStack(e));
+            LOGGER.error("agent send error {}", getExceptionStack(e));
             syncCallables.remove(encodeObject.getMessageId());
             return SendResult.UNKOWN_ERROR.toString();
         }
@@ -394,7 +390,7 @@ public class Sender {
      * @throws ProxysdkException
      */
     public void asyncSendMessageIndex(EncodeObject encodeObject, FileCallback callback, String msgUUID, long timeout,
-                                      TimeUnit timeUnit) throws ProxysdkException {
+            TimeUnit timeUnit) throws ProxysdkException {
         NettyClient client = chooseProxy.get(encodeObject.getMessageId());
         String proxyip = encodeObject.getProxyIp();
         if (proxyip != null && proxyip.length() != 0) {
@@ -510,7 +506,7 @@ public class Sender {
      * Following methods used by asynchronously message sending.
      */
     public void asyncSendMessage(EncodeObject encodeObject, SendMessageCallback callback, String msgUUID,
-                                 long timeout, TimeUnit timeUnit) throws ProxysdkException {
+            long timeout, TimeUnit timeUnit) throws ProxysdkException {
         metricWorker.recordNumByKey(encodeObject.getMessageId(), encodeObject.getGroupId(),
                 encodeObject.getStreamId(), Utils.getLocalIp(), encodeObject.getPackageTime(),
                 encodeObject.getDt(), encodeObject.getRealCnt());
@@ -525,7 +521,7 @@ public class Sender {
             throw new ProxysdkException("ASYNC_CALLBACK_BUFFER_FULL");
         }
         if (isNotValidateAttr(encodeObject.getCommonattr(), encodeObject.getAttributes())) {
-            logger.error("error attr format {} {}", encodeObject.getCommonattr(),
+            LOGGER.error("error attr format {} {}", encodeObject.getCommonattr(),
                     encodeObject.getAttributes());
             throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
         }
@@ -554,7 +550,7 @@ public class Sender {
         QueueObject queueObject = msgQueueMap.putIfAbsent(encodeObject.getMessageId(),
                 new QueueObject(System.currentTimeMillis(), callback, size, timeout, timeUnit));
         if (queueObject != null) {
-            logger.warn("message id {} has existed.", encodeObject.getMessageId());
+            LOGGER.warn("message id {} has existed.", encodeObject.getMessageId());
         }
         if (encodeObject.getMsgtype() == 7) {
             int groupIdnum = 0;
@@ -584,17 +580,15 @@ public class Sender {
         String messageId = response.getMessageId();
         chooseProxy.remove(messageId);
         SyncMessageCallable callable = syncCallables.remove(messageId);
-        SendResult result = response.isException() ? SendResult.INVALID_ATTRIBUTES : SendResult.OK;
+        SendResult result = response.getSendResult();
         if (result == SendResult.OK) {
             metricWorker.recordSuccessByMessageId(messageId);
+        } else {
+            LOGGER.error("{} exception happens, error message {}", channel, response.getErrMsg());
         }
         if (callable != null) { // for syncSend
             callable.update(result);
         }
-        if (response.isException()) {
-            logger.error("{} exception happens, error message {}", channel,
-                    response.getExceptionError());
-        }
         notifyCallback(channel, messageId, result); // for asyncSend
     }
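
This hunk is the behavioral core of the ack change: the result now comes straight from the decoded response instead of collapsing every flagged exception to INVALID_ATTRIBUTES. A compressed sketch of the dispatch (SyncCallable is a simplified stand-in; the real method also records metrics and completes async callbacks per channel):

    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.inlong.sdk.dataproxy.SendResult;
    import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;

    public class AckDispatchSketch {
        interface SyncCallable {
            void update(SendResult result);               // simplified stand-in
        }

        static void onAck(ConcurrentHashMap<String, SyncCallable> syncCallables, EncodeObject response) {
            SyncCallable callable = syncCallables.remove(response.getMessageId());
            SendResult result = response.getSendResult(); // no isException flag anymore
            if (result != SendResult.OK) {
                System.err.println("ack error: " + response.getErrMsg());
            }
            if (callable != null) {
                callable.update(result);                  // wakes the blocked sync sender
            }
        }
    }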
 
@@ -606,7 +600,7 @@ public class Sender {
         if (channel == null) {
             return;
         }
-        logger.info("channel {} connection is disconnected!", channel);
+        LOGGER.info("channel {} connection is disconnected!", channel);
         try {
             ConcurrentHashMap<String, QueueObject> msgQueueMap = callbacks.remove(channel);
             if (msgQueueMap != null) {
@@ -627,7 +621,7 @@ public class Sender {
                 msgQueueMap.clear();
             }
         } catch (Throwable e2) {
-            logger.info("process channel {} disconnected callbacks throw error,", channel, e2);
+            LOGGER.info("process channel {} disconnected callbacks throw error,", channel, e2);
         }
 
         try {
@@ -654,7 +648,7 @@ public class Sender {
                 }
             }
         } catch (Throwable e) {
-            logger.info("process channel {} disconnected syncCallables throw error,", channel, e);
+            LOGGER.info("process channel {} disconnected syncCallables throw error,", channel, e);
         }
     }
 
@@ -663,28 +657,28 @@ public class Sender {
         if (channel == null) {
             return;
         }
-        logger.info("wait for ack for channel {}", channel);
+        LOGGER.info("wait for ack for channel {}", channel);
         try {
             ConcurrentHashMap<String, QueueObject> queueObjMap = callbacks.get(channel);
             if (queueObjMap != null) {
                 while (true) {
                     if (queueObjMap.isEmpty()) {
-                        logger.info("this channel {} is empty!", channel);
+                        LOGGER.info("this channel {} is empty!", channel);
                         break;
                     }
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         // TODO Auto-generated catch block
-                        logger.error("wait for ack for channel {}, error {}",
+                        LOGGER.error("wait for ack for channel {}, error {}",
                                 channel, e.getMessage());
                         e.printStackTrace();
                     }
                 }
             }
-            logger.info("waitForAckForChannel finished , channel is {}", channel);
+            LOGGER.info("waitForAckForChannel finished , channel is {}", channel);
         } catch (Throwable e) {
-            logger.error("waitForAckForChannel exception, channel is {}", channel, e);
+            LOGGER.error("waitForAckForChannel exception, channel is {}", channel, e);
         }
     }
 


[inlong] 06/07: [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b04d4472ebffaed97f0bd1ac389392ca66b494c6
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Tue Nov 8 19:20:52 2022 +0800

    [INLONG-6426][Manager] SortSourceService support multi stream under one group (#6427)
---
 .../resources/mappers/InlongStreamEntityMapper.xml |  3 +-
 .../resources/mappers/StreamSinkEntityMapper.xml   |  1 +
 .../pojo/sort/standalone/SortSourceStreamInfo.java | 23 ++++++
 .../sort/standalone/SortSourceStreamSinkInfo.java  |  1 +
 .../service/core/impl/SortSourceServiceImpl.java   | 96 +++++++++++++---------
 .../manager/service/sort/SortServiceImplTest.java  | 24 +++---
 6 files changed, 97 insertions(+), 51 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
index 231769ccc..9fec7e7cd 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongStreamEntityMapper.xml
@@ -289,7 +289,8 @@
         select
                inlong_group_id,
                inlong_stream_id,
-               mq_resource
+               mq_resource,
+               ext_params
         from inlong_stream
         where is_deleted = 0
     </select>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index f6422643c..996c85cf4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -372,6 +372,7 @@
         select inlong_cluster_name as sortClusterName,
                sort_task_name,
                inlong_group_id     as groupId,
+               inlong_stream_id    as streamId,
                ext_params
         from stream_sink
         where is_deleted = 0
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
index bd89a573c..294212c27 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamInfo.java
@@ -17,11 +17,34 @@
 
 package org.apache.inlong.manager.pojo.sort.standalone;
 
+import com.google.gson.Gson;
 import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Data
 public class SortSourceStreamInfo {
+    private static final Logger LOGGER = LoggerFactory.getLogger(SortSourceStreamInfo.class);
+    private static final Gson GSON = new Gson();
+
     private String inlongGroupId;
     private String inlongStreamId;
     private String mqResource;
+    String extParams;
+    Map<String, String> extParamsMap = new ConcurrentHashMap<>();
+
+    public Map<String, String> getExtParamsMap() {
+        if (extParamsMap.isEmpty() && StringUtils.isNotBlank(extParams)) {
+            try {
+                extParamsMap = GSON.fromJson(extParams, Map.class);
+            } catch (Throwable t) {
+                LOGGER.error("fail to parse group ext params", t);
+            }
+        }
+        return extParamsMap;
+    }
 }
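
getExtParamsMap here is a lazy, cached parse: the first call deserializes the extParams JSON, later calls return the cached map. One subtlety: GSON.fromJson(extParams, Map.class) hands back Gson's own map type through an unchecked cast, so after the first successful parse the field no longer holds the initial ConcurrentHashMap; harmless for the read-only use in SortSourceServiceImpl, but worth knowing. Usage sketch (the setter is assumed to be Lombok-generated from @Data; values are illustrative):

    import java.util.Map;
    import org.apache.inlong.manager.pojo.sort.standalone.SortSourceStreamInfo;

    public class ExtParamsSketch {
        public static void main(String[] args) {
            SortSourceStreamInfo stream = new SortSourceStreamInfo();
            stream.setExtParams("{\"tenant\":\"t1\",\"region\":\"gz\"}");
            Map<String, String> params = stream.getExtParamsMap();   // parsed once, then cached
            System.out.println(params.get("tenant"));                // t1
        }
    }
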
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
index d144b2574..917494252 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/standalone/SortSourceStreamSinkInfo.java
@@ -38,6 +38,7 @@ public class SortSourceStreamSinkInfo {
     String sortClusterName;
     String sortTaskName;
     String groupId;
+    String streamId;
     String extParams;
     Map<String, String> extParamsMap;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 4ec62c36d..df92a55d9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -50,7 +50,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -95,11 +94,11 @@ public class SortSourceServiceImpl implements SortSourceService {
 
     private Map<String, List<SortSourceClusterInfo>> mqClusters;
     private Map<String, SortSourceGroupInfo> groupInfos;
-    private Map<String, SortSourceStreamInfo> allStreams;
+    private Map<String, Map<String, SortSourceStreamInfo>> allStreams;
     private Map<String, String> backupClusterTag;
     private Map<String, String> backupGroupMqResource;
-    private Map<String, String> backupStreamMqResource;
-    private Map<String, Map<String, List<String>>> groupMap;
+    private Map<String, Map<String, String>> backupStreamMqResource;
+    private Map<String, Map<String, List<SortSourceStreamSinkInfo>>> streamSinkMap;
 
     @Autowired
     private SortConfigLoader configLoader;
@@ -195,16 +194,16 @@ public class SortSourceServiceImpl implements SortSourceService {
 
         // reload all stream sinks, to Map<clusterName, Map<taskName, List<groupId>>> format
         List<SortSourceStreamSinkInfo> allStreamSinks = configLoader.loadAllStreamSinks();
-        groupMap = new HashMap<>();
+        streamSinkMap = new HashMap<>();
         allStreamSinks.stream()
                 .filter(sink -> sink.getSortClusterName() != null)
                 .filter(sink -> sink.getSortTaskName() != null)
                 .forEach(sink -> {
-                    Map<String, List<String>> task2groupsMap =
-                            groupMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
-                    List<String> groupIdList =
+                    Map<String, List<SortSourceStreamSinkInfo>> task2groupsMap =
+                            streamSinkMap.computeIfAbsent(sink.getSortClusterName(), k -> new ConcurrentHashMap<>());
+                    List<SortSourceStreamSinkInfo> sinkInfoList =
                             task2groupsMap.computeIfAbsent(sink.getSortTaskName(), k -> new ArrayList<>());
-                    groupIdList.add(sink.getGroupId());
+                    sinkInfoList.add(sink);
                 });
 
         // reload all groups
@@ -225,12 +224,15 @@ public class SortSourceServiceImpl implements SortSourceService {
         // reload all streams
         allStreams = configLoader.loadAllStreams()
                 .stream()
-                .collect(Collectors.toMap(SortSourceStreamInfo::getInlongGroupId, stream -> stream));
+                .collect(Collectors.groupingBy(SortSourceStreamInfo::getInlongGroupId,
+                        Collectors.toMap(SortSourceStreamInfo::getInlongStreamId, info -> info)));
 
         // reload all back up stream mq resource
         backupStreamMqResource = configLoader.loadStreamBackupInfo(ClusterSwitch.BACKUP_MQ_RESOURCE)
                 .stream()
-                .collect(Collectors.toMap(InlongStreamExtEntity::getInlongGroupId, InlongStreamExtEntity::getKeyValue));
+                .collect(Collectors.groupingBy(InlongStreamExtEntity::getInlongGroupId,
+                        Collectors.toMap(InlongStreamExtEntity::getInlongStreamId,
+                                InlongStreamExtEntity::getKeyValue)));
     }
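
Both reloads above use the same two-level indexing idiom: Collectors.groupingBy on the group id with a downstream Collectors.toMap keyed by stream id, yielding Map<groupId, Map<streamId, V>>. An isolated example (string arrays stand in for the entity types; note that toMap throws on duplicate (groupId, streamId) pairs, which these reloads rely on not occurring):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class TwoLevelIndexSketch {
        public static void main(String[] args) {
            List<String[]> rows = Arrays.asList(
                    new String[]{"g1", "s1", "topicA"},
                    new String[]{"g1", "s2", "topicB"},
                    new String[]{"g2", "s1", "topicC"});
            Map<String, Map<String, String>> byGroupThenStream = rows.stream()
                    .collect(Collectors.groupingBy(r -> r[0],
                            Collectors.toMap(r -> r[1], r -> r[2])));
            System.out.println(byGroupThenStream.get("g1").get("s2"));   // topicB
        }
    }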
 
     private void parseAll() {
@@ -239,12 +241,12 @@ public class SortSourceServiceImpl implements SortSourceService {
         Map<String, Map<String, String>> newMd5Map = new ConcurrentHashMap<>();
         Map<String, Map<String, CacheZoneConfig>> newConfigMap = new ConcurrentHashMap<>();
 
-        groupMap.forEach((sortClusterName, task2GroupList) -> {
+        streamSinkMap.forEach((sortClusterName, task2SinkList) -> {
             // prepare the new config and md5
             Map<String, CacheZoneConfig> task2Config = new ConcurrentHashMap<>();
             Map<String, String> task2Md5 = new ConcurrentHashMap<>();
 
-            task2GroupList.forEach((taskName, groupList) -> {
+            task2SinkList.forEach((taskName, sinkList) -> {
                 try {
                     CacheZoneConfig cacheZoneConfig =
                             CacheZoneConfig.builder()
@@ -252,7 +254,7 @@ public class SortSourceServiceImpl implements SortSourceService {
                                     .sortTaskId(taskName)
                                     .build();
                     Map<String, CacheZone> cacheZoneMap =
-                            this.parseCacheZones(sortClusterName, taskName, groupList);
+                            this.parseCacheZones(sinkList);
                     cacheZoneConfig.setCacheZones(cacheZoneMap);
 
                     // prepare md5
@@ -277,31 +279,33 @@ public class SortSourceServiceImpl implements SortSourceService {
         backupClusterTag = null;
         backupGroupMqResource = null;
         backupStreamMqResource = null;
-        groupMap = null;
+        streamSinkMap = null;
     }
 
     private Map<String, CacheZone> parseCacheZones(
-            String sortClusterName,
-            String taskName,
-            List<String> groupIdList) {
+            List<SortSourceStreamSinkInfo> sinkList) {
 
         // get group infos
-        List<SortSourceGroupInfo> groupInfoList = groupIdList.stream()
-                .filter(groupInfos::containsKey)
-                .map(groupInfos::get)
+        List<SortSourceStreamSinkInfo> sinkInfoList = sinkList.stream()
+                .filter(sinkInfo -> groupInfos.containsKey(sinkInfo.getGroupId())
+                                && allStreams.containsKey(sinkInfo.getGroupId())
+                                && allStreams.get(sinkInfo.getGroupId()).containsKey(sinkInfo.getStreamId()))
                 .collect(Collectors.toList());
 
         // group them by cluster tag.
-        Map<String, List<SortSourceGroupInfo>> tag2GroupInfos = groupInfoList.stream()
-                .collect(Collectors.groupingBy(SortSourceGroupInfo::getClusterTag));
+        Map<String, List<SortSourceStreamSinkInfo>> tag2SinkInfos = sinkInfoList.stream()
+                .collect(Collectors.groupingBy(sink -> {
+                    SortSourceGroupInfo groupInfo = groupInfos.get(sink.getGroupId());
+                    return groupInfo.getClusterTag();
+                }));
 
         // group them by second cluster tag.
-        Map<String, List<SortSourceGroupInfo>> backupTag2GroupInfos = groupInfoList.stream()
+        Map<String, List<SortSourceStreamSinkInfo>> backupTag2SinkInfos = sinkInfoList.stream()
                 .filter(info -> backupClusterTag.containsKey(info.getGroupId()))
                 .collect(Collectors.groupingBy(info -> backupClusterTag.get(info.getGroupId())));
 
-        List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2GroupInfos, false);
-        List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2GroupInfos, true);
+        List<CacheZone> cacheZones = this.parseCacheZonesByTag(tag2SinkInfos, false);
+        List<CacheZone> backupCacheZones = this.parseCacheZonesByTag(backupTag2SinkInfos, true);
 
         return Stream.of(cacheZones, backupCacheZones)
                 .flatMap(Collection::stream)
@@ -315,18 +319,20 @@ public class SortSourceServiceImpl implements SortSourceService {
                 );
     }
 
-    private List<CacheZone> parseCacheZonesByTag(Map<String, List<SortSourceGroupInfo>> tag2Groups, boolean isBackup) {
+    private List<CacheZone> parseCacheZonesByTag(
+            Map<String, List<SortSourceStreamSinkInfo>> tag2Sinks,
+            boolean isBackup) {
 
-        return tag2Groups.keySet().stream()
+        return tag2Sinks.keySet().stream()
                 .filter(mqClusters::containsKey)
                 .flatMap(tag -> {
-                    List<SortSourceGroupInfo> groups = tag2Groups.get(tag);
+                    List<SortSourceStreamSinkInfo> sinks = tag2Sinks.get(tag);
                     List<SortSourceClusterInfo> clusters = mqClusters.get(tag);
                     return clusters.stream()
                             .map(cluster -> {
                                 CacheZone zone = null;
                                 try {
-                                    zone = this.parseCacheZone(groups, cluster, isBackup);
+                                    zone = this.parseCacheZone(sinks, cluster, isBackup);
                                 } catch (IllegalStateException e) {
                                     LOGGER.error("fail to init cache zone for cluster " + cluster, e);
                                 }
@@ -337,11 +343,11 @@ public class SortSourceServiceImpl implements SortSourceService {
     }
 
     private CacheZone parseCacheZone(
-            List<SortSourceGroupInfo> groups,
+            List<SortSourceStreamSinkInfo> sinks,
             SortSourceClusterInfo cluster,
             boolean isBackupTag) {
         switch (cluster.getType()) {
-            case ClusterType.PULSAR: return parsePulsarZone(groups, cluster, isBackupTag);
+            case ClusterType.PULSAR: return parsePulsarZone(sinks, cluster, isBackupTag);
             default:
                 throw new BusinessException(String.format("do not support cluster type=%s of cluster=%s",
                         cluster.getType(), cluster));
@@ -349,24 +355,34 @@ public class SortSourceServiceImpl implements SortSourceService {
     }
 
     private CacheZone parsePulsarZone(
-            List<SortSourceGroupInfo> groups,
+            List<SortSourceStreamSinkInfo> sinks,
             SortSourceClusterInfo cluster,
             boolean isBackupTag) {
         Map<String, String> param = cluster.getExtParamsMap();
         String tenant = param.get(KEY_TENANT);
         String auth = param.get(KEY_AUTH);
-        List<Topic> sdkTopics = groups.stream()
-                .map(info -> {
-                    String namespace = info.getMqResource();
-                    String topic = allStreams.get(info.getGroupId()).getMqResource();
+        List<Topic> sdkTopics = sinks.stream()
+                .map(sink -> {
+                    String groupId = sink.getGroupId();
+                    String streamId = sink.getStreamId();
+                    SortSourceGroupInfo groupInfo = groupInfos.get(groupId);
+                    SortSourceStreamInfo streamInfo = allStreams.get(groupId).get(streamId);
+
+                    String namespace = groupInfo.getMqResource();
+                    String topic = streamInfo.getMqResource();
                     if (isBackupTag) {
-                        namespace = Optional.ofNullable(backupGroupMqResource.get(info.getGroupId())).orElse(namespace);
-                        topic = Optional.ofNullable(backupStreamMqResource.get(info.getGroupId())).orElse(topic);
+                        if (backupGroupMqResource.containsKey(groupId)) {
+                            namespace = backupGroupMqResource.get(groupId);
+                        }
+                        if (backupStreamMqResource.containsKey(groupId)
+                                && backupStreamMqResource.get(groupId).containsKey(streamId)) {
+                            topic = backupStreamMqResource.get(groupId).get(streamId);
+                        }
                     }
                     String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
                     return Topic.builder()
                             .topic(fullTopic)
-                            .topicProperties(info.getExtParamsMap())
+                            .topicProperties(streamInfo.getExtParamsMap())
                             .build();
                 })
                 .collect(Collectors.toList());
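
The full Pulsar topic is assembled as tenant/namespace/topic, where the namespace comes from the group's mqResource and the topic from the stream's, and the backup tag now swaps in per-group and per-stream overrides when present. A reduced sketch of the override-then-concat step (values illustrative; note that the concat would NPE if the cluster ext params carried no tenant, a case the IllegalStateException catch upstream would not cover):

    import java.util.Collections;
    import java.util.Map;

    public class TopicAssemblySketch {
        public static void main(String[] args) {
            String tenant = "public";                 // from cluster ext params
            String namespace = "group_ns";            // group mqResource
            String topic = "stream_topic";            // stream mqResource
            // backup override, keyed by groupId (per-stream overrides work the same way)
            Map<String, String> backupGroupNs = Collections.singletonMap("g1", "group_ns_backup");
            boolean isBackupTag = true;
            if (isBackupTag && backupGroupNs.containsKey("g1")) {
                namespace = backupGroupNs.get("g1");
            }
            String fullTopic = tenant.concat("/").concat(namespace).concat("/").concat(topic);
            System.out.println(fullTopic);            // public/group_ns_backup/stream_topic
        }
    }
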
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
index 338ee3276..d52b47486 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/SortServiceImplTest.java
@@ -69,10 +69,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private static final String TEST_CLUSTER = "testCluster";
     private static final String TEST_TASK = "testTask";
     private static final String TEST_GROUP = "testGroup";
-    private static final String TEST_STREAM = "1";
+    private static final String TEST_STREAM_1 = "1";
+    private static final String TEST_STREAM_2 = "2";
     private static final String TEST_TAG = "testTag";
     private static final String BACK_UP_TAG = "testBackupTag";
-    private static final String TEST_TOPIC = "testTopic";
+    private static final String TEST_TOPIC_1 = "testTopic";
+    private static final String TEST_TOPIC_2 = "testTopic2";
     private static final String TEST_SINK_TYPE = "testSinkType";
     private static final String TEST_CREATOR = "testUser";
 
@@ -204,11 +206,13 @@ public class SortServiceImplTest extends ServiceBaseTest {
     private void prepareAll() {
         this.prepareCluster(TEST_CLUSTER);
         this.preparePulsar("testPulsar", true, TEST_TAG);
-        this.preparePulsar("testPulsar2", true, BACK_UP_TAG);
+        this.preparePulsar("backupPulsar", true, BACK_UP_TAG);
         this.prepareDataNode(TEST_TASK);
         this.prepareGroupId(TEST_GROUP);
-        this.prepareStreamId(TEST_GROUP, TEST_STREAM);
-        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER);
+        this.prepareStreamId(TEST_GROUP, TEST_STREAM_1, TEST_TOPIC_1);
+        this.prepareStreamId(TEST_GROUP, TEST_STREAM_2, TEST_TOPIC_2);
+        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_1);
+        this.prepareTask(TEST_TASK, TEST_GROUP, TEST_CLUSTER, TEST_STREAM_2);
     }
 
     private void prepareDataNode(String taskName) {
@@ -255,12 +259,12 @@ public class SortServiceImplTest extends ServiceBaseTest {
         groupService.save(request, "test operator");
     }
 
-    private void prepareStreamId(String groupId, String streamId) {
+    private void prepareStreamId(String groupId, String streamId, String topic) {
         InlongStreamRequest request = new InlongStreamRequest();
         request.setInlongGroupId(groupId);
         request.setInlongStreamId(streamId);
         request.setName("test_stream_name");
-        request.setMqResource(TEST_TOPIC);
+        request.setMqResource(topic);
         request.setVersion(InlongConstants.INITIAL_VERSION);
         List<InlongStreamExtInfo> extInfos = new ArrayList<>();
         InlongStreamExtInfo ext = new InlongStreamExtInfo();
@@ -268,7 +272,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         ext.setInlongStreamId(streamId);
         ext.setInlongGroupId(groupId);
         ext.setKeyName(ClusterSwitch.BACKUP_MQ_RESOURCE);
-        ext.setKeyValue("backup_topic");
+        ext.setKeyValue("backup_" + topic);
         request.setExtList(extInfos);
         streamService.save(request, "test_operator");
     }
@@ -305,7 +309,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         clusterService.save(request, "test operator");
     }
 
-    private void prepareTask(String taskName, String groupId, String clusterName) {
+    private void prepareTask(String taskName, String groupId, String clusterName, String streamId) {
         SinkRequest request = new HiveSinkRequest();
         request.setDataNodeName(taskName);
         request.setSinkType(SinkType.HIVE);
@@ -313,7 +317,7 @@ public class SortServiceImplTest extends ServiceBaseTest {
         request.setSinkName(taskName);
         request.setSortTaskName(taskName);
         request.setInlongGroupId(groupId);
-        request.setInlongStreamId("1");
+        request.setInlongStreamId(streamId);
         Map<String, Object> properties = new HashMap<>();
         properties.put("delimiter", "|");
         properties.put("dataType", "text");


[inlong] 05/07: [INLONG-6451][Manager] Optimize the resource process of Kafka MQ (#6455)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 7644e71d58c79ed26e447b13a671309a83e2a0c3
Author: haifxu <xh...@gmail.com>
AuthorDate: Tue Nov 8 19:20:28 2022 +0800

    [INLONG-6451][Manager] Optimize the resource process of Kafka MQ (#6455)
---
 .../manager/pojo/group/kafka/InlongKafkaDTO.java   |  4 ---
 .../manager/pojo/group/kafka/InlongKafkaInfo.java  |  4 ---
 .../pojo/group/kafka/InlongKafkaRequest.java       |  4 ---
 .../resource/queue/kafka/KafkaOperator.java        | 31 +++--------------
 .../queue/kafka/KafkaResourceOperators.java        | 18 ++++------
 .../service/resource/queue/kafka/KafkaUtils.java   | 27 ++-------------
 .../service/source/kafka/KafkaSourceOperator.java  | 39 ++++++++++++++++++++++
 7 files changed, 51 insertions(+), 76 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
index 093441768..0136a7d93 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -44,10 +44,6 @@ public class InlongKafkaDTO extends BaseInlongGroup {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     /**
      * Get the dto instance from the request
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
index 20a699237..becfead26 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
@@ -40,10 +40,6 @@ public class InlongKafkaInfo extends InlongGroupInfo {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     public InlongKafkaInfo() {
         this.setMqType(MQType.KAFKA);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
index 67cfe0550..d66e05ffd 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
@@ -39,10 +39,6 @@ public class InlongKafkaRequest extends InlongGroupRequest {
     private Integer numPartitions;
     // replicationFactor number
     private Short replicationFactor = 1;
-    //consumer grouping
-    private String groupId;
-    // autocommit interval
-    private String autoCommit;
 
     public InlongKafkaRequest() {
         this.setMqType(MQType.KAFKA);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 7e886cf95..6691b55d0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -17,12 +17,6 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
@@ -30,12 +24,14 @@ import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
 /**
  * kafka operator, supports creating topics and creating subscription.
  */
@@ -77,23 +73,4 @@ public class KafkaOperator {
         return topicList.contains(topic);
     }
 
-    public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
-            String subscription) {
-        KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
-        kafkaConsumer.subscribe(Collections.singletonList(subscription));
-    }
-
-    public boolean subscriptionIsExists(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
-            String topic) {
-        try (KafkaConsumer consumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo)) {
-            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
-            List<PartitionInfo> partitions = topics.get(topic);
-            if (partitions == null) {
-                LOGGER.info("subscription is not exist");
-                return false;
-            }
-            return true;
-        }
-    }
-
 }
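
With the consumer-based helpers removed, topic existence remains an AdminClient concern (the topicIsExists method survives this cleanup). An equivalent standalone check in the spirit of that method (the bootstrap address and topic are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class TopicExistsSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                boolean exists = admin.listTopics().names().get().contains("my_topic");
                System.out.println("topic exists: " + exists);
            }
        }
    }
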
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 37a1ee66d..4e2b7246a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -79,7 +79,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
                 return;
             }
             for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId());
+                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
             }
         } catch (Exception e) {
             String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
@@ -126,7 +126,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
         try {
             InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
             // create kafka topic
-            this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId());
+            this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
         } catch (Exception e) {
             String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId);
             log.error(msg, e);
@@ -159,10 +159,9 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     /**
      * Create Kafka Topic and Subscription, and save the consumer group info.
      */
-    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String streamId) throws Exception {
+    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String topicName) throws Exception {
         // 1. create kafka topic
         ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        String topicName = kafkaInfo.getInlongGroupId() + "_" + streamId;
         kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
 
         boolean exist = kafkaOperator.topicIsExists((KafkaClusterInfo) clusterInfo, topicName);
@@ -172,17 +171,12 @@ public class KafkaResourceOperators implements QueueResourceOperator {
             throw new WorkflowListenerException("topic=" + topicName + " not exists in " + bootStrapServers);
         }
 
-        // 2. create a subscription for the kafka topic
-        kafkaOperator.createSubscription(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
-        String groupId = kafkaInfo.getInlongGroupId();
-        log.info("success to create kafka subscription for groupId={}, topic={}, consumeGroup={}",
-                groupId, topicName, topicName);
-
-        // 3. insert the consumer group info
+        // Kafka consumers do not need to register in advance
+        // 2. insert the consumer group info
         String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
         Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
         log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-                id, consumeGroup, groupId, topicName);
+                id, consumeGroup, kafkaInfo.getInlongGroupId(), topicName);
     }
 
     /**
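
The comment added above carries the rationale: Kafka consumer groups need no pre-registration, since a group springs into existence the first time a consumer with that group.id subscribes and polls. A minimal illustration (address, topic and group name are placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class LazyGroupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "clusterTag_topic_consumer_group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my_topic"));
                consumer.poll(Duration.ofMillis(100));   // the group is created lazily here
            }
        }
    }
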
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
index d793b8ce4..a9bd98ced 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
@@ -17,15 +17,12 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import java.util.Properties;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
-import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import java.util.Properties;
 
 /**
  * kafka connection utils
@@ -40,24 +37,4 @@ public class KafkaUtils {
         // Create AdminClient instance
         return AdminClient.create(properties);
     }
-
-    public static KafkaConsumer createKafkaConsumer(InlongKafkaInfo inlongKafkaInfo,KafkaClusterInfo kafkaClusterInfo) {
-        Properties properties = new Properties();
-        // The connected kafka cluster address
-        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
-        // consumer grouping
-        properties.put(ConsumerConfig.GROUP_ID_CONFIG, inlongKafkaInfo.getGroupId());
-        // Confirm Auto Commit
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        // autocommit interval
-        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        // Serialization
-        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.IntegerDeserializer");
-        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                "org.apache.kafka.common.serialization.StringDeserializer");
-        // For different groupid to ensure that the previous message can be consumed, reset the offset
-        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        return new KafkaConsumer(properties);
-    }
 }
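
What remains of KafkaUtils is the AdminClient factory. A usage sketch, assuming the surviving factory is named getAdminClient as in the current source and that KafkaClusterInfo's setter is Lombok-generated; the URL is a placeholder:

    import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
    import org.apache.inlong.manager.service.resource.queue.kafka.KafkaUtils;
    import org.apache.kafka.clients.admin.AdminClient;

    public class AdminClientSketch {
        public static void main(String[] args) throws Exception {
            KafkaClusterInfo cluster = new KafkaClusterInfo();
            cluster.setUrl("localhost:9092");         // bootstrap servers
            try (AdminClient client = KafkaUtils.getAdminClient(cluster)) {
                System.out.println(client.describeCluster().nodes().get());
            }
        }
    }
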
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 17c14eea2..2ce0f1077 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -18,22 +18,33 @@
 package org.apache.inlong.manager.service.source.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.kafka.KafkaOffset;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceDTO;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.source.AbstractSourceOperator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 /**
  * Kafka stream source operator
@@ -43,6 +54,8 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
 
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private InlongClusterService clusterService;
 
     @Override
     public Boolean accept(String sourceType) {
@@ -82,4 +95,30 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
         return source;
     }
 
+    @Override
+    public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos,
+            List<StreamSource> streamSources) {
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        KafkaClusterInfo kafkaClusterInfo = (KafkaClusterInfo) clusterInfo;
+        String bootstrapServers = kafkaClusterInfo.getUrl();
+
+        Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
+        streamInfos.forEach(streamInfo -> {
+            KafkaSource kafkaSource = new KafkaSource();
+            String streamId = streamInfo.getInlongStreamId();
+            kafkaSource.setSourceName(streamId);
+            kafkaSource.setBootstrapServers(bootstrapServers);
+            kafkaSource.setTopic(streamInfo.getMqResource());
+            for (StreamSource sourceInfo : streamSources) {
+                if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
+                    continue;
+                }
+                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+            }
+            kafkaSource.setAutoOffsetReset(KafkaOffset.EARLIEST.getName());
+            kafkaSource.setFieldList(streamInfo.getFieldList());
+            sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(kafkaSource);
+        });
+        return sourceMap;
+    }
 }
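
The new getSourcesMap override builds one KafkaSource per stream and groups the results by inlong stream ID via Map.computeIfAbsent. A self-contained sketch of that grouping idiom, with hypothetical stream IDs standing in for the InLong entities:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
        public static void main(String[] args) {
            // hypothetical stream ids; the patch derives them from InlongStreamInfo
            List<String> streamIds = Arrays.asList("stream-1", "stream-2", "stream-1");
            Map<String, List<String>> sourceMap = new HashMap<>();
            for (String streamId : streamIds) {
                // create the per-stream list on first sight, then append to it
                sourceMap.computeIfAbsent(streamId, key -> new ArrayList<>())
                        .add("kafka-source-for-" + streamId);
            }
            System.out.println(sourceMap);
        }
    }
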


[inlong] 07/07: [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6461)

Posted by do...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f30dec91e613dc8058ad8c077b63eebe965d8d5f
Author: thesumery <10...@users.noreply.github.com>
AuthorDate: Tue Nov 8 21:37:30 2022 +0800

    [INLONG-6382][Sort] Iceberg data is messed up when the source table has no primary key in multiple sink scenes (#6461)
    
    Co-authored-by: thesumery <15...@qq.com>
---
 .../org/apache/inlong/sort/base/Constants.java     |  7 ++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  | 26 +++++++++++++++++-----
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  2 ++
 .../inlong/sort/iceberg/IcebergTableSink.java      |  2 ++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  3 +--
 .../sink/multiple/IcebergMultipleStreamWriter.java |  8 +++++--
 6 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9cc3c979d..9780370f2 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -164,6 +164,13 @@ public final class Constants {
                     .defaultValue(true)
                     .withDescription("Whether ignore the single table erros when multiple sink writing scenario.");
 
+    public static final ConfigOption<Boolean> SINK_MULTIPLE_PK_AUTO_GENERATED =
+            ConfigOptions.key("sink.multiple.pk-auto-generated")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether generated pk fields as whole data when source table does not have a "
+                            + "primary key.");
+
     public static final ConfigOption<Boolean> SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK =
             ConfigOptions.key("sink.multiple.typemap-compatible-with-spark")
                     .booleanType()
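
A minimal sketch of flipping the new option programmatically, assuming it is read through Flink's Configuration like the other sink.multiple.* options; in table DDL it surfaces as the property 'sink.multiple.pk-auto-generated':

    import org.apache.flink.configuration.Configuration;

    import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;

    public class PkOptionSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // defaults to false: whole-row keys are only generated on request
            conf.set(SINK_MULTIPLE_PK_AUTO_GENERATED, true);
            System.out.println(conf.get(SINK_MULTIPLE_PK_AUTO_GENERATED));
        }
    }
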
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 10bc3f3d1..3d5663b74 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -38,26 +38,29 @@ public class MultipleSinkOption implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger LOG = LoggerFactory.getLogger(MultipleSinkOption.class);
 
-    private String format;
+    private final String format;
 
     private boolean sparkEngineEnable;
 
-    private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
+    private final String databasePattern;
 
-    private String databasePattern;
+    private final String tablePattern;
 
-    private String tablePattern;
+    private final boolean pkAutoGenerated;
 
     public MultipleSinkOption(String format,
             boolean sparkEngineEnable,
             SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String databasePattern,
-            String tablePattern) {
+            String tablePattern,
+            boolean pkAutoGenerated) {
         this.format = format;
         this.sparkEngineEnable = sparkEngineEnable;
         this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
+        this.pkAutoGenerated = pkAutoGenerated;
     }
 
     public String getFormat() {
@@ -85,6 +88,10 @@ public class MultipleSinkOption implements Serializable {
         return tablePattern;
     }
 
+    public boolean isPkAutoGenerated() {
+        return pkAutoGenerated;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -95,6 +102,7 @@ public class MultipleSinkOption implements Serializable {
         private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private String databasePattern;
         private String tablePattern;
+        private boolean pkAutoGenerated;
 
         public MultipleSinkOption.Builder withFormat(String format) {
             this.format = format;
@@ -121,8 +129,14 @@ public class MultipleSinkOption implements Serializable {
             return this;
         }
 
+        public MultipleSinkOption.Builder withPkAutoGenerated(boolean pkAutoGenerated) {
+            this.pkAutoGenerated = pkAutoGenerated;
+            return this;
+        }
+
         public MultipleSinkOption build() {
-            return new MultipleSinkOption(format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern);
+            return new MultipleSinkOption(
+                    format, sparkEngineEnable, schemaUpdatePolicy, databasePattern, tablePattern, pkAutoGenerated);
         }
     }
 
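
A minimal sketch of constructing the option object through the extended builder; the format and pattern values are illustrative assumptions:

    import org.apache.inlong.sort.base.sink.MultipleSinkOption;

    public class MultipleSinkOptionSketch {
        public static void main(String[] args) {
            // format and pattern values are placeholders, not defaults
            MultipleSinkOption option = MultipleSinkOption.builder()
                    .withFormat("canal-json")
                    .withDatabasePattern("${database}")
                    .withTablePattern("${table}")
                    .withPkAutoGenerated(true)
                    .build();
            System.out.println(option.isPkAutoGenerated());
        }
    }
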
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 040e73df6..94a7cc305 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -53,6 +53,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -242,6 +243,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_DATABASE_PATTERN);
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
+        options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
         return options;
     }
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index 67ca5fdf2..367727309 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -47,6 +47,7 @@ import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERATED;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
@@ -109,6 +110,7 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                             .withDatabasePattern(tableOptions.get(SINK_MULTIPLE_DATABASE_PATTERN))
                             .withTablePattern(tableOptions.get(SINK_MULTIPLE_TABLE_PATTERN))
                             .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
+                            .withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
                             .build())
                     .append();
         } else {
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 25f9e963b..9191aa575 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -400,7 +400,6 @@ public class FlinkSink {
         }
 
 
-
         /**
          * Append the iceberg sink operators to write records to iceberg table.
          *
@@ -529,7 +528,7 @@ public class FlinkSink {
 
             IcebergProcessOperator streamWriter =
                     new IcebergProcessOperator(new IcebergMultipleStreamWriter(
-                            appendMode, catalogLoader, inlongMetric, auditHostAndPorts));
+                            appendMode, catalogLoader, inlongMetric, auditHostAndPorts, multipleSinkOption));
             SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream
                     .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                             TypeInformation.of(IcebergProcessOperator.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 617eb6d69..2c67cbd51 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -43,6 +43,7 @@ import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
 
     private final boolean appendMode;
     private final CatalogLoader catalogLoader;
+    private final MultipleSinkOption multipleSinkOption;
 
     private transient Catalog catalog;
     private transient Map<TableIdentifier, IcebergSingleStreamWriter<RowData>> multipleWriters;
@@ -95,11 +97,13 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
             boolean appendMode,
             CatalogLoader catalogLoader,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            MultipleSinkOption multipleSinkOption) {
         this.appendMode = appendMode;
         this.catalogLoader = catalogLoader;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.multipleSinkOption = multipleSinkOption;
     }
 
     @Override
@@ -170,7 +174,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
                     .map(pk -> recordWithSchema.getSchema().findField(pk).fieldId())
                     .collect(Collectors.toList());
             // if no physical primary key exists and pk auto-generation is enabled, use all fields as the logical primary key
-            if (equalityFieldIds.isEmpty()) {
+            if (equalityFieldIds.isEmpty() && multipleSinkOption.isPkAutoGenerated()) {
                 equalityFieldIds = recordWithSchema.getSchema().columns().stream()
                         .map(NestedField::fieldId)
                         .collect(Collectors.toList());
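
Net effect of this last hunk: promoting every column to an equality (logical primary key) field is now opt-in via sink.multiple.pk-auto-generated rather than automatic, which is the behavior INLONG-6382 reports as scrambling data for primary-key-less source tables. A self-contained sketch of the fallback decision, with plain field-ID lists standing in for the Iceberg schema:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class EqualityFieldSketch {
        // choose equality field ids from the declared pks; fall back to all
        // fields only when pk auto-generation is explicitly enabled
        static List<Integer> equalityFieldIds(List<Integer> pkFieldIds,
                List<Integer> allFieldIds, boolean pkAutoGenerated) {
            if (!pkFieldIds.isEmpty()) {
                return pkFieldIds;
            }
            return pkAutoGenerated ? allFieldIds : new ArrayList<>();
        }

        public static void main(String[] args) {
            List<Integer> all = Arrays.asList(1, 2, 3);
            // no pk, flag off: no equality keys, plain append (new default)
            System.out.println(equalityFieldIds(new ArrayList<>(), all, false));
            // no pk, flag on: the whole row is the logical key (old behavior)
            System.out.println(equalityFieldIds(new ArrayList<>(), all, true));
        }
    }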