You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/05/18 14:25:14 UTC
[incubator-inlong] branch master updated: [INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9712ece16 [INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)
9712ece16 is described below
commit 9712ece16531833ceb62ad8ab5da4f846782dafc
Author: pacino <ge...@gmail.com>
AuthorDate: Wed May 18 22:25:10 2022 +0800
[INLONG-4245][Manager] Manager transmit consumer group of Kafka to Sort (#4248)
Co-authored-by: healchow <he...@gmail.com>
---
.../inlong/manager/service/sort/util/ExtractNodeUtils.java | 4 +++-
.../apache/inlong/sort/protocol/constant/KafkaConstant.java | 2 ++
.../inlong/sort/protocol/node/extract/KafkaExtractNode.java | 10 +++++++++-
.../sort/protocol/node/extract/KafkaExtractNodeTest.java | 2 +-
.../singletenant/flink/parser/DistinctNodeSqlParseTest.java | 6 +++---
.../sort/singletenant/flink/parser/FlinkSqlParserTest.java | 2 +-
.../singletenant/flink/parser/FullOuterJoinSqlParseTest.java | 6 +++---
.../flink/parser/InnerJoinRelationShipSqlParseTest.java | 6 +++---
.../singletenant/flink/parser/LeftOuterJoinSqlParseTest.java | 6 +++---
.../sort/singletenant/flink/parser/MetaFieldSyncTest.java | 2 +-
.../singletenant/flink/parser/RightOuterJoinSqlParseTest.java | 6 +++---
11 files changed, 32 insertions(+), 20 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
index 073ba8c9e..cfe1b2626 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/ExtractNodeUtils.java
@@ -177,6 +177,7 @@ public class ExtractNodeUtils {
startupMode = ScanStartupMode.LATEST_OFFSET;
}
final String primaryKey = kafkaSourceResponse.getPrimaryKey();
+ String groupId = kafkaSourceResponse.getGroupId();
return new KafkaExtractNode(id,
name,
@@ -187,6 +188,7 @@ public class ExtractNodeUtils {
bootstrapServers,
format,
startupMode,
- primaryKey);
+ primaryKey,
+ groupId);
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
index 2423293f1..64d6261b3 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/constant/KafkaConstant.java
@@ -27,6 +27,8 @@ public class KafkaConstant {
public static final String PROPERTIES_BOOTSTRAP_SERVERS = "properties.bootstrap.servers";
+ public static final String PROPERTIES_GROUP_ID = "properties.group.id";
+
public static final String CONNECTOR = "connector";
public static final String SCAN_STARTUP_MODE = "scan.startup.mode";
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index ab28ca27f..4465a5e71 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -68,6 +68,9 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
@JsonProperty("primaryKey")
private String primaryKey;
+ @JsonProperty("groupId")
+ private String groupId;
+
@JsonCreator
public KafkaExtractNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -78,13 +81,15 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
@Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
@Nonnull @JsonProperty("format") Format format,
@JsonProperty("scanStartupMode") ScanStartupMode scanStartupMode,
- @JsonProperty("primaryKey") String primaryKey) {
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("groupId") String groupId) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "kafka bootstrapServers is empty");
this.format = Preconditions.checkNotNull(format, "kafka format is empty");
this.scanStartupMode = Preconditions.checkNotNull(scanStartupMode, "kafka scanStartupMode is empty");
this.primaryKey = primaryKey;
+ this.groupId = groupId;
}
/**
@@ -113,6 +118,9 @@ public class KafkaExtractNode extends ExtractNode implements Serializable {
} else {
throw new IllegalArgumentException("kafka extract node format is IllegalArgument");
}
+ if (StringUtils.isNotEmpty(groupId)) {
+ options.put(KafkaConstant.PROPERTIES_GROUP_ID, groupId);
+ }
return options;
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index d2987c320..e0a639fd8 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -39,6 +39,6 @@ public class KafkaExtractNodeTest extends SerializeBaseTest<Node> {
new FieldInfo("name", new StringFormatInfo()),
new FieldInfo("age", new IntFormatInfo()));
return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv",
- "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ "localhost:9092", new CsvFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
}
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
index 61a5134b8..3d72e2112 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/DistinctNodeSqlParseTest.java
@@ -69,7 +69,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
private KafkaExtractNode buildKafkaExtractNode2() {
@@ -85,7 +85,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input", fields, wk,
null, "topic_input", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
private KafkaExtractNode buildKafkaExtractNode3() {
@@ -96,7 +96,7 @@ public class DistinctNodeSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("1", "kafka_input", fields, null,
null, "topic_input", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
private KafkaLoadNode buildKafkaLoadNode() {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
index 5a92c3a44..16b2dcb83 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FlinkSqlParserTest.java
@@ -87,7 +87,7 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new StringConstantParam("5"),
new TimeUnitConstantParam(TimeUnit.SECOND));
return new KafkaExtractNode(id, "kafka_input", fields, wk, null, "workerJson",
- "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ "localhost:9092", new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
private KafkaLoadNode buildKafkaNode(String id) {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
index cdf785ef6..bb4091a28 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/FullOuterJoinSqlParseTest.java
@@ -76,7 +76,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -90,7 +90,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -104,7 +104,7 @@ public class FullOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
index 1eab9ff2c..ed05ab917 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/InnerJoinRelationShipSqlParseTest.java
@@ -76,7 +76,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -90,7 +90,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -104,7 +104,7 @@ public class InnerJoinRelationShipSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
index 645e63915..f478b3c1c 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/LeftOuterJoinSqlParseTest.java
@@ -77,7 +77,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -91,7 +91,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -105,7 +105,7 @@ public class LeftOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
index e1e2bf574..2c2498a4a 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
@@ -188,7 +188,7 @@ public class MetaFieldSyncTest extends AbstractTestBase {
return new KafkaExtractNode("3", "kafka_input", fields,
null, null, "topic1", "localhost:9092",
new CanalJsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
private Node buildKafkaLoadNode2() {
diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
index 2314ba4f5..5bc25b3ac 100644
--- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
+++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/RightOuterJoinSqlParseTest.java
@@ -76,7 +76,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("1", "kafka_input_1", fields, null,
null, "topic_input_1", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -90,7 +90,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
return new KafkaExtractNode("2", "kafka_input_2", fields, null,
null, "topic_input_2", "localhost:9092",
new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET,
- null);
+ null, "groupId");
}
/**
@@ -104,7 +104,7 @@ public class RightOuterJoinSqlParseTest extends AbstractTestBase {
new FieldInfo("ts", new TimestampFormatInfo()));
return new KafkaExtractNode("3", "kafka_input_3", fields, null,
null, "topic_input_3", "localhost:9092",
- new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null);
+ new JsonFormat(), ScanStartupMode.EARLIEST_OFFSET, null, "groupId");
}
/**