You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/18 14:25:14 UTC

[incubator-inlong] branch master updated: [INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9712ece16 [INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)
9712ece16 is described below

commit 9712ece16531833ceb62ad8ab5da4f846782dafc
Author: pacino <ge...@gmail.com>
AuthorDate: Wed May 18 22:25:10 2022 +0800

    [INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../inlong/manager/service/sort/util/ExtractNodeUtils.java     |  4 +++-
 .../apache/inlong/sort/protocol/constant/KafkaConstant.java    |  2 ++
 .../inlong/sort/protocol/node/extract/KafkaExtractNode.java    | 10 +++++++++-
 .../sort/protocol/node/extract/KafkaExtractNodeTest.java       |  2 +-
 .../singletenant/flink/parser/DistinctNodeSqlParseTest.java    |  6 +++---
 .../sort/singletenant/flink/parser/FlinkSqlParserTest.java     |  2 +-
 .../singletenant/flink/parser/FullOuterJoinSqlParseTest.java   |  6 +++---
 .../flink/parser/InnerJoinRelationShipSqlParseTest.java        |  6 +++---
 .../singletenant/flink/parser/LeftOuterJoinSqlParseTest.java   |  6 +++---
 .../sort/singletenant/flink/parser/MetaFieldSyncTest.java      |  2 +-
 .../singletenant/flink/parser/RightOuterJoinSqlParseTest.java  |  6 +++---
 11 files changed, 32 insertions(+), 20 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 073ba8c9e..cfe1b2626 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -177,6 +177,7 @@ public class ExtractNodeUtils {
                 startupMode = ScanStartupMode.LATEST_OFFSET;
         }
         final String primaryKey = kafkaSourceResponse.getPrimaryKey();
+        String groupId = kafkaSourceResponse.getGroupId();
 
         return new KafkaExtractNode(id,
                 name,
@@ -187,6 +188,7 @@ public class ExtractNodeUtils {
                 bootstrapServers,
                 format,
                 startupMode,
-                primaryKey);
+                primaryKey,
+                groupId);
     }
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
index 2423293f1..64d6261b3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
@@ -27,6 +27,8 @@ public class KafkaConstant {
 
     public static final String PROPERTIES_BOOTSTRAP_SERVERS = "properties.bootstrap.servers";
 
+    public static final String PROPERTIES_GROUP_ID = "properties.group.id";
+
     public static final String CONNECTOR = "connector";
 
     public static final String SCAN_STARTUP_MODE = "scan.startup.mode";
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index ab28ca27f..4465a5e71 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -68,6 +68,9 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
     @JsonProperty("primaryKey")
     private String primaryKey;
 
+    @JsonProperty("groupId")
+    private String groupId;
+
     @JsonCreator
     public KafkaExtractNode(@JsonProperty("id") String id,
             @JsonProperty("name") String name,
@@ -78,13 +81,15 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
             @Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
             @Nonnull @JsonProperty("format") Format format,
             @JsonProperty("scanStartupMode") ScanStartupMode scanStartupMode,
-            @JsonProperty("primaryKey") String primaryKey) {
+            @JsonProperty("primaryKey") String primaryKey,
+            @JsonProperty("groupId") String groupId) {
         super(id, name, fields, watermarkField, properties);
         this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
         this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
         this.format = Preconditions.checkNotNull(format, "kafka format is empty");
         this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, "kafka scanStartupMode is empty");
         this.primaryKey = primaryKey;
+        this.groupId = groupId;
     }
 
     /**
@@ -113,6 +118,9 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
         } else {
             throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
         }
+        if (StringUtils.isNotEmpty(groupId)) {
+            options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
+        }
         return options;
     }
 
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index d2987c320..e0a639fd8 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -39,6 +39,6 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<Node> {
                 new FieldInfo("name", new StringFormatInfo()),
                 new FieldInfo("age", new IntFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
-                "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 }
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
index 61a5134b8..3d72e2112 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
@@ -69,7 +69,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     private KafkaExtractNode buildKafkaExtractNode2() {
@@ -85,7 +85,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input", fields, wk,
                 null, "topic_input", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     private KafkaExtractNode buildKafkaExtractNode3() {
@@ -96,7 +96,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("1", "kafka_input", fields, null,
                 null, "topic_input", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     private KafkaLoadNode buildKafkaLoadNode() {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 5a92c3a44..16b2dcb83 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -87,7 +87,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
                 new StringConstantParam("5"),
                 new TimeUnitConstantParam(TimeUnit.SECOND));
         return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
-                "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     private KafkaLoadNode buildKafkaNode(String id) {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
index cdf785ef6..bb4091a28 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
@@ -76,7 +76,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -90,7 +90,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -104,7 +104,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
index 1eab9ff2c..ed05ab917 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
@@ -76,7 +76,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -90,7 +90,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -104,7 +104,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
index 645e63915..f478b3c1c 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
@@ -77,7 +77,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -91,7 +91,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -105,7 +105,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
index e1e2bf574..2c2498a4a 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
@@ -188,7 +188,7 @@ public class MetaFieldSyncTest extends AbstractTestBase {
         return new KafkaExtractNode("3", "kafka_input", fields,
                 null, null, "topic1", "localhost:9092",
                 new CanalJsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     private Node buildKafkaLoadNode2() {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
index 2314ba4f5..5bc25b3ac 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
@@ -76,7 +76,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("1", "kafka_input_1", fields, null,
                 null, "topic_input_1", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -90,7 +90,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
         return new KafkaExtractNode("2", "kafka_input_2", fields, null,
                 null, "topic_input_2", "localhost:9092",
                 new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
-                null);
+                null, "groupId");
     }
 
     /**
@@ -104,7 +104,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
                 new FieldInfo("ts", new TimestampFormatInfo()));
         return new KafkaExtractNode("3", "kafka_input_3", fields, null,
                 null, "topic_input_3", "localhost:9092",
-                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+                new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
     }
 
     /**