You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/02/28 11:00:26 UTC
[incubator-inlong] branch master updated: [INLONG-2771][Manager] Add stream source API in manager client module (#2780)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 59afacf [INLONG-2771][Manager] Add stream source API in manager client module (#2780)
59afacf is described below
commit 59afacf63df37147fd1e3528ef59941d8f1bf3c2
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Feb 28 19:00:22 2022 +0800
[INLONG-2771][Manager] Add stream source API in manager client module (#2780)
---
.../apache/inlong/common/enums/DataTypeEnum.java | 3 +-
.../inlong/manager/client/api/DataFormat.java | 20 ++--
.../inlong/manager/client/api/DataSeparator.java | 2 +-
.../inlong/manager/client/api/InlongClient.java | 2 +-
.../inlong/manager/client/api/StreamSink.java | 13 ++-
.../inlong/manager/client/api/StreamSource.java | 16 ++-
.../api/impl/DefaultInlongStreamBuilder.java | 59 ++++++++--
.../manager/client/api/impl/InlongClientImpl.java | 2 +-
.../manager/client/api/impl/InlongGroupImpl.java | 11 +-
.../manager/client/api/impl/InlongStreamImpl.java | 9 +-
.../client/api/inner/InnerInlongManagerClient.java | 101 ++++++++++++++++-
.../client/api/inner/InnerStreamContext.java | 5 +-
.../manager/client/api/{ => sink}/HiveSink.java | 9 +-
.../manager/client/api/source/KafkaSource.java | 64 +++++++++++
.../MySQLBinlogSource.java} | 39 ++++---
.../client/api/{ => source}/MySQLSource.java | 6 +-
.../manager/client/api/util/InlongParser.java | 12 +-
.../api/util/InlongStreamSourceTransfer.java | 122 +++++++++++++++++++++
.../client/api/util/InlongStreamTransfer.java | 9 +-
.../common/pojo/source/binlog/BinlogSourceDTO.java | 3 +
.../source/binlog/BinlogSourceListResponse.java | 5 +
.../pojo/source/kafka/KafkaSourceListResponse.java | 6 +
22 files changed, 451 insertions(+), 67 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 7d32aa3..1916a87 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -17,6 +17,7 @@
package org.apache.inlong.common.enums;
+import java.util.Locale;
import lombok.Getter;
public enum DataTypeEnum {
@@ -35,7 +36,7 @@ public enum DataTypeEnum {
public static DataTypeEnum forName(String name) {
for (DataTypeEnum dataType : values()) {
- if (dataType.getName().equals(name)) {
+ if (dataType.getName().equals(name.toLowerCase(Locale.ROOT))) {
return dataType;
}
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
similarity index 72%
copy from inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 7d32aa3..2d9960e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -15,30 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.common.enums;
+package org.apache.inlong.manager.client.api;
+import java.util.Locale;
import lombok.Getter;
-public enum DataTypeEnum {
+public enum DataFormat {
CSV("csv"),
AVRO("avro"),
- JSON("json"),
CANAL("canal"),
- DEBEZIUM_JSON("debezium_json");
+ NONE("none");
@Getter
private String name;
- DataTypeEnum(String name) {
+ DataFormat(String name) {
this.name = name;
}
- public static DataTypeEnum forName(String name) {
- for (DataTypeEnum dataType : values()) {
- if (dataType.getName().equals(name)) {
- return dataType;
+ public static DataFormat forName(String name) {
+ for (DataFormat dataFormat : values()) {
+ if (dataFormat.getName().equals(name.toLowerCase(Locale.ROOT))) {
+ return dataFormat;
}
}
- throw new IllegalArgumentException(String.format("Unsupport dataType for Inlong:%s", name));
+ throw new IllegalArgumentException(String.format("Unsupport dataformat=%s for Inlong", name));
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
index c111931..96b0dfa 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
@@ -39,7 +39,7 @@ public enum DataSeparator {
return this.asciiCode;
}
- private DataSeparator(String seperator, int asciiCode) {
+ DataSeparator(String seperator, int asciiCode) {
this.asciiCode = asciiCode;
this.seperator = seperator;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index f736ee2..f844701 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -64,7 +64,7 @@ public interface InlongClient {
* @return the inlong group
* @throws Exception the exception
*/
- InlongGroup createGroup(InlongGroupConf groupConf) throws Exception;
+ InlongGroup forGroup(InlongGroupConf groupConf) throws Exception;
/**
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 224f12c..689ed7c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -26,11 +26,22 @@ import lombok.Data;
public abstract class StreamSink {
public enum SinkType {
- HIVE, ES
+ HIVE, ES, KAFKA;
+
+ public static SinkType forType(String type) {
+ for (SinkType sinkType : values()) {
+ if (sinkType.name().equals(type)) {
+ return sinkType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Illegal sink type=%s for Inlong", type));
+ }
}
public abstract SinkType getSinkType();
public abstract List<StreamField> getStreamFields();
+ public abstract DataFormat getDataFormat();
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index 767f9a0..b588e66 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -25,14 +25,26 @@ import lombok.Data;
public abstract class StreamSource {
public enum SourceType {
- FILE, KAFKA, DB, BINLOG
+ FILE, KAFKA, DB, BINLOG;
+
+ public static SourceType forType(String type) {
+ for (SourceType sourceType : values()) {
+ if (sourceType.name().equals(type)) {
+ return sourceType;
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format("Unsupport source type=%s for Inlong", type));
+ }
}
public enum SyncType {
- FULL,INCREMENT
+ FULL, INCREMENT
}
public abstract SourceType getSourceType();
public abstract SyncType getSyncType();
+
+ public abstract DataFormat getDataFormat();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index e211363..bcd1819 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -29,13 +29,17 @@ import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSink.SinkType;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.SourceType;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -71,7 +75,10 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
@Override
public InlongStreamBuilder source(StreamSource source) {
- //todo create SourceRequest
+ inlongStream.setStreamSource(source);
+ SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(source,
+ streamContext.getStreamInfo());
+ streamContext.setSourceRequest(sourceRequest);
return this;
}
@@ -97,8 +104,11 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
String streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(Double.valueOf(streamIndex).intValue());
- //todo save source
-
+ //Create source and update index
+ SourceRequest sourceRequest = streamContext.getSourceRequest();
+ String sourceIndex = managerClient.createSource(sourceRequest);
+ sourceRequest.setId(Double.valueOf(sourceIndex).intValue());
+ //Create sink and update index
SinkRequest sinkRequest = streamContext.getSinkRequest();
String sinkIndex = managerClient.createSink(sinkRequest);
sinkRequest.setId(Double.valueOf(sinkIndex).intValue());
@@ -112,10 +122,10 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
if (updateMsg.getKey()) {
- //todo init or update source
-
+ SourceRequest sourceRequest = streamContext.getSourceRequest();
+ sourceRequest.setId(initOrUpdateSource(sourceRequest));
SinkRequest sinkRequest = streamContext.getSinkRequest();
- sinkRequest.setId(initOrUpdateStorage(sinkRequest));
+ sinkRequest.setId(initOrUpdateSink(sinkRequest));
} else {
throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
}
@@ -125,28 +135,53 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
}
}
- private int initOrUpdateStorage(SinkRequest sinkRequest) {
+ private int initOrUpdateSource(SourceRequest sourceRequest) {
+ String sourceType = sourceRequest.getSourceType();
+ if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
+ List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
+ sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
+ if (CollectionUtils.isEmpty(responses)) {
+ String sourceIndex = managerClient.createSource(sourceRequest);
+ return Double.valueOf(sourceIndex).intValue();
+ } else {
+ SourceListResponse response = responses.get(0);
+ sourceRequest.setId(response.getId());
+ Pair<Boolean, String> updateMsg = managerClient.updateSource(sourceRequest);
+ if (updateMsg.getKey()) {
+ return response.getId();
+ } else {
+ throw new RuntimeException(
+ String.format("Update source:%s failed with ex:%s", GsonUtil.toJson(sourceRequest),
+ updateMsg.getValue()));
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Unsupported source type:%s", sourceType));
+ }
+ }
+
+ private int initOrUpdateSink(SinkRequest sinkRequest) {
String sinkType = sinkRequest.getSinkType();
if (SinkType.HIVE.name().equals(sinkType)) {
- List<SinkListResponse> responses = managerClient.listHiveStorage(sinkRequest.getInlongGroupId(),
- sinkRequest.getInlongStreamId());
+ List<SinkListResponse> responses = managerClient.listSinks(sinkRequest.getInlongGroupId(),
+ sinkRequest.getInlongStreamId(), sinkRequest.getSinkType());
if (CollectionUtils.isEmpty(responses)) {
String storageIndex = managerClient.createSink(sinkRequest);
return Double.valueOf(storageIndex).intValue();
} else {
SinkListResponse response = responses.get(0);
sinkRequest.setId(response.getId());
- Pair<Boolean, String> updateMsg = managerClient.updateStorage(sinkRequest);
+ Pair<Boolean, String> updateMsg = managerClient.updateSink(sinkRequest);
if (updateMsg.getKey()) {
return response.getId();
} else {
throw new RuntimeException(
- String.format("Update storage:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
+ String.format("Update sink:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
updateMsg.getValue()));
}
}
} else {
- throw new IllegalArgumentException(String.format("Unsupported storage type:%s", sinkType));
+ throw new IllegalArgumentException(String.format("Unsupported sink type:%s", sinkType));
}
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 01d2a66..59f1f62 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -69,7 +69,7 @@ public class InlongClientImpl implements InlongClient {
}
@Override
- public InlongGroup createGroup(InlongGroupConf groupConf) throws Exception {
+ public InlongGroup forGroup(InlongGroupConf groupConf) throws Exception {
return new InlongGroupImpl(groupConf, this);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index bc82c4c..dba26be 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -35,10 +35,12 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.client.api.util.InlongParser;
+import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
@@ -153,9 +155,16 @@ public class InlongGroupImpl implements InlongGroup {
List<InlongStream> streamList = new ArrayList<>();
if (CollectionUtils.isNotEmpty(streamResponses)) {
streamList = streamResponses.stream().map(fullStreamResponse -> {
+ List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId,
+ fullStreamResponse.getStreamInfo().getInlongStreamId());
String streamName = fullStreamResponse.getStreamInfo().getName();
InlongStream stream = groupContext.getStream(streamName);
- return new InlongStreamImpl(fullStreamResponse, stream);
+ InlongStreamImpl inlongStream = new InlongStreamImpl(fullStreamResponse, stream);
+ if (CollectionUtils.isNotEmpty(sourceListResponses)) {
+ inlongStream.setStreamSource(
+ InlongStreamSourceTransfer.parseStreamSource(sourceListResponses.get(0)));
+ }
+ return inlongStream;
}).collect(Collectors.toList());
}
return streamList;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index dd85857..8065a2f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -17,9 +17,11 @@
package org.apache.inlong.manager.client.api.impl;
-import lombok.AllArgsConstructor;
+import java.util.List;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.StreamField;
@@ -32,12 +34,9 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import java.util.List;
-import java.util.stream.Collectors;
-
@Data
@EqualsAndHashCode(callSuper = true)
-@AllArgsConstructor
+@NoArgsConstructor
public class InlongStreamImpl extends InlongStream {
private String name;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 6667781..81ae88f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -43,6 +43,8 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -337,6 +339,88 @@ public class InnerInlongManagerClient {
}
}
+ public String createSource(SourceRequest sourceRequest) {
+ String path = HTTP_PATH + "/source/save";
+ final String source = GsonUtil.toJson(sourceRequest);
+ final RequestBody sourceBody = RequestBody.create(MediaType.parse("application/json"), source);
+ final String url = formatUrl(path);
+ Request request = new Request.Builder()
+ .url(url)
+ .method("POST", sourceBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed: %s", body));
+ org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ return responseBody.getData().toString();
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong source save failed: %s", e.getMessage()), e);
+ }
+ }
+
+ public List<SourceListResponse> listSources(String groupId, String streamId) {
+ return listSources(groupId, streamId, null);
+ }
+
+ public List<SourceListResponse> listSources(String groupId, String streamId, String sourceType) {
+ final String path = HTTP_PATH + "/source/list";
+ String url = formatUrl(path);
+ url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+ if (StringUtils.isNotEmpty(sourceType)) {
+ url = String.format("%s&sourceType=%s", url, sourceType);
+ }
+ Request request = new Request.Builder().get()
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ Response response = call.execute();
+ String body = response.body().string();
+ AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed:%s", body));
+ org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed:%s", responseBody.getErrMsg()));
+ PageInfo<SourceListResponse> sourceListResponsePageInfo = InlongParser.parseSourceList(
+ responseBody);
+ return sourceListResponsePageInfo.getList();
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong source list failed with ex:%s", e.getMessage()), e);
+ }
+ }
+
+ public Pair<Boolean, String> updateSource(SourceRequest sourceRequest) {
+ final String path = HTTP_PATH + "/source/update";
+ final String url = formatUrl(path);
+ final String storage = GsonUtil.toJson(sourceRequest);
+ final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), storage);
+ Request request = new Request.Builder()
+ .method("POST", storageBody)
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ Response response = call.execute();
+ String body = response.body().string();
+ AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed:%s", body));
+ org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+ if (responseBody.getData() != null) {
+ return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg());
+ } else {
+ return Pair.of(false, responseBody.getErrMsg());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong source update failed with ex:%s", e.getMessage()), e);
+ }
+ }
+
public String createSink(SinkRequest sinkRequest) {
String path = HTTP_PATH + "/sink/save";
final String sink = GsonUtil.toJson(sinkRequest);
@@ -362,10 +446,17 @@ public class InnerInlongManagerClient {
}
}
- public List<SinkListResponse> listHiveStorage(String groupId, String streamId) {
+ public List<SinkListResponse> listSinks(String groupId, String streamId) {
+ return listSinks(groupId, streamId, null);
+ }
+
+ public List<SinkListResponse> listSinks(String groupId, String streamId, String sinkType) {
final String path = HTTP_PATH + "/sink/list";
String url = formatUrl(path);
- url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s&sinkType=HIVE", url, groupId, streamId);
+ url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+ if (StringUtils.isNotEmpty(sinkType)) {
+ url = String.format("%s&sinkType=%s", url, sinkType);
+ }
Request request = new Request.Builder().get()
.url(url)
.build();
@@ -378,15 +469,15 @@ public class InnerInlongManagerClient {
org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
AssertUtil.isTrue(responseBody.getErrMsg() == null,
String.format("Inlong request failed:%s", responseBody.getErrMsg()));
- PageInfo<SinkListResponse> hiveStorageListResponsePageInfo = InlongParser.parseHiveSinkList(
+ PageInfo<SinkListResponse> sinkListResponsePageInfo = InlongParser.parseSinkList(
responseBody);
- return hiveStorageListResponsePageInfo.getList();
+ return sinkListResponsePageInfo.getList();
} catch (Exception e) {
throw new RuntimeException(String.format("Inlong storage list failed with ex:%s", e.getMessage()), e);
}
}
- public Pair<Boolean, String> updateStorage(SinkRequest sinkRequest) {
+ public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
final String path = HTTP_PATH + "/sink/update";
final String url = formatUrl(path);
final String storage = GsonUtil.toJson(sinkRequest);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 7dac32b..c74bb83 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -21,6 +21,7 @@ import java.util.List;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -28,10 +29,10 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@NoArgsConstructor
public class InnerStreamContext {
- //todo add SourceRequest
-
private InlongStreamInfo streamInfo;
+ private SourceRequest sourceRequest;
+
private SinkRequest sinkRequest;
public InnerStreamContext(InlongStreamInfo streamInfo) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
similarity index 89%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index debd21f..df1667c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.sink;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -27,6 +27,10 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
@Data
@@ -93,5 +97,8 @@ public class HiveSink extends StreamSink {
@ApiModelProperty("Other properties if need")
private Map<String, String> properties;
+
+ @ApiModelProperty("Data format type for stream sink")
+ private DataFormat dataFormat;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
new file mode 100644
index 0000000..f447f9d
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.source;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Kafka collection")
+public class KafkaSource extends StreamSource {
+
+ @ApiModelProperty(value = "DataSource type", required = true)
+ private SourceType sourceType = SourceType.KAFKA;
+
+ @ApiModelProperty("SyncType for Kafka")
+ private SyncType syncType;
+
+ @ApiModelProperty("Data format type for kafka")
+ private DataFormat dataFormat;
+
+ @ApiModelProperty("Kafka topic")
+ private String topic;
+
+ @ApiModelProperty("Kafka consumer group")
+ private String consumerGroup;
+
+ @ApiModelProperty("Kafka servers address, such as: 127.0.0.1:9092")
+ private String bootstrapServers;
+
+ @ApiModelProperty(value = "Limit the amount of data read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String recordSpeedLimit;
+
+ @ApiModelProperty(value = "Limit the number of bytes read per second",
+ notes = "Greater than or equal to 0, equal to zero means no limit")
+ private String byteSpeedLimit;
+
+ @ApiModelProperty(value = "Topic partition offset",
+ notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+ private String topicPartitionOffset;
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
similarity index 51%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index a93a767..be68273 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -15,45 +15,48 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.source;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
@Data
@AllArgsConstructor
@NoArgsConstructor
-@ApiModel("Base configuration for MySQL collection")
-public class MySQLSource extends StreamSource {
+@ApiModel("Base configuration for MySQL binlog collection")
+public class MySQLBinlogSource extends StreamSource {
@ApiModelProperty(value = "DataSource type", required = true)
- private SourceType sourceType = SourceType.DB;
+ private SourceType sourceType = SourceType.BINLOG;
@ApiModelProperty("SyncType for MySQL")
private SyncType syncType;
- @ApiModelProperty("Database name")
- private String dbName;
+ @ApiModelProperty("Data format type for binlog")
+ private DataFormat dataFormat = DataFormat.NONE;
- @ApiModelProperty("Data table name, required for increment")
- private String tableName;
+ @ApiModelProperty("Username of the DB server")
+ private String user;
- @ApiModelProperty("Db server username")
- private String username;
-
- @ApiModelProperty("Db password")
+ @ApiModelProperty("Password of the DB server")
private String password;
- @ApiModelProperty("DB Server IP")
- private String dbServerIp;
+ @ApiModelProperty("Hostname of the DB server")
+ private String hostname;
- @ApiModelProperty("DB Server port")
- private int port;
+ @ApiModelProperty(value = "List of DBs to be collected, for example: db1.tb1,db2.tb2",
+ notes = "DBs not in this list are excluded. By default, all DBs are monitored")
+ private List<String> dbNames;
- @ApiModelProperty("SQL statement to collect source data, required for full amount")
- private String dataSql;
+ @ApiModelProperty("Database time zone, Default is UTC")
+ private String timeZone = "UTF";
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
similarity index 88%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index a93a767..7395293 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.client.api.source;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
@Data
@AllArgsConstructor
@@ -56,4 +58,6 @@ public class MySQLSource extends StreamSource {
@ApiModelProperty("SQL statement to collect source data, required for full amount")
private String dataSql;
+ @ApiModelProperty("Data format type of source")
+ private DataFormat dataFormat;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 03e65fa..2381c4d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
@@ -81,7 +82,16 @@ public class InlongParser {
return pageInfo;
}
- public static PageInfo<SinkListResponse> parseHiveSinkList(Response response) {
+ public static PageInfo<SourceListResponse> parseSourceList(Response response) {
+ Object data = response.getData();
+ String pageInfoJson = GsonUtil.toJson(data);
+ PageInfo<SourceListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<SourceListResponse>>() {
+ }.getType());
+ return pageInfo;
+ }
+
+ public static PageInfo<SinkListResponse> parseSinkList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
PageInfo<SinkListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
new file mode 100644
index 0000000..273464c
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import java.util.List;
+import org.apache.inlong.manager.client.api.DataFormat;
+import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamSource.SourceType;
+import org.apache.inlong.manager.client.api.StreamSource.SyncType;
+import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceRequest;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+
+public class InlongStreamSourceTransfer {
+
+ public static SourceRequest createSourceRequest(StreamSource streamSource, InlongStreamInfo streamInfo) {
+ SourceType sourceType = streamSource.getSourceType();
+ switch (sourceType) {
+ case KAFKA:
+ return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
+ case BINLOG:
+ return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
+ default:
+ throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
+ }
+ }
+
+ public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
+ String type = sourceListResponse.getSourceType();
+ SourceType sourceType = SourceType.forType(type);
+ if (sourceType == SourceType.BINLOG) {
+ return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
+ } else if (sourceType == SourceType.KAFKA) {
+ return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
+ } else {
+ throw new IllegalArgumentException(String.format("Unsupport source type : %s for Inlong", sourceType));
+ }
+ }
+
+ private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaSourceResponse) {
+ KafkaSource kafkaSource = new KafkaSource();
+ kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
+ DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+ kafkaSource.setDataFormat(dataFormat);
+ kafkaSource.setTopic(kafkaSourceResponse.getTopic());
+ kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
+ kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
+ kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
+ kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+ kafkaSource.setSyncType(SyncType.FULL);
+ return kafkaSource;
+ }
+
+ private static MySQLBinlogSource parseMySQLBinlogSource(BinlogSourceListResponse binlogSourceResponse) {
+ MySQLBinlogSource binlogSource = new MySQLBinlogSource();
+ binlogSource.setHostname(binlogSourceResponse.getHostname());
+ binlogSource.setDataFormat(DataFormat.CANAL);
+ binlogSource.setPassword(binlogSourceResponse.getPassword());
+ binlogSource.setUser(binlogSourceResponse.getUser());
+ binlogSource.setTimeZone(binlogSourceResponse.getTimeZone());
+ binlogSource.setTimestampFormatStandard(binlogSourceResponse.getTimestampFormatStandard());
+ List<String> dbs = Splitter.on(",").splitToList(binlogSourceResponse.getWhitelist());
+ binlogSource.setDbNames(dbs);
+ return binlogSource;
+ }
+
+ private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
+ KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
+ sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ sourceRequest.setSourceType(kafkaSource.getSourceType().name());
+ sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
+ sourceRequest.setTopic(kafkaSource.getTopic());
+ sourceRequest.setRecordSpeedLimit(kafkaSource.getRecordSpeedLimit());
+ sourceRequest.setByteSpeedLimit(kafkaSource.getByteSpeedLimit());
+ sourceRequest.setTopicPartitionOffset(kafkaSource.getTopicPartitionOffset());
+ sourceRequest.setGroupId(kafkaSource.getConsumerGroup());
+ sourceRequest.setSerializationType(kafkaSource.getDataFormat().getName());
+ return sourceRequest;
+ }
+
+ private static BinlogSourceRequest createBinlogSourceRequest(MySQLBinlogSource binlogSource,
+ InlongStreamInfo streamInfo) {
+ BinlogSourceRequest binlogSourceRequest = new BinlogSourceRequest();
+ binlogSourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ binlogSourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ binlogSourceRequest.setSourceType(binlogSource.getSourceType().name());
+ binlogSourceRequest.setPassword(binlogSource.getPassword());
+ binlogSourceRequest.setUser(binlogSource.getUser());
+ binlogSourceRequest.setHostname(binlogSource.getHostname());
+ String dbNames = Joiner.on(",").join(binlogSource.getDbNames());
+ binlogSourceRequest.setWhitelist(dbNames);
+ binlogSourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+ binlogSourceRequest.setTimeZone(binlogSource.getTimeZone());
+ binlogSourceRequest.setSnapshotMode("initial");
+ binlogSourceRequest.setIntervalMs("500");
+ return binlogSourceRequest;
+ }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index db702cb..e3f4234 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.api.util;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.client.api.HiveSink;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamField.FieldType;
@@ -80,11 +80,12 @@ public class InlongStreamTransfer {
}
public static StreamSink parseStreamSink(SinkResponse sinkResponse, StreamSink streamSink) {
- String sinkType = sinkResponse.getSinkType();
- if ("HIVE".equals(sinkType)) {
+ String type = sinkResponse.getSinkType();
+ SinkType sinkType = SinkType.forType(type);
+ if (sinkType == SinkType.HIVE) {
return parseHiveSink(sinkResponse, (HiveSink) streamSink);
} else {
- throw new IllegalArgumentException(String.format("Unsupport storage type : %s for Inlong", sinkType));
+ throw new IllegalArgumentException(String.format("Unsupport sink type : %s for Inlong", sinkType));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
index a7b54c1..e639869 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceDTO.java
@@ -81,6 +81,9 @@ public class BinlogSourceDTO {
@ApiModelProperty("The file path to store history info")
private String storeHistoryFilename;
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
+
/**
* Get the dto instance from the request
*/
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
index dbee39d..5f66e42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/binlog/BinlogSourceListResponse.java
@@ -37,6 +37,9 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Hostname of the DB server")
private String hostname;
+ @ApiModelProperty("Password of the DB server")
+ private String password;
+
@ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions")
private String whitelist;
@@ -52,5 +55,7 @@ public class BinlogSourceListResponse extends SourceListResponse {
@ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery")
private String snapshotMode;
+ @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+ private String timestampFormatStandard = "SQL";
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 96b8e4d..6c1c235 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -46,4 +46,10 @@ public class KafkaSourceListResponse extends SourceListResponse {
@ApiModelProperty("Limit the number of bytes read per second")
private String byteSpeedLimit;
+ @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ private String serializationType = "none";
+
+ @ApiModelProperty(value = "Topic partition offset",
+ notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
+ private String topicPartitionOffset;
}