You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/05 09:32:00 UTC

[incubator-inlong] branch master updated: [INLONG-2931][Manager] Improve parse response in manager client (#2932)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dcacfe2  [INLONG-2931][Manager] Improve parse response in manager client (#2932)
dcacfe2 is described below

commit dcacfe233b8f741b047d7bbfb8c61aac09696cde
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Sat Mar 5 17:31:52 2022 +0800

    [INLONG-2931][Manager] Improve parse response in manager client (#2932)
---
 .../manager/client/api/util/InlongParser.java      | 29 ++++++++++++++++++++++
 .../pojo/source/binlog/BinlogSourceResponse.java   |  6 +++--
 .../pojo/source/kafka/KafkaSourceResponse.java     |  6 +++--
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 1c8987c..28c57c8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -42,8 +42,11 @@ import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -59,7 +62,9 @@ public class InlongParser {
     public static final String MQ_EXT_INFO = "mqExtInfo";
     public static final String MIDDLEWARE_TYPE = "middlewareType";
     public static final String SINK_INFO = "sinkInfo";
+    public static final String SOURCE_INFO = "sourceInfo";
     public static final String SINK_TYPE = "sinkType";
+    public static final String SOURCE_TYPE = "sourceType";
 
     public static Response parseResponse(String responseBody) {
         Response response = GsonUtil.fromJson(responseBody, Response.class);
@@ -109,6 +114,30 @@ public class InlongParser {
             FullStreamResponse fullStreamResponse = GsonUtil.fromJson(fullStreamJson.toString(),
                     FullStreamResponse.class);
             list.add(fullStreamResponse);
+            // Parse each entry of this stream's "sourceInfo" array into its concrete SourceResponse subtype
+            JsonArray sourceJsonArr = fullStreamJson.getAsJsonArray(SOURCE_INFO);
+            List<SourceResponse> sourceResponses = Lists.newArrayList();
+            fullStreamResponse.setSourceInfo(sourceResponses);
+            for (int j = 0; j < sourceJsonArr.size(); j++) {
+                JsonObject sourceJson = (JsonObject) sourceJsonArr.get(j); // j, not i: this loop's own counter
+                String type = sourceJson.get(SOURCE_TYPE).getAsString();
+                SourceType sourceType = SourceType.forType(type);
+                switch (sourceType) {
+                    case BINLOG:
+                        BinlogSourceResponse binlogSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+                                BinlogSourceResponse.class);
+                        sourceResponses.add(binlogSourceResponse);
+                        break;
+                    case KAFKA:
+                        KafkaSourceResponse kafkaSourceResponse = GsonUtil.fromJson(sourceJson.toString(),
+                                KafkaSourceResponse.class);
+                        sourceResponses.add(kafkaSourceResponse);
+                        break;
+                    default:
+                        throw new RuntimeException(String.format("Unsupport sourceType=%s for Inlong", sourceType));
+                }
+            }
+
             //Parse sinkResponse in each stream
             JsonArray sinkJsonArr = fullStreamJson.getAsJsonArray(SINK_INFO);
             List<SinkResponse> sinkResponses = Lists.newArrayList();
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
index 8941bc1..1cce061 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 
 /**
@@ -34,7 +34,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 @ApiModel(value = "Response of the binlog source")
 public class BinlogSourceResponse extends SourceResponse {
 
-    private String sourceType = Constant.SOURCE_BINLOG;
+    public BinlogSourceResponse() { // no-arg constructor: presets the source type for binlog responses
+        this.setSourceType(SourceType.BINLOG.name()); // stores the enum constant's name, "BINLOG"
+    }
 
     @ApiModelProperty("Username of the DB server")
     private String user;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
index 770e111..7879032 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceResponse.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 
 /**
@@ -34,7 +34,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 @ApiModel(value = "Response of the kafka source")
 public class KafkaSourceResponse extends SourceResponse {
 
-    private String sourceType = Constant.SOURCE_KAFKA;
+    public KafkaSourceResponse() { // no-arg constructor: presets the source type for kafka responses
+        this.setSourceType(SourceType.KAFKA.name()); // stores the enum constant's name, "KAFKA"
+    }
 
     @ApiModelProperty("Kafka topic")
     private String topic;