You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/05 09:32:00 UTC
[incubator-inlong] branch master updated: [INLONG-2931][Manager] Improve parse response in manager client (#2932)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dcacfe2 [INLONG-2931][Manager] Improve parse response in manager client (#2932)
dcacfe2 is described below
commit dcacfe233b8f741b047d7bbfb8c61aac09696cde
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 17:31:52 2022 +0800
[INLONG-2931][Manager] Improve parse response in manager client (#2932)
---
.../manager/client/api/util/InlongParser.java | 29 ++++++++++++++++++++++
.../pojo/source/binlog/BinlogSourceResponse.java | 6 +++--
.../pojo/source/kafka/KafkaSourceResponse.java | 6 +++--
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 1c8987c..28c57c8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -42,8 +42,11 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -59,7 +62,9 @@ public class InlongParser {
public static final String MQ_EXT_INFO = "mqExtInfo";
public static final String MIDDLEWARE_TYPE = "middlewareType";
public static final String SINK_INFO = "sinkInfo";
+ public static final String SOURCE_INFO = "sourceInfo";
public static final String SINK_TYPE = "sinkType";
+ public static final String SOURCE_TYPE = "sourceType";
public static Response parseResponse(String responseBody) {
Response response = GsonUtil.fromJson(responseBody, Response.class);
@@ -109,6 +114,30 @@ public class InlongParser {
FullStreamResponse fullStreamResponse = GsonUtil.fromJson(fullStreamJson.toString(),
FullStreamResponse.class);
list.add(fullStreamResponse);
+ //Parse sourceResponse in each stream
+ JsonArray sourceJsonArr = fullStreamJson.getAsJsonArray(SOURCE_INFO);
+ List<SourceResponse> sourceResponses = Lists.newArrayList();
+ fullStreamResponse.setSourceInfo(sourceResponses);
+ for (int j = 0; j < sourceJsonArr.size(); j++) {
+ JsonObject sourceJson = (JsonObject) sourceJsonArr.get(i);
+ String type = sourceJson.get(SOURCE_TYPE).getAsString();
+ SourceType sourceType = SourceType.forType(type);
+ switch (sourceType) {
+ case BINLOG:
+ BinlogSourceResponse binlogSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+ BinlogSourceResponse.class);
+ sourceResponses.add(binlogSourceResponse);
+ break;
+ case KAFKA:
+ KafkaSourceResponse kafkaSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+ KafkaSourceResponse.class);
+ sourceResponses.add(kafkaSourceResponse);
+ break;
+ default:
+ throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
+ }
+ }
+
//Parse sinkResponse in each stream
JsonArray sinkJsonArr = fullStreamJson.getAsJsonArray(SINK_INFO);
List<SinkResponse> sinkResponses = Lists.newArrayList();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 8941bc1..1cce061 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
/**
@@ -34,7 +34,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@ApiModel(value = "Response of the binlog source")
public class BinlogSourceResponse extends SourceResponse {
- private String sourceType = Constant.SOURCE_BINLOG;
+ public BinlogSourceResponse() {
+ this.setSourceType(SourceType.BINLOG.name());
+ }
@ApiModelProperty("Username of the DB server")
private String user;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 770e111..7879032 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
/**
@@ -34,7 +34,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
@ApiModel(value = "Response of the kafka source")
public class KafkaSourceResponse extends SourceResponse {
- private String sourceType = Constant.SOURCE_KAFKA;
+ public KafkaSourceResponse() {
+ this.setSourceType(SourceType.KAFKA.name());
+ }
@ApiModelProperty("Kafka topic")
private String topic;