You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/05/05 14:04:10 UTC
[incubator-inlong] branch master updated: [INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 011d12bb0 [INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)
011d12bb0 is described below
commit 011d12bb07d0f652ecf3f5e103f27c1129324caa
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu May 5 22:04:05 2022 +0800
[INLONG-4070][Manager] Add update and delete APIs for inlong stream in manager client (#4074)
---
.../inlong/manager/client/api/InlongStream.java | 6 +-
.../api/impl/DefaultInlongStreamBuilder.java | 162 ++++++++++----------
.../manager/client/api/impl/InlongGroupImpl.java | 4 +-
.../manager/client/api/impl/InlongStreamImpl.java | 165 +++++++++++++++++++--
.../client/api/inner/InnerInlongManagerClient.java | 56 +++++++
.../client/api/impl/InlongStreamImplTest.java | 4 +-
.../manager/service/sort/util/FieldInfoUtils.java | 18 ++-
.../manager/service/sort/util/LoadNodeUtils.java | 10 +-
8 files changed, 311 insertions(+), 114 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index 169b7befa..26a3a2c6f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -46,9 +46,5 @@ public abstract class InlongStream {
public abstract StreamPipeline createPipeline();
- @Deprecated
- public abstract void updateSource(StreamSource source);
-
- @Deprecated
- public abstract void updateSink(StreamSink sink);
+ public abstract InlongStream update();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 60f20df6f..c94d39ee6 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -25,9 +25,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
@@ -36,8 +33,6 @@ import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
-import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@@ -46,6 +41,9 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
@@ -72,7 +70,8 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
InnerStreamContext streamContext = new InnerStreamContext(stream);
groupContext.setStreamContext(streamContext);
this.streamContext = streamContext;
- this.inlongStream = new InlongStreamImpl(stream.getName());
+ this.inlongStream = new InlongStreamImpl(groupContext.getGroupInfo().getName(), stream.getName(),
+ managerClient);
if (CollectionUtils.isNotEmpty(streamConf.getStreamFields())) {
this.inlongStream.setStreamFields(streamConf.getStreamFields());
}
@@ -150,13 +149,12 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
- if (updateMsg.getKey()) {
- initOrUpdateTransform();
- initOrUpdateSource();
- initOrUpdateSink();
- } else {
+ if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
}
+ initOrUpdateTransform();
+ initOrUpdateSource();
+ initOrUpdateSink();
return inlongStream;
} else {
return init();
@@ -169,6 +167,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+ List<String> updateTransformNames = Lists.newArrayList();
for (TransformResponse transformResponse : transformResponses) {
StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
String transformName = transform.getTransformName();
@@ -186,93 +185,94 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
updateState.getValue()));
}
+ transformRequest.setId(transformResponse.getId());
+ updateTransformNames.add(transformName);
}
}
- }
-
- private void initOrUpdateSource() {
- List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
- for (SourceRequest sourceRequest : sourceRequests) {
- sourceRequest.setId(initOrUpdateSource(sourceRequest));
+ for (Map.Entry<String, TransformRequest> requestEntry : transformRequests.entrySet()) {
+ String transformName = requestEntry.getKey();
+ if (updateTransformNames.contains(transformName)) {
+ continue;
+ }
+ TransformRequest transformRequest = requestEntry.getValue();
+ String index = managerClient.createTransform(transformRequest);
+ transformRequest.setId(Double.valueOf(index).intValue());
}
}
- private int initOrUpdateSource(SourceRequest sourceRequest) {
- String sourceType = sourceRequest.getSourceType();
- if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
- List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
- sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
- if (CollectionUtils.isEmpty(responses)) {
- String sourceIndex = managerClient.createSource(sourceRequest);
- return Double.valueOf(sourceIndex).intValue();
- } else {
- SourceListResponse sourceListResponse = null;
- for (SourceListResponse response : responses) {
- if (response.getSourceName().equals(sourceRequest.getSourceName())) {
- sourceListResponse = response;
- break;
- }
+ private void initOrUpdateSource() {
+ Map<String, SourceRequest> sourceRequests = streamContext.getSourceRequests();
+ InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
+ List<String> updateSourceNames = Lists.newArrayList();
+ for (SourceListResponse sourceListResponse : sourceListResponses) {
+ String sourceName = sourceListResponse.getSourceName();
+ int id = sourceListResponse.getId();
+ String type = sourceListResponse.getSourceType();
+ if (sourceRequests.get(sourceName) == null) {
+ boolean isDelete = managerClient.deleteSource(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
}
- if (sourceListResponse == null) {
- String sourceIndex = managerClient.createSource(sourceRequest);
- return Double.valueOf(sourceIndex).intValue();
+ } else {
+ SourceRequest sourceRequest = sourceRequests.get(sourceName);
+ Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
+ updateState.getValue()));
}
+ updateSourceNames.add(sourceName);
sourceRequest.setId(sourceListResponse.getId());
- Pair<Boolean, String> updateMsg = managerClient.updateSource(sourceRequest);
- if (updateMsg.getKey()) {
- return sourceListResponse.getId();
- } else {
- throw new RuntimeException(
- String.format("Update source:%s failed with ex:%s", GsonUtil.toJson(sourceRequest),
- updateMsg.getValue()));
- }
}
- } else {
- throw new IllegalArgumentException(String.format("Unsupported source type:%s", sourceType));
}
- }
-
- private void initOrUpdateSink() {
- List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
- for (SinkRequest sinkRequest : sinkRequests) {
- sinkRequest.setId(initOrUpdateSink(sinkRequest));
+ for (Map.Entry<String, SourceRequest> requestEntry : sourceRequests.entrySet()) {
+ String sourceName = requestEntry.getKey();
+ if (updateSourceNames.contains(sourceName)) {
+ continue;
+ }
+ SourceRequest sourceRequest = requestEntry.getValue();
+ String index = managerClient.createSource(sourceRequest);
+ sourceRequest.setId(Double.valueOf(index).intValue());
}
}
- private int initOrUpdateSink(SinkRequest sinkRequest) {
- String sinkType = sinkRequest.getSinkType();
- boolean flag = SinkType.HIVE.name().equals(sinkType) || SinkType.KAFKA.name().equals(sinkType)
- || SinkType.CLICKHOUSE.name().equals(sinkType);
- if (flag) {
- List<SinkListResponse> responses = managerClient.listSinks(sinkRequest.getInlongGroupId(),
- sinkRequest.getInlongStreamId(), sinkRequest.getSinkType());
- if (CollectionUtils.isEmpty(responses)) {
- String sinkIndex = managerClient.createSink(sinkRequest);
- return Double.valueOf(sinkIndex).intValue();
- } else {
- SinkListResponse sinkListResponse = null;
- for (SinkListResponse response : responses) {
- if (response.getSinkName().equals(sinkRequest.getSinkName())) {
- sinkListResponse = response;
- break;
- }
+ private void initOrUpdateSink() {
+ Map<String, SinkRequest> sinkRequests = streamContext.getSinkRequests();
+ InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ List<SinkListResponse> sinkListResponses = managerClient.listSinks(groupId, streamId);
+ List<String> updateSinkNames = Lists.newArrayList();
+ for (SinkListResponse sinkListResponse : sinkListResponses) {
+ String sinkName = sinkListResponse.getSinkName();
+ int id = sinkListResponse.getId();
+ String type = sinkListResponse.getSinkType();
+ if (sinkRequests.get(sinkName) == null) {
+ boolean isDelete = managerClient.deleteSink(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
}
- if (sinkListResponse == null) {
- String sinkIndex = managerClient.createSink(sinkRequest);
- return Double.valueOf(sinkIndex).intValue();
+ } else {
+ SinkRequest sinkRequest = sinkRequests.get(sinkName);
+ Pair<Boolean, String> updateState = managerClient.updateSink(sinkRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
+ updateState.getValue()));
}
+ updateSinkNames.add(sinkName);
sinkRequest.setId(sinkListResponse.getId());
- Pair<Boolean, String> updateMsg = managerClient.updateSink(sinkRequest);
- if (updateMsg.getKey()) {
- return sinkListResponse.getId();
- } else {
- throw new RuntimeException(
- String.format("Update sink:%s failed with ex:%s", GsonUtil.toJson(sinkRequest),
- updateMsg.getValue()));
- }
}
- } else {
- throw new IllegalArgumentException(String.format("Unsupported sink type:%s", sinkType));
+ }
+ for (Map.Entry<String, SinkRequest> requestEntry : sinkRequests.entrySet()) {
+ String sinkName = requestEntry.getKey();
+ if (updateSinkNames.contains(sinkName)) {
+ continue;
+ }
+ SinkRequest sinkRequest = requestEntry.getValue();
+ String index = managerClient.createSink(sinkRequest);
+ sinkRequest.setId(Double.valueOf(index).intValue());
}
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8f7e1b360..825ba0689 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -271,6 +271,8 @@ public class InlongGroupImpl implements InlongGroup {
if (CollectionUtils.isEmpty(streamResponses)) {
return null;
}
- return streamResponses.stream().map(InlongStreamImpl::new).collect(Collectors.toList());
+ return streamResponses.stream()
+ .map(fullStreamResponse -> new InlongStreamImpl(fullStreamResponse, managerClient))
+ .collect(Collectors.toList());
}
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 1915eaa96..60515cf1f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -22,19 +22,23 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
@@ -42,6 +46,11 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import java.util.List;
import java.util.Map;
@@ -50,9 +59,12 @@ import java.util.stream.Collectors;
@Data
@EqualsAndHashCode(callSuper = true)
-@NoArgsConstructor
public class InlongStreamImpl extends InlongStream {
+ private InnerInlongManagerClient managerClient;
+
+ private String groupName;
+
private String name;
private Map<String, StreamSource> streamSources = Maps.newHashMap();
@@ -63,9 +75,11 @@ public class InlongStreamImpl extends InlongStream {
private List<StreamField> streamFields = Lists.newArrayList();
- public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
+ public InlongStreamImpl(FullStreamResponse fullStreamResponse, InnerInlongManagerClient managerClient) {
InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
+ this.managerClient = managerClient;
this.name = streamInfo.getName();
+ this.groupName = streamInfo.getInlongGroupId().substring(2);
List<InlongStreamFieldInfo> streamFieldInfos = streamInfo.getFieldList();
if (CollectionUtils.isNotEmpty(streamFieldInfos)) {
this.streamFields = streamFieldInfos.stream()
@@ -106,7 +120,9 @@ public class InlongStreamImpl extends InlongStream {
}
- public InlongStreamImpl(String name) {
+ public InlongStreamImpl(String group, String name, InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ this.groupName = group;
this.name = name;
}
@@ -234,15 +250,136 @@ public class InlongStreamImpl extends InlongStream {
}
@Override
- public void updateSource(StreamSource source) {
- AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
- streamSources.put(source.getSourceName(), source);
+ public InlongStream update() {
+ InlongStreamInfo streamInfo = new InlongStreamInfo();
+ streamInfo.setInlongStreamId("b_" + name);
+ streamInfo.setInlongGroupId("b_" + groupName);
+ streamInfo = managerClient.getStreamInfo(streamInfo);
+ if (streamInfo == null) {
+ throw new IllegalArgumentException(
+ String.format("Stream is not exists for group=%s and stream=%s", groupName, name));
+ }
+ streamInfo.setFieldList(InlongStreamTransfer.createStreamFields(this.streamFields, streamInfo));
+ StreamPipeline streamPipeline = createPipeline();
+ streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
+ Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(streamInfo);
+ if (!updateMsg.getKey()) {
+ throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
+ }
+ initOrUpdateTransform(streamInfo);
+ initOrUpdateSource(streamInfo);
+ initOrUpdateSink(streamInfo);
+ return this;
}
- @Override
- public void updateSink(StreamSink sink) {
- AssertUtil.notNull(sink.getSinkName(), "Sink name should not be empty");
- streamSinks.put(sink.getSinkName(), sink);
+ private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+ List<String> updateTransformNames = Lists.newArrayList();
+ for (TransformResponse transformResponse : transformResponses) {
+ StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
+ String transformName = transform.getTransformName();
+ if (this.streamTransforms.get(transformName) == null) {
+ TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ boolean isDelete = managerClient.deleteTransform(transformRequest);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+ }
+ } else {
+ StreamTransform newTransform = this.streamTransforms.get(transformName);
+ TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(newTransform,
+ streamInfo);
+ Pair<Boolean, String> updateState = managerClient.updateTransform(transformRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
+ updateState.getValue()));
+ }
+ updateTransformNames.add(transformName);
+ }
+ }
+ for (Map.Entry<String, StreamTransform> transformEntry : this.streamTransforms.entrySet()) {
+ String transformName = transformEntry.getKey();
+ if (updateTransformNames.contains(transformName)) {
+ continue;
+ }
+ StreamTransform transform = transformEntry.getValue();
+ TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ managerClient.createTransform(transformRequest);
+ }
+ }
+
+ private void initOrUpdateSource(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<SourceListResponse> sourceListResponses = managerClient.listSources(groupId, streamId);
+ List<String> updateSourceNames = Lists.newArrayList();
+ for (SourceListResponse sourceListResponse : sourceListResponses) {
+ String sourceName = sourceListResponse.getSourceName();
+ int id = sourceListResponse.getId();
+ String type = sourceListResponse.getSourceType();
+ if (this.streamSources.get(sourceName) == null) {
+ boolean isDelete = managerClient.deleteSource(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete source=%s failed", sourceListResponse));
+ }
+ } else {
+ StreamSource source = this.streamSources.get(sourceName);
+ SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(source, streamInfo);
+ Pair<Boolean, String> updateState = managerClient.updateSource(sourceRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update source=%s failed with err=%s", sourceRequest,
+ updateState.getValue()));
+ }
+ updateSourceNames.add(sourceName);
+ }
+ }
+ for (Map.Entry<String, StreamSource> requestEntry : streamSources.entrySet()) {
+ String sourceName = requestEntry.getKey();
+ if (updateSourceNames.contains(sourceName)) {
+ continue;
+ }
+ StreamSource streamSource = requestEntry.getValue();
+ SourceRequest sourceRequest = InlongStreamSourceTransfer.createSourceRequest(streamSource, streamInfo);
+ managerClient.createSource(sourceRequest);
+ }
}
+ private void initOrUpdateSink(InlongStreamInfo streamInfo) {
+ final String groupId = "b_" + groupName;
+ final String streamId = "b_" + name;
+ List<SinkListResponse> sinkListResponses = managerClient.listSinks(groupId, streamId);
+ List<String> updateSinkNames = Lists.newArrayList();
+ for (SinkListResponse sinkListResponse : sinkListResponses) {
+ String sinkName = sinkListResponse.getSinkName();
+ int id = sinkListResponse.getId();
+ String type = sinkListResponse.getSinkType();
+ if (this.streamSinks.get(sinkName) == null) {
+ boolean isDelete = managerClient.deleteSink(id, type);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete sink=%s failed", sinkListResponse));
+ }
+ } else {
+ StreamSink sink = this.streamSinks.get(sinkName);
+ SinkRequest sinkRequest = InlongStreamSinkTransfer.createSinkRequest(sink, streamInfo);
+ Pair<Boolean, String> updateState = managerClient.updateSink(sinkRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
+ updateState.getValue()));
+ }
+ updateSinkNames.add(sinkName);
+ }
+ }
+ for (Map.Entry<String, StreamSink> requestEntry : streamSinks.entrySet()) {
+ String sinkName = requestEntry.getKey();
+ if (updateSinkNames.contains(sinkName)) {
+ continue;
+ }
+ StreamSink streamSink = requestEntry.getValue();
+ SinkRequest sinkRequest = InlongStreamSinkTransfer.createSinkRequest(streamSink, streamInfo);
+ managerClient.createSink(sinkRequest);
+ }
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index e8a2a6dd1..1e67fa5ad 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -481,6 +481,34 @@ public class InnerInlongManagerClient {
}
}
+ public boolean deleteSource(int id, String type) {
+ AssertUtil.isTrue(id > 0, "sourceId is illegal");
+ AssertUtil.notEmpty(type, "sourceType should not be null");
+ final String path = HTTP_PATH + "/source/delete/" + id;
+ String url = formatUrl(path);
+ url = String.format("%s&sourceType=%s", url, type);
+ RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
+ Request request = new Request.Builder()
+ .url(url)
+ .method("DELETE", requestBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Inlong source delete failed: %s", e.getMessage()), e);
+ }
+ }
+
public String createTransform(TransformRequest transformRequest) {
String path = HTTP_PATH + "/transform/save";
final String sink = GsonUtil.toJson(transformRequest);
@@ -611,6 +639,34 @@ public class InnerInlongManagerClient {
}
}
+ public boolean deleteSink(int id, String type) {
+ AssertUtil.isTrue(id > 0, "sinkId is illegal");
+ AssertUtil.notEmpty(type, "sinkType should not be null");
+ final String path = HTTP_PATH + "/sink/delete/" + id;
+ String url = formatUrl(path);
+ url = String.format("%s&sinkType=%s", url, type);
+ RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
+ Request request = new Request.Builder()
+ .url(url)
+ .method("DELETE", requestBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Inlong sink delete failed: %s", e.getMessage()), e);
+ }
+ }
+
public List<SinkListResponse> listSinks(String groupId, String streamId) {
return listSinks(groupId, streamId, null);
}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index 19d179588..14e660c23 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.client.api.impl;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.sink.KafkaSink;
@@ -28,6 +27,7 @@ import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
@@ -41,7 +41,7 @@ public class InlongStreamImplTest {
@Test
public void testCreatePipeline() {
- InlongStream inlongStream = new InlongStreamImpl();
+ InlongStream inlongStream = new InlongStreamImpl("group", "stream", null);
// add stream source
KafkaSource kafkaSource = new KafkaSource();
kafkaSource.setSourceName("A");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
index 6538dd5c6..f0a19492d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FieldInfoUtils.java
@@ -45,7 +45,7 @@ import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +58,7 @@ public class FieldInfoUtils {
/**
* Built in field map, key is field name, value is built in field name
*/
- public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new HashMap<>();
+ public static final Map<String, BuiltInField> BUILT_IN_FIELD_MAP = new LinkedHashMap<>();
static {
BUILT_IN_FIELD_MAP.put(MetaFieldType.DATA_TIME.getName(), BuiltInField.DATA_TIME);
@@ -68,20 +68,28 @@ public class FieldInfoUtils {
BUILT_IN_FIELD_MAP.put(MetaFieldType.IS_DDL.getName(), BuiltInField.MYSQL_METADATA_IS_DDL);
BUILT_IN_FIELD_MAP.put(MetaFieldType.EVENT_TYPE.getName(), BuiltInField.MYSQL_METADATA_EVENT_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.PROCESSING_TIME.getName(), BuiltInField.PROCESS_TIME);
- BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
BUILT_IN_FIELD_MAP.put(MetaFieldType.UPDATE_BEFORE.getName(), BuiltInField.METADATA_UPDATE_BEFORE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.BATCH_ID.getName(), BuiltInField.METADATA_BATCH_ID);
BUILT_IN_FIELD_MAP.put(MetaFieldType.SQL_TYPE.getName(), BuiltInField.METADATA_SQL_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.TS.getName(), BuiltInField.METADATA_TS);
BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_TYPE.getName(), BuiltInField.METADATA_MYSQL_TYPE);
BUILT_IN_FIELD_MAP.put(MetaFieldType.PK_NAMES.getName(), BuiltInField.METADATA_PK_NAMES);
+ BUILT_IN_FIELD_MAP.put(MetaFieldType.MYSQL_DATA.getName(), BuiltInField.MYSQL_METADATA_DATA);
+ }
+
+ public static FieldInfo parseSinkFieldInfo(SinkFieldResponse sinkFieldResponse, String nodeId) {
+ boolean isBuiltIn = sinkFieldResponse.getIsMetaField() == 1;
+ FieldInfo fieldInfo = getFieldInfo(sinkFieldResponse.getFieldName(), sinkFieldResponse.getFieldType(),
+ isBuiltIn, sinkFieldResponse.getFieldFormat());
+ fieldInfo.setNodeId(nodeId);
+ return fieldInfo;
}
- public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo streamField, String name) {
+ public static FieldInfo parseStreamFieldInfo(InlongStreamFieldInfo streamField, String nodeId) {
boolean isBuiltIn = streamField.getIsMetaField() == 1;
FieldInfo fieldInfo = getFieldInfo(streamField.getFieldName(), streamField.getFieldType(), isBuiltIn,
streamField.getFieldFormat());
- fieldInfo.setNodeId(name);
+ fieldInfo.setNodeId(nodeId);
return fieldInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
index 1ebd697ad..c4cae249c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/LoadNodeUtils.java
@@ -73,9 +73,8 @@ public class LoadNodeUtils {
String bootstrapServers = kafkaSinkResponse.getBootstrapServers();
List<SinkFieldResponse> sinkFieldResponses = kafkaSinkResponse.getFieldList();
List<FieldInfo> fieldInfos = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
- FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
- sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+ .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+ .collect(Collectors.toList());
List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
Map<String, String> properties = kafkaSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
@@ -127,9 +126,8 @@ public class LoadNodeUtils {
String hiveVersion = hiveSinkResponse.getHiveVersion();
List<SinkFieldResponse> sinkFieldResponses = hiveSinkResponse.getFieldList();
List<FieldInfo> fields = sinkFieldResponses.stream()
- .map(sinkFieldResponse -> new FieldInfo(sinkFieldResponse.getFieldName(), name,
- FieldInfoUtils.convertFieldFormat(sinkFieldResponse.getFieldType(),
- sinkFieldResponse.getFieldFormat()))).collect(Collectors.toList());
+ .map(sinkFieldResponse -> FieldInfoUtils.parseSinkFieldInfo(sinkFieldResponse, name))
+ .collect(Collectors.toList());
List<FieldRelationShip> fieldRelationShips = parseSinkFields(sinkFieldResponses, name);
Map<String, String> properties = hiveSinkResponse.getProperties().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));