You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/04/20 12:21:01 UTC
[incubator-inlong] branch master updated: [INLONG-3719][Manager] Support data_transformation feature in Inlong (#3774)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 39e7bc99d [INLONG-3719][Manager] Support data_transformation feature in Inlong (#3774)
39e7bc99d is described below
commit 39e7bc99d0a3ef7d377e833c8401828f0e5762c9
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Wed Apr 20 20:20:56 2022 +0800
[INLONG-3719][Manager] Support data_transformation feature in Inlong (#3774)
* Add constructor to streamTransform
* Add unit tests for stream transform
* Update API doc
* Add processing_time for stream
* resolve circular dependency in manager service
* Update SQL file format
Co-authored-by: healchow <he...@gmail.com>
---
.../manager/client/AutoPush2HiveExample.java | 4 +-
.../inlong/manager/client/File2HiveExample.java | 4 +-
.../inlong/manager/client/Kafka2HiveExample.java | 4 +-
.../inlong/manager/client/cli/CreateGroupConf.java | 2 +-
inlong-manager/manager-client/pom.xml | 6 +
.../inlong/manager/client/api/InlongGroup.java | 10 +
.../inlong/manager/client/api/InlongGroupConf.java | 5 +-
.../inlong/manager/client/api/InlongStream.java | 11 +
.../manager/client/api/InlongStreamBuilder.java | 9 +
.../manager/client/api/InlongStreamConf.java | 1 +
.../inlong/manager/client/api/SinkField.java | 1 +
.../inlong/manager/client/api/StreamSink.java | 3 +-
.../inlong/manager/client/api/StreamSource.java | 3 +-
.../api/{StreamSink.java => StreamTransform.java} | 25 +--
.../manager/client/api/impl/BlankInlongGroup.java | 5 +
.../api/impl/DefaultInlongStreamBuilder.java | 85 +++++--
.../manager/client/api/impl/InlongGroupImpl.java | 5 +
.../manager/client/api/impl/InlongStreamImpl.java | 101 ++++++++-
.../client/api/inner/InnerInlongManagerClient.java | 123 ++++++++++-
.../client/api/inner/InnerStreamContext.java | 6 +
.../api/transform/MultiDependencyTransform.java | 68 ++++++
.../api/transform/SingleDependencyTransform.java | 65 ++++++
.../inlong/manager/client/api/util/AssertUtil.java | 6 +
.../manager/client/api/util/InlongParser.java | 9 +
.../client/api/util/InlongStreamSinkTransfer.java | 2 +-
.../api/util/InlongStreamSourceTransfer.java | 87 +++++---
.../client/api/util/InlongStreamTransfer.java | 2 +-
.../api/util/InlongStreamTransformTransfer.java | 85 +++++++
.../client/api/impl/InlongStreamImplTest.java | 86 ++++++++
.../api/impl/InnerInlongManagerClientTest.java | 1 -
.../inlong/manager/common/enums/Constant.java | 31 ---
.../enums/DBCollectorDetailTaskEntityStatus.java | 1 +
.../common/enums/DBCollectorTaskConstant.java | 1 +
.../common/enums/DBCollectorTaskReturnCode.java | 1 +
.../inlong/manager/common/enums/ErrorCodeEnum.java | 17 +-
.../apache/inlong/manager/common/enums/MQType.java | 2 +-
.../inlong/manager/common/enums/SinkType.java | 3 -
.../inlong/manager/common/enums/SourceType.java | 3 -
.../enums/{MQType.java => TransformType.java} | 45 +++-
.../manager/common/pojo/source/SourceRequest.java | 5 +
.../manager/common/pojo/source/SourceResponse.java | 5 +
.../common/pojo/stream/InlongStreamRequest.java | 3 +
.../manager/common/pojo/stream}/StreamField.java | 11 +-
.../manager/common/pojo/stream/StreamNode.java} | 37 ++--
.../common/pojo/stream/StreamNodeRelationship.java | 51 +++++
.../manager/common/pojo/stream/StreamPipeline.java | 106 +++++++++
.../pojo/transform/TransformDefinition.java} | 31 ++-
.../TransformRequest.java} | 46 ++--
.../common/pojo/transform/TransformResponse.java | 56 +++++
.../deduplication/DeDuplicationDefinition.java | 83 +++++++
.../pojo/transform/filter/FilterDefinition.java | 113 ++++++++++
.../pojo/transform/joiner/JoinerDefinition.java | 79 +++++++
.../replacer/StringReplacerDefinition.java | 65 ++++++
.../transform/splitter/SplitterDefinition.java | 70 ++++++
.../common/settings/InlongGroupSettings.java | 77 +++----
.../common/util/TransformDefinitionUtils.java | 51 +++++
.../common/pojo/stream/StreamPipelineTest.java | 41 ++++
.../pojo/transform/TransformDefinitionTest.java | 99 +++++++++
.../dao/entity/StreamSourceFieldEntity.java} | 46 ++--
.../manager/dao/entity/StreamTransformEntity.java} | 45 ++--
.../dao/mapper/CommonDbServerEntityMapper.java | 1 +
.../dao/mapper/CommonFileServerEntityMapper.java | 1 +
.../dao/mapper/DBCollectorDetailTaskMapper.java | 1 +
...per.java => StreamSourceFieldEntityMapper.java} | 41 ++--
...apper.java => StreamTransformEntityMapper.java} | 22 +-
.../src/main/resources/generatorConfig.xml | 6 +
.../mappers/StreamSourceFieldEntityMapper.xml | 245 +++++++++++++++++++++
.../mappers/StreamTransformEntityMapper.xml | 234 ++++++++++++++++++++
.../manager/service/CommonOperateService.java | 100 +--------
.../core/impl/DBCollectorTaskServiceImpl.java | 1 +
.../service/core/impl/InlongGroupServiceImpl.java | 21 +-
.../service/core/impl/InlongStreamServiceImpl.java | 30 ++-
.../core/impl/StreamConfigLogServiceImpl.java | 13 +-
.../core/impl/ThirdPartyClusterServiceImpl.java | 9 +-
.../service/sink/StreamSinkServiceImpl.java | 122 +++++-----
.../service/sink/ck/ClickHouseSinkOperation.java | 6 +-
.../service/sink/hive/HiveSinkOperation.java | 6 +-
.../service/sink/iceberg/IcebergSinkOperation.java | 6 +-
.../service/sink/kafka/KafkaSinkOperation.java | 6 +-
.../service/source/AbstractSourceOperation.java | 68 +++++-
.../service/source/StreamSourceServiceImpl.java | 27 +--
.../source/autopush/AutoPushSourceOperation.java | 2 +-
.../source/binlog/BinlogSourceOperation.java | 2 +-
.../service/source/file/FileSourceOperation.java | 2 +-
.../service/source/kafka/KafkaSourceOperation.java | 2 +-
.../thirdparty/mq/CreateTubeGroupTaskListener.java | 4 +-
.../service/thirdparty/mq/TubeMqOptService.java | 10 +-
.../thirdparty/sort/CreateSortConfigListener.java | 8 +-
.../thirdparty/sort/PushSortConfigListener.java | 8 +-
.../thirdparty/sort/util/DataFlowUtils.java | 113 ++++++++++
.../service/transform/StreamTransformService.java | 65 ++++++
.../transform/StreamTransformServiceImpl.java | 153 +++++++++++++
.../inlong/manager/service/ServiceBaseTest.java | 6 +-
.../service/core/impl/AgentServiceTest.java | 10 +-
.../core/impl/InlongClusterServiceTest.java | 14 +-
.../transform/StreamTransformServiceTest.java | 73 ++++++
.../src/main/resources/application-test.properties | 1 +
.../main/resources/sql/apache_inlong_manager.sql | 48 ++++
.../manager-web/sql/apache_inlong_manager.sql | 49 +++++
.../web/controller/StreamTransformController.java | 84 +++++++
100 files changed, 3124 insertions(+), 544 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
index e0857e5c0..72567bd8d 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.client;
-import org.apache.commons.compress.utils.Lists;
+import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AutoPushSource;
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
index 9f3d6b9f8..af9a79968 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.client;
-import org.apache.commons.compress.utils.Lists;
+import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
index 4bd4b3573..71f359ee3 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
@@ -17,8 +17,8 @@
package org.apache.inlong.manager.client;
+import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.compress.utils.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
@@ -31,7 +31,7 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SinkField;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.source.KafkaSource;
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
index 21bb90d30..dcab288c8 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli;
import lombok.Data;
import org.apache.inlong.manager.client.api.InlongGroupConf;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
diff --git a/inlong-manager/manager-client/pom.xml b/inlong-manager/manager-client/pom.xml
index 9985f26e3..0b893d37e 100644
--- a/inlong-manager/manager-client/pom.xml
+++ b/inlong-manager/manager-client/pom.xml
@@ -32,6 +32,12 @@
<groupId>org.apache.inlong</groupId>
<artifactId>manager-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index 94f94c77f..c299e4867 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -53,12 +53,22 @@ public interface InlongGroup {
*/
void update(InlongGroupConf conf) throws Exception;
+ /**
+ * ReInit inlong group after update configuration for group.
+ * Must be invoked when group is rejected,failed or started
+ *
+ * @return inlong group info
+ */
+ InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception;
+
/**
* Init inlong group on updated conf.
* Must be invoked when group is rejected,failed or started
+ * This method is deprecated, recommend to use reInitOnUpdate
*
* @return inlong group info
*/
+ @Deprecated
InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception;
/**
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index bef4dccac..e6631a0f9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -59,6 +59,9 @@ public class InlongGroupConf {
@ApiModelProperty("Need zookeeper support")
private boolean zookeeperEnabled = true;
- @ApiModelProperty("data proxy cluster id")
+ @ApiModelProperty("Data proxy cluster id")
private Integer proxyClusterId;
+
+ @ApiModelProperty("Use light weight group")
+ private boolean lightWeight = false;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
index ab9c17da8..cf18ac91a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStream.java
@@ -17,6 +17,9 @@
package org.apache.inlong.manager.client.api;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+
import java.util.List;
import java.util.Map;
@@ -30,11 +33,19 @@ public abstract class InlongStream {
public abstract Map<String, StreamSink> getSinks();
+ public abstract Map<String, StreamTransform> getTransforms();
+
public abstract void addSource(StreamSource source);
public abstract void addSink(StreamSink sink);
+ public abstract void addTransform(StreamTransform transform);
+
+ public abstract StreamPipeline createPipeline();
+
+ @Deprecated
public abstract void updateSource(StreamSource source);
+ @Deprecated
public abstract void updateSink(StreamSink sink);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
index 9972c2979..f7ee29f68 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.client.api;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+
import java.util.List;
public abstract class InlongStreamBuilder {
@@ -43,6 +45,13 @@ public abstract class InlongStreamBuilder {
*/
public abstract InlongStreamBuilder fields(List<StreamField> fieldList);
+ /**
+ * Create stream transform
+ *
+ * @return inlong stream builder
+ */
+ public abstract InlongStreamBuilder transform(StreamTransform streamTransform);
+
/**
* Create data stream by builder
*
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
index 2a4a3e9c4..8d7065c2f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
index acb8c7130..70ad16e03 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SinkField.java
@@ -23,6 +23,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
@Data
@EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 0c18cd558..d5762e477 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -21,13 +21,14 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
import java.util.List;
import java.util.Map;
@Data
@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public abstract class StreamSink extends StreamNode {
@ApiModelProperty(value = "DataSink name", required = true)
private String sinkName;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index dca118310..754cd7bb9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -22,10 +22,11 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.inlong.manager.common.enums.SourceState;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
@Data
@ApiModel("Stream source configuration")
-public abstract class StreamSource {
+public abstract class StreamSource extends StreamNode {
public enum State {
INIT, NORMAL, FROZING, FROZEN, FAILED, DELETING, DELETE;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
similarity index 65%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
index 0c18cd558..4c41ad813 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamTransform.java
@@ -20,25 +20,16 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
@Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
-
- @ApiModelProperty(value = "DataSink name", required = true)
- private String sinkName;
-
- @ApiModelProperty("Other properties if need")
- private Map<String, Object> properties;
-
- public abstract SinkType getSinkType();
-
- public abstract List<SinkField> getSinkFields();
+@ApiModel("Stream Transform configuration")
+public abstract class StreamTransform extends StreamNode {
- public abstract DataFormat getDataFormat();
+ @ApiModelProperty(value = "Transform name", required = true)
+ protected String transformName;
+ @ApiModelProperty(value = "Transform name", required = true)
+ protected TransformDefinition transformDefinition;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index d94521f4e..633ffd514 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -48,6 +48,11 @@ public class BlankInlongGroup implements InlongGroup {
throw new UnsupportedOperationException("Inlong group is not exists");
}
+ @Override
+ public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
@Override
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
throw new UnsupportedOperationException("Inlong group is not exists");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index cd248ffee..92f7772eb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -25,9 +25,9 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamTransform;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
@@ -35,6 +35,7 @@ import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.client.api.util.InlongStreamTransformTransfer;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
@@ -43,17 +44,18 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import java.util.List;
+import java.util.Map;
public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
private InlongStreamImpl inlongStream;
- private InlongStreamConf streamConf;
-
- private InnerGroupContext groupContext;
-
private InnerStreamContext streamContext;
private InnerInlongManagerClient managerClient;
@@ -62,8 +64,6 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
InlongStreamConf streamConf,
InnerGroupContext groupContext,
InnerInlongManagerClient managerClient) {
- this.streamConf = streamConf;
- this.groupContext = groupContext;
this.managerClient = managerClient;
if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
groupContext.setStreamContextMap(Maps.newHashMap());
@@ -105,9 +105,20 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
return this;
}
+ @Override
+ public InlongStreamBuilder transform(StreamTransform streamTransform) {
+ inlongStream.addTransform(streamTransform);
+ TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(streamTransform,
+ streamContext.getStreamInfo());
+ streamContext.setTransformRequest(transformRequest);
+ return this;
+ }
+
@Override
public InlongStream init() {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ StreamPipeline streamPipeline = inlongStream.createPipeline();
+ streamInfo.setTempView(GsonUtil.toJson(streamPipeline));
String streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(Double.valueOf(streamIndex).intValue());
//Create source and update index
@@ -122,24 +133,27 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
String sinkIndex = managerClient.createSink(sinkRequest);
sinkRequest.setId(Double.valueOf(sinkIndex).intValue());
}
+ //Create transform and update index
+ List<TransformRequest> transformRequests = Lists.newArrayList(streamContext.getTransformRequests().values());
+ for (TransformRequest transformRequest : transformRequests) {
+ String transformIndex = managerClient.createTransform(transformRequest);
+ transformRequest.setId(Double.valueOf(transformIndex).intValue());
+ }
return inlongStream;
}
@Override
public InlongStream initOrUpdate() {
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
+ StreamPipeline streamPipeline = inlongStream.createPipeline();
+ dataStreamInfo.setTempView(GsonUtil.toJson(streamPipeline));
Pair<Boolean, InlongStreamInfo> existMsg = managerClient.isStreamExists(dataStreamInfo);
if (existMsg.getKey()) {
Pair<Boolean, String> updateMsg = managerClient.updateStreamInfo(dataStreamInfo);
if (updateMsg.getKey()) {
- List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
- for (SourceRequest sourceRequest : sourceRequests) {
- sourceRequest.setId(initOrUpdateSource(sourceRequest));
- }
- List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
- for (SinkRequest sinkRequest : sinkRequests) {
- sinkRequest.setId(initOrUpdateSink(sinkRequest));
- }
+ initOrUpdateTransform();
+ initOrUpdateSource();
+ initOrUpdateSink();
} else {
throw new RuntimeException(String.format("Update data stream failed:%s", updateMsg.getValue()));
}
@@ -149,6 +163,40 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
}
}
+ private void initOrUpdateTransform() {
+ Map<String, TransformRequest> transformRequests = streamContext.getTransformRequests();
+ InlongStreamInfo streamInfo = streamContext.getStreamInfo();
+ final String groupId = streamInfo.getInlongGroupId();
+ final String streamId = streamInfo.getInlongStreamId();
+ List<TransformResponse> transformResponses = managerClient.listTransform(groupId, streamId);
+ for (TransformResponse transformResponse : transformResponses) {
+ StreamTransform transform = InlongStreamTransformTransfer.parseStreamTransform(transformResponse);
+ String transformName = transform.getTransformName();
+ if (transformRequests.get(transformName) == null) {
+ TransformRequest transformRequest = InlongStreamTransformTransfer.createTransformRequest(transform,
+ streamInfo);
+ boolean isDelete = managerClient.deleteTransform(transformRequest);
+ if (!isDelete) {
+ throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
+ }
+ } else {
+ TransformRequest transformRequest = transformRequests.get(transformName);
+ Pair<Boolean, String> updateState = managerClient.updateTransform(transformRequest);
+ if (!updateState.getKey()) {
+ throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
+ updateState.getValue()));
+ }
+ }
+ }
+ }
+
+ private void initOrUpdateSource() {
+ List<SourceRequest> sourceRequests = Lists.newArrayList(streamContext.getSourceRequests().values());
+ for (SourceRequest sourceRequest : sourceRequests) {
+ sourceRequest.setId(initOrUpdateSource(sourceRequest));
+ }
+ }
+
private int initOrUpdateSource(SourceRequest sourceRequest) {
String sourceType = sourceRequest.getSourceType();
if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
@@ -184,6 +232,13 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
}
}
+ private void initOrUpdateSink() {
+ List<SinkRequest> sinkRequests = Lists.newArrayList(streamContext.getSinkRequests().values());
+ for (SinkRequest sinkRequest : sinkRequests) {
+ sinkRequest.setId(initOrUpdateSink(sinkRequest));
+ }
+ }
+
private int initOrUpdateSink(SinkRequest sinkRequest) {
String sinkType = sinkRequest.getSinkType();
boolean flag = SinkType.HIVE.name().equals(sinkType) || SinkType.KAFKA.name().equals(sinkType)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index b8bb047ce..797a48b35 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -139,6 +139,11 @@ public class InlongGroupImpl implements InlongGroup {
AssertUtil.isNull(errMsg, errMsg);
}
+ @Override
+ public InlongGroupContext reInitOnUpdate(InlongGroupConf conf) throws Exception {
+ return initOnUpdate(conf);
+ }
+
@Override
public InlongGroupContext initOnUpdate(InlongGroupConf conf) throws Exception {
update(conf);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index 3991d825f..b3be9ff97 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -17,16 +17,19 @@
package org.apache.inlong.manager.client.api.impl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.client.api.StreamTransform;
import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
@@ -36,9 +39,13 @@ import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNodeRelationship;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
@Data
@@ -52,6 +59,8 @@ public class InlongStreamImpl extends InlongStream {
private Map<String, StreamSink> streamSinks = Maps.newHashMap();
+ private Map<String, StreamTransform> streamTransforms = Maps.newHashMap();
+
private List<StreamField> streamFields = Lists.newArrayList();
public InlongStreamImpl(FullStreamResponse fullStreamResponse) {
@@ -116,6 +125,11 @@ public class InlongStreamImpl extends InlongStream {
return this.streamSinks;
}
+ @Override
+ public Map<String, StreamTransform> getTransforms() {
+ return this.streamTransforms;
+ }
+
@Override
public void addSource(StreamSource source) {
AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
@@ -136,6 +150,89 @@ public class InlongStreamImpl extends InlongStream {
streamSinks.put(sinkName, sink);
}
+ @Override
+ public void addTransform(StreamTransform transform) {
+ AssertUtil.notNull(transform.getTransformName(), "Transform name should not be empty");
+ String transformName = transform.getTransformName();
+ if (streamTransforms.get(transformName) != null) {
+ throw new IllegalArgumentException(String.format("TransformName=%s has already be set", transform));
+ }
+ streamTransforms.put(transformName, transform);
+ }
+
+ @Override
+ public StreamPipeline createPipeline() {
+ StreamPipeline streamPipeline = new StreamPipeline();
+ if (MapUtils.isEmpty(streamTransforms)) {
+ StreamNodeRelationship relationship = new StreamNodeRelationship();
+ relationship.setInputNodes(streamSources.keySet());
+ relationship.setOutputNodes(streamSinks.keySet());
+ streamPipeline.setPipeline(Lists.newArrayList(relationship));
+ return streamPipeline;
+ }
+ Map<Set<String>, List<StreamNodeRelationship>> relationshipMap = Maps.newHashMap();
+ // Create StreamNodeRelationships
+ // Check preNodes
+ for (StreamTransform streamTransform : streamTransforms.values()) {
+ String transformName = streamTransform.getTransformName();
+ Set<String> preNodes = streamTransform.getPreNodes();
+ StreamNodeRelationship relationship = new StreamNodeRelationship();
+ relationship.setInputNodes(preNodes);
+ relationship.setOutputNodes(Sets.newHashSet(transformName));
+ for (String preNode : preNodes) {
+ StreamTransform transform = streamTransforms.get(preNode);
+ if (transform != null) {
+ transform.addPost(transformName);
+ }
+ }
+ relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+ }
+ // Check postNodes
+ for (StreamTransform streamTransform : streamTransforms.values()) {
+ String transformName = streamTransform.getTransformName();
+ Set<String> postNodes = streamTransform.getPostNodes();
+ Set<String> sinkSet = Sets.newHashSet();
+ for (String postNode : postNodes) {
+ StreamSink sink = streamSinks.get(postNode);
+ if (sink != null) {
+ sinkSet.add(sink.getSinkName());
+ }
+ }
+ if (CollectionUtils.isNotEmpty(sinkSet)) {
+ StreamNodeRelationship relationship = new StreamNodeRelationship();
+ Set<String> preNodes = Sets.newHashSet(transformName);
+ relationship.setInputNodes(preNodes);
+ relationship.setOutputNodes(sinkSet);
+ relationshipMap.computeIfAbsent(preNodes, key -> Lists.newArrayList()).add(relationship);
+ }
+ }
+ List<StreamNodeRelationship> relationships = Lists.newArrayList();
+ // Merge StreamNodeRelationship with same preNodes
+ for (Map.Entry<Set<String>, List<StreamNodeRelationship>> entry : relationshipMap.entrySet()) {
+ List<StreamNodeRelationship> unmergedRelationships = entry.getValue();
+ if (unmergedRelationships.size() == 1) {
+ relationships.add(unmergedRelationships.get(0));
+ } else {
+ StreamNodeRelationship mergedRelationship = unmergedRelationships.get(0);
+ for (int index = 1; index < unmergedRelationships.size(); index++) {
+ StreamNodeRelationship unmergedRelationship = unmergedRelationships.get(index);
+ unmergedRelationship.getOutputNodes().stream()
+ .forEach(outputNode -> mergedRelationship.addOutputNode(outputNode));
+ }
+ relationships.add(mergedRelationship);
+ }
+ }
+ streamPipeline.setPipeline(relationships);
+ Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
+ if (circleState.getLeft()) {
+ Pair<String, String> circleNodes = circleState.getRight();
+ throw new IllegalStateException(
+ String.format("There is circle dependency in streamPipeline for node=%s and node=%s",
+ circleNodes.getLeft(), circleNodes.getRight()));
+ }
+ return streamPipeline;
+ }
+
@Override
public void updateSource(StreamSource source) {
AssertUtil.notNull(source.getSourceName(), "Source name should not be empty");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index b957b3d9f..e8a2a6dd1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -51,6 +51,8 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -63,13 +65,13 @@ import java.util.List;
@Slf4j
public class InnerInlongManagerClient {
- private static final String HTTP_PATH = "api/inlong/manager";
+ protected static final String HTTP_PATH = "api/inlong/manager";
- private final OkHttpClient httpClient;
- private final String host;
- private final int port;
- private final String uname;
- private final String passwd;
+ protected final OkHttpClient httpClient;
+ protected final String host;
+ protected final int port;
+ protected final String uname;
+ protected final String passwd;
public InnerInlongManagerClient(ClientConfiguration configuration) {
this.host = configuration.getBindHost();
@@ -479,6 +481,111 @@ public class InnerInlongManagerClient {
}
}
+ public String createTransform(TransformRequest transformRequest) {
+ String path = HTTP_PATH + "/transform/save";
+ final String sink = GsonUtil.toJson(transformRequest);
+ final RequestBody transformBody = RequestBody.create(MediaType.parse("application/json"), sink);
+ final String url = formatUrl(path);
+ Request request = new Request.Builder()
+ .url(url)
+ .method("POST", transformBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ return responseBody.getData().toString();
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong transform save failed: %s", e.getMessage()), e);
+ }
+ }
+
+ public List<TransformResponse> listTransform(String groupId, String streamId) {
+ final String path = HTTP_PATH + "/transform/list";
+ String url = formatUrl(path);
+ url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s", url, groupId, streamId);
+ Request request = new Request.Builder().get()
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed:%s", responseBody.getErrMsg()));
+ return InlongParser.parseTransformList(responseBody);
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong transform list failed with ex:%s", e.getMessage()), e);
+ }
+ }
+
+ public Pair<Boolean, String> updateTransform(TransformRequest transformRequest) {
+ final String path = HTTP_PATH + "/transform/update";
+ final String url = formatUrl(path);
+ final String transform = GsonUtil.toJson(transformRequest);
+ final RequestBody storageBody = RequestBody.create(MediaType.parse("application/json"), transform);
+ Request request = new Request.Builder()
+ .method("POST", storageBody)
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ if (responseBody.getData() != null) {
+ return Pair.of(Boolean.valueOf(responseBody.getData().toString()), responseBody.getErrMsg());
+ } else {
+ return Pair.of(false, responseBody.getErrMsg());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(String.format("Inlong transform update failed with ex:%s", e.getMessage()), e);
+ }
+ }
+
+ public boolean deleteTransform(TransformRequest transformRequest) {
+ AssertUtil.notEmpty(transformRequest.getInlongGroupId(), "inlongGroupId should not be null");
+ AssertUtil.notEmpty(transformRequest.getInlongStreamId(), "inlongStreamId should not be null");
+ AssertUtil.notEmpty(transformRequest.getTransformName(), "transformName should not be null");
+ final String path = HTTP_PATH + "/transform/delete";
+ String url = formatUrl(path);
+ url = String.format("%s&inlongGroupId=%s&inlongStreamId=%s&transformName=%s", url,
+ transformRequest.getInlongGroupId(),
+ transformRequest.getInlongStreamId(),
+ transformRequest.getTransformName());
+ RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), "");
+ Request request = new Request.Builder()
+ .url(url)
+ .method("DELETE", requestBody)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ okhttp3.Response response = call.execute();
+ assert response.body() != null;
+ String body = response.body().string();
+ assertHttpSuccess(response, body, path);
+ Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ return Boolean.parseBoolean(responseBody.getData().toString());
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Inlong transform delete failed: %s", e.getMessage()), e);
+ }
+ }
+
public String createSink(SinkRequest sinkRequest) {
String path = HTTP_PATH + "/sink/save";
final String sink = GsonUtil.toJson(sinkRequest);
@@ -758,11 +865,11 @@ public class InnerInlongManagerClient {
}
}
- private void assertHttpSuccess(okhttp3.Response response, String body, String path) {
+ protected void assertHttpSuccess(okhttp3.Response response, String body, String path) {
AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request=%s failed: %s", path, body));
}
- private String formatUrl(String path) {
+ protected String formatUrl(String path) {
return String.format("http://%s:%s/%s?username=%s&password=%s", host, port, path, uname, passwd);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 5b85758b3..86220b5dc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,8 @@ public class InnerStreamContext {
private Map<String, SinkRequest> sinkRequests = Maps.newHashMap();
+ private Map<String, TransformRequest> transformRequests = Maps.newHashMap();
+
public InnerStreamContext(InlongStreamInfo streamInfo) {
this.streamInfo = streamInfo;
}
@@ -54,4 +57,7 @@ public class InnerStreamContext {
this.sinkRequests.put(sinkRequest.getSinkName(), sinkRequest);
}
+ public void setTransformRequest(TransformRequest transformRequest) {
+ this.transformRequests.put(transformRequest.getTransformName(), transformRequest);
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java
new file mode 100644
index 000000000..11d5a466b
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/MultiDependencyTransform.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.transform;
+
+import io.swagger.annotations.ApiModel;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+@ApiModel("StreamTransform with multiple pre stream nodes, such as join")
+public class MultiDependencyTransform extends StreamTransform {
+
+ /**
+ * Constructor of MultiDependencyTransform
+ *
+ * @param transformName
+ * @param transformDefinition
+ * @param preNodes name of pre streamNodes, if pre streamNode is streamSource, then preNode is sourceName
+ * if pre streamNode is streamTransform, preNode is transformName
+ */
+ public MultiDependencyTransform(String transformName, TransformDefinition transformDefinition, String... preNodes) {
+ AssertUtil.notNull(transformDefinition, "TransformDefinition should not be null");
+ this.transformDefinition = transformDefinition;
+ AssertUtil.notNull(transformName, "TransformName should not be empty");
+ this.transformName = transformName;
+ AssertUtil.noNullElements(preNodes, "Pre streamNode should not be null");
+ for (String preNode : preNodes) {
+ this.addPre(preNode);
+ }
+ }
+
+ /**
+ * Constructor of MultiDependencyTransform
+ *
+ * @param transformName
+ * @param transformDefinition
+ * @param preNodes name of pre streamNodes, if pre streamNode is streamSource, then preNode is sourceName
+ * if pre streamNode is streamTransform, preNode is transformName
+ * @param postNodes postNodes name of post streamNode, if post streamNode is streamSource, then postNode is
+ * sourceName, if post streamNode is streamTransform, postNode is transformName
+ */
+ public MultiDependencyTransform(String transformName, TransformDefinition transformDefinition,
+ List<String> preNodes, List<String> postNodes) {
+ this(transformName, transformDefinition, preNodes.toArray(new String[preNodes.size()]));
+ if (postNodes != null) {
+ for (String postNode : postNodes) {
+ this.addPost(postNode);
+ }
+ }
+ }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java
new file mode 100644
index 000000000..e58851c05
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/transform/SingleDependencyTransform.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.transform;
+
+import io.swagger.annotations.ApiModel;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.util.AssertUtil;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+@ApiModel("StreamTransform with one pre stream node, such as filter, splitter, etc")
+public class SingleDependencyTransform extends StreamTransform {
+
+ /**
+ * Constructor of SingleDependencyTransform
+ *
+ * @param transformName
+ * @param transformDefinition
+ * @param preNode name of pre streamNode, if pre streamNode is streamSource, then preNode is sourceName
+ * if pre streamNode is streamTransform, preNode is transformName
+ */
+ public SingleDependencyTransform(String transformName, TransformDefinition transformDefinition, String preNode) {
+ AssertUtil.notNull(transformDefinition, "TransformDefinition should not be null");
+ this.transformDefinition = transformDefinition;
+ AssertUtil.notNull(transformName, "TransformName should not be empty");
+ this.transformName = transformName;
+ AssertUtil.notNull(preNode, "Pre streamNode should not be null");
+ this.addPre(preNode);
+ }
+
+ /**
+ * Constructor of SingleDependencyTransform
+ *
+ * @param transformName
+ * @param transformDefinition
+ * @param preNode name of pre streamNode, if pre streamNode is streamSource, then preNode is sourceName
+ * if pre streamNode is streamTransform, preNode is transformName
+ * @param postNodes name of post streamNode, if post streamNode is streamSource, then postNode is sourceName
+ * if post streamNode is streamTransform, postNode is transformName
+ */
+ public SingleDependencyTransform(String transformName, TransformDefinition transformDefinition, String preNode,
+ String... postNodes) {
+ this(transformName, transformDefinition, preNode);
+ if (postNodes != null) {
+ for (String postNode : postNodes) {
+ this.addPost(postNode);
+ }
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
index 86ac32d04..90ce0478a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
@@ -107,6 +107,12 @@ public class AssertUtil {
}
}
+ public static void notEmpty(String obj, String message) {
+ if (StringUtils.isEmpty(obj)) {
+ throw new IllegalStateException(message);
+ }
+ }
+
public static void notEmpty(Object[] array) {
notEmpty(array, "[Assertion failed] - this array must not be empty: it must contain at least 1 element");
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index ba100dcc2..a161d1da9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -52,6 +52,7 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -225,6 +226,14 @@ public class InlongParser {
}
}
+ public static List<TransformResponse> parseTransformList(Response response) {
+ Object data = response.getData();
+ String pageInfoJson = GsonUtil.toJson(data);
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<List<TransformResponse>>() {
+ }.getType());
+ }
+
public static PageInfo<SinkListResponse> parseSinkList(Response response) {
Object data = response.getData();
String pageInfoJson = GsonUtil.toJson(data);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 590b9fbae..9fe498b85 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -17,8 +17,8 @@
package org.apache.inlong.manager.client.api.util;
+import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.DataSeparator;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 2977df9ae..cc9106d70 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -46,9 +46,14 @@ import org.apache.inlong.manager.common.pojo.source.file.FileSourceResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceRequest;
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* Transfer the inlong stream source.
@@ -107,42 +112,43 @@ public class InlongStreamSourceTransfer {
throw new IllegalArgumentException(String.format("Unsupported source type : %s for Inlong", sourceType));
}
- private static KafkaSource parseKafkaSource(KafkaSourceResponse kafkaSourceResponse) {
+ private static KafkaSource parseKafkaSource(KafkaSourceResponse response) {
KafkaSource kafkaSource = new KafkaSource();
- kafkaSource.setSourceName(kafkaSourceResponse.getSourceName());
- kafkaSource.setConsumerGroup(kafkaSourceResponse.getGroupId());
- DataFormat dataFormat = DataFormat.forName(kafkaSourceResponse.getSerializationType());
+ kafkaSource.setSourceName(response.getSourceName());
+ kafkaSource.setConsumerGroup(response.getGroupId());
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
kafkaSource.setDataFormat(dataFormat);
- kafkaSource.setState(State.parseByStatus(kafkaSourceResponse.getStatus()));
- kafkaSource.setAgentIp(kafkaSourceResponse.getAgentIp());
- kafkaSource.setTopic(kafkaSourceResponse.getTopic());
- kafkaSource.setBootstrapServers(kafkaSourceResponse.getBootstrapServers());
- kafkaSource.setByteSpeedLimit(kafkaSourceResponse.getByteSpeedLimit());
- kafkaSource.setTopicPartitionOffset(kafkaSourceResponse.getTopicPartitionOffset());
- kafkaSource.setRecordSpeedLimit(kafkaSourceResponse.getRecordSpeedLimit());
+ kafkaSource.setState(State.parseByStatus(response.getStatus()));
+ kafkaSource.setAgentIp(response.getAgentIp());
+ kafkaSource.setTopic(response.getTopic());
+ kafkaSource.setBootstrapServers(response.getBootstrapServers());
+ kafkaSource.setByteSpeedLimit(response.getByteSpeedLimit());
+ kafkaSource.setTopicPartitionOffset(response.getTopicPartitionOffset());
+ kafkaSource.setRecordSpeedLimit(response.getRecordSpeedLimit());
kafkaSource.setSyncType(SyncType.FULL);
- kafkaSource.setDatabasePattern(kafkaSourceResponse.getDatabasePattern());
- kafkaSource.setTablePattern(kafkaSourceResponse.getTablePattern());
- kafkaSource.setIgnoreParseErrors(kafkaSourceResponse.isIgnoreParseErrors());
- kafkaSource.setTimestampFormatStandard(kafkaSourceResponse.getTimestampFormatStandard());
+ kafkaSource.setDatabasePattern(response.getDatabasePattern());
+ kafkaSource.setTablePattern(response.getTablePattern());
+ kafkaSource.setIgnoreParseErrors(response.isIgnoreParseErrors());
+ kafkaSource.setTimestampFormatStandard(response.getTimestampFormatStandard());
+ kafkaSource.setFields(parseStreamFields(response.getFieldList()));
return kafkaSource;
}
- private static KafkaSource parseKafkaSource(KafkaSourceListResponse kafkaResponse) {
+ private static KafkaSource parseKafkaSource(KafkaSourceListResponse response) {
KafkaSource kafkaSource = new KafkaSource();
- kafkaSource.setSourceName(kafkaResponse.getSourceName());
- kafkaSource.setConsumerGroup(kafkaResponse.getGroupId());
- kafkaSource.setState(State.parseByStatus(kafkaResponse.getStatus()));
- DataFormat dataFormat = DataFormat.forName(kafkaResponse.getSerializationType());
+ kafkaSource.setSourceName(response.getSourceName());
+ kafkaSource.setConsumerGroup(response.getGroupId());
+ kafkaSource.setState(State.parseByStatus(response.getStatus()));
+ DataFormat dataFormat = DataFormat.forName(response.getSerializationType());
kafkaSource.setDataFormat(dataFormat);
- kafkaSource.setTopic(kafkaResponse.getTopic());
- kafkaSource.setBootstrapServers(kafkaResponse.getBootstrapServers());
- kafkaSource.setByteSpeedLimit(kafkaResponse.getByteSpeedLimit());
- kafkaSource.setTopicPartitionOffset(kafkaResponse.getTopicPartitionOffset());
+ kafkaSource.setTopic(response.getTopic());
+ kafkaSource.setBootstrapServers(response.getBootstrapServers());
+ kafkaSource.setByteSpeedLimit(response.getByteSpeedLimit());
+ kafkaSource.setTopicPartitionOffset(response.getTopicPartitionOffset());
- KafkaOffset offset = KafkaOffset.forName(kafkaResponse.getAutoOffsetReset());
+ KafkaOffset offset = KafkaOffset.forName(response.getAutoOffsetReset());
kafkaSource.setAutoOffsetReset(offset);
- kafkaSource.setRecordSpeedLimit(kafkaResponse.getRecordSpeedLimit());
+ kafkaSource.setRecordSpeedLimit(response.getRecordSpeedLimit());
kafkaSource.setSyncType(SyncType.FULL);
return kafkaSource;
}
@@ -171,6 +177,7 @@ public class InlongStreamSourceTransfer {
if (StringUtils.isNotBlank(response.getTableWhiteList())) {
binlogSource.setTableNames(Arrays.asList(response.getTableWhiteList().split(",")));
}
+ binlogSource.setFields(parseStreamFields(response.getFieldList()));
return binlogSource;
}
@@ -208,6 +215,7 @@ public class InlongStreamSourceTransfer {
fileSource.setPattern(response.getPattern());
fileSource.setIp(response.getIp());
fileSource.setTimeOffset(response.getTimeOffset());
+ fileSource.setFields(parseStreamFields(response.getFieldList()));
return fileSource;
}
@@ -228,6 +236,7 @@ public class InlongStreamSourceTransfer {
autoPushSource.setState(State.parseByStatus(response.getStatus()));
autoPushSource.setDataFormat(DataFormat.NONE);
autoPushSource.setDataProxyGroup(response.getDataProxyGroup());
+ autoPushSource.setFields(parseStreamFields(response.getFieldList()));
return autoPushSource;
}
@@ -240,11 +249,11 @@ public class InlongStreamSourceTransfer {
return autoPushSource;
}
- private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo stream) {
+ private static KafkaSourceRequest createKafkaSourceRequest(KafkaSource kafkaSource, InlongStreamInfo streamInfo) {
KafkaSourceRequest sourceRequest = new KafkaSourceRequest();
sourceRequest.setSourceName(kafkaSource.getSourceName());
- sourceRequest.setInlongGroupId(stream.getInlongGroupId());
- sourceRequest.setInlongStreamId(stream.getInlongStreamId());
+ sourceRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
sourceRequest.setSourceType(kafkaSource.getSourceType().getType());
sourceRequest.setBootstrapServers(kafkaSource.getBootstrapServers());
sourceRequest.setTopic(kafkaSource.getTopic());
@@ -258,6 +267,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setTablePattern(kafkaSource.getTablePattern());
sourceRequest.setIgnoreParseErrors(kafkaSource.isIgnoreParseErrors());
sourceRequest.setTimestampFormatStandard(kafkaSource.getTimestampFormatStandard());
+ sourceRequest.setFieldList(createStreamFields(kafkaSource.getFields(), streamInfo));
return sourceRequest;
}
@@ -288,6 +298,7 @@ public class InlongStreamSourceTransfer {
sourceRequest.setSnapshotMode("initial");
sourceRequest.setIntervalMs("500");
sourceRequest.setTimestampFormatStandard(binlogSource.getTimestampFormatStandard());
+ sourceRequest.setFieldList(createStreamFields(binlogSource.getFields(), streamInfo));
return sourceRequest;
}
@@ -308,6 +319,7 @@ public class InlongStreamSourceTransfer {
}
sourceRequest.setPattern(fileSource.getPattern());
sourceRequest.setTimeOffset(fileSource.getTimeOffset());
+ sourceRequest.setFieldList(createStreamFields(fileSource.getFields(), streamInfo));
return sourceRequest;
}
@@ -322,6 +334,23 @@ public class InlongStreamSourceTransfer {
sourceRequest.setInlongStreamId(streamInfo.getInlongStreamId());
sourceRequest.setSourceType(source.getSourceType().getType());
sourceRequest.setDataProxyGroup(source.getDataProxyGroup());
+ sourceRequest.setFieldList(createStreamFields(source.getFields(), streamInfo));
return sourceRequest;
}
+
+ private static List<InlongStreamFieldInfo> createStreamFields(
+ List<StreamField> fields, InlongStreamInfo streamInfo) {
+ if (CollectionUtils.isEmpty(fields)) {
+ return null;
+ }
+ return InlongStreamTransfer.createStreamFields(fields, streamInfo);
+ }
+
+ private static List<StreamField> parseStreamFields(List<InlongStreamFieldInfo> fields) {
+ if (CollectionUtils.isEmpty(fields)) {
+ return null;
+ }
+ return fields.stream().map(fieldInfo -> CommonBeanUtils.copyProperties(fieldInfo, StreamField::new))
+ .collect(Collectors.toList());
+ }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index e4cb801c7..f72e27d7f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.api.util;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
new file mode 100644
index 000000000..d1fd7d76e
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
+import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.common.util.TransformDefinitionUtils;
+
+import java.util.List;
+
+public class InlongStreamTransformTransfer {
+
+ public static TransformRequest createTransformRequest(StreamTransform streamTransform,
+ InlongStreamInfo streamInfo) {
+ TransformRequest transformRequest = new TransformRequest();
+ Preconditions.checkNotEmpty(streamTransform.getTransformName(), "TransformName should not be null");
+ transformRequest.setTransformName(streamTransform.getTransformName());
+ transformRequest.setInlongGroupId(streamInfo.getInlongGroupId());
+ transformRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ Preconditions.checkNotNull(streamTransform.getTransformDefinition(), "TransformDefinition should not be null");
+ TransformDefinition transformDefinition = streamTransform.getTransformDefinition();
+ transformRequest.setTransformType(transformDefinition.getTransformType().getType());
+ transformRequest.setVersion(1);
+ transformRequest.setTransformDefinition(GsonUtil.toJson(transformDefinition));
+ if (CollectionUtils.isNotEmpty(streamTransform.getPreNodes())) {
+ transformRequest.setPreNodeNames(Joiner.on(",").join(streamTransform.getPreNodes()));
+ }
+ if (CollectionUtils.isNotEmpty(streamTransform.getPostNodes())) {
+ transformRequest.setPostNodeNames(Joiner.on(",").join(streamTransform.getPostNodes()));
+ }
+ return transformRequest;
+ }
+
+ public static StreamTransform parseStreamTransform(TransformResponse transformResponse) {
+ TransformType transformType = TransformType.forType(transformResponse.getTransformType());
+ String transformDefinitionStr = transformResponse.getTransformDefinition();
+ TransformDefinition transformDefinition = TransformDefinitionUtils.parseTransformDefinition(
+ transformDefinitionStr, transformType);
+ String transformName = transformResponse.getTransformName();
+ String preNodeNames = transformResponse.getPreNodeNames();
+ List<String> preNodes = Splitter.on(",").splitToList(preNodeNames);
+ StreamTransform streamTransform = null;
+ if (preNodes.size() > 1) {
+ streamTransform = new MultiDependencyTransform(transformName, transformDefinition,
+ preNodes.toArray(new String[]{}));
+ } else {
+ streamTransform = new SingleDependencyTransform(transformName, transformDefinition,
+ preNodes.get(0));
+ }
+ String postNodeNames = transformResponse.getPostNodeNames();
+ if (StringUtils.isNotEmpty(postNodeNames)) {
+ List<String> postNodes = Splitter.on(",").splitToList(postNodeNames);
+ streamTransform.setPostNodes(Sets.newHashSet(postNodes));
+ }
+ return streamTransform;
+ }
+
+}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
new file mode 100644
index 000000000..bbe8e1bde
--- /dev/null
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.InlongStream;
+import org.apache.inlong.manager.client.api.StreamTransform;
+import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
+import org.apache.inlong.manager.client.api.sink.KafkaSink;
+import org.apache.inlong.manager.client.api.source.KafkaSource;
+import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
+import org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
+import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition;
+import org.assertj.core.util.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InlongStreamImplTest {
+
+ @Test
+ public void testCreatePipeline() {
+ InlongStream inlongStream = new InlongStreamImpl();
+ // add stream source
+ KafkaSource kafkaSource = new KafkaSource();
+ kafkaSource.setSourceName("A");
+ MySQLBinlogSource mySQLBinlogSource = new MySQLBinlogSource();
+ mySQLBinlogSource.setSourceName("B");
+ inlongStream.addSource(kafkaSource);
+ inlongStream.addSource(mySQLBinlogSource);
+ // add stream sink
+ ClickHouseSink clickHouseSink = new ClickHouseSink();
+ clickHouseSink.setSinkName("E");
+ HiveSink hiveSink = new HiveSink();
+ hiveSink.setSinkName("F");
+ KafkaSink kafkaSink1 = new KafkaSink();
+ kafkaSink1.setSinkName("I");
+ KafkaSink kafkaSink2 = new KafkaSink();
+ kafkaSink2.setSinkName("M");
+ inlongStream.addSink(clickHouseSink);
+ inlongStream.addSink(hiveSink);
+ inlongStream.addSink(kafkaSink1);
+ inlongStream.addSink(kafkaSink2);
+ // add stream transform
+ StreamTransform multiDependencyTransform = new MultiDependencyTransform(
+ "C",
+ new JoinerDefinition(kafkaSource, mySQLBinlogSource, Lists.newArrayList(), Lists.newArrayList(),
+ JoinMode.INNER_JOIN),
+ "A", "B");
+ StreamTransform singleDependencyTransform1 = new SingleDependencyTransform(
+ "D", new FilterDefinition(FilterStrategy.REMOVE, Lists.newArrayList()), "C", "E", "F"
+ );
+
+ StreamTransform singleDependencyTransform2 = new SingleDependencyTransform(
+ "G", new SplitterDefinition(Lists.newArrayList()), "C", "I"
+ );
+ inlongStream.addTransform(multiDependencyTransform);
+ inlongStream.addTransform(singleDependencyTransform1);
+ inlongStream.addTransform(singleDependencyTransform2);
+ StreamPipeline streamPipeline = inlongStream.createPipeline();
+ String pipelineView = GsonUtil.toJson(streamPipeline);
+ Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"C\"],\"outputNodes\":[\"D\",\"G\"]"));
+ Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"D\"],\"outputNodes\":[\"E\",\"F\"]}"));
+ }
+}
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
index 9742174ba..3c51ea383 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InnerInlongManagerClientTest.java
@@ -41,5 +41,4 @@ public class InnerInlongManagerClientTest {
List<FullStreamResponse> fullStreamResponseList = innerInlongManagerClient.listStreamInfo("test");
Assert.assertNull(fullStreamResponseList);
}
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 778a796ff..ae26600b3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -23,8 +23,6 @@ package org.apache.inlong.manager.common.enums;
@Deprecated
public class Constant {
- public static final Integer UN_DELETED = 0;
- public static final Integer IS_DELETED = 1;
public static final String URL_SPLITTER = ",";
public static final String HOST_SPLITTER = ":";
@@ -38,28 +36,11 @@ public class Constant {
public static final String DATA_TYPE_KEY_VALUE = "KEY-VALUE";
- public static final String FILE_FORMAT_TEXT = "TextFile";
-
- public static final String FILE_FORMAT_ORC = "OrcFile";
-
- public static final String FILE_FORMAT_SEQUENCE = "SequenceFile";
-
- public static final String FILE_FORMAT_PARQUET = "Parquet";
-
public static final String SCHEMA_M0_DAY = "m0_day";
public static final String CLUSTER_TUBE = "TUBE";
public static final String CLUSTER_PULSAR = "PULSAR";
public static final String CLUSTER_TDMQ_PULSAR = "TDMQ_PULSAR";
- public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
-
- public static final String ID_IS_EMPTY = "primary key is empty";
-
- public static final String GROUP_ID_IS_EMPTY = "inlong group id is empty";
-
- public static final String STREAM_ID_IS_EMPTY = "inlong stream id is empty";
-
- public static final String REQUEST_IS_EMPTY = "request is empty";
public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
@@ -75,16 +56,4 @@ public class Constant {
public static final Integer DISABLE_CREATE_RESOURCE = 0; // Disable create resource
- public static final String CLUSTER_TUBE_MANAGER = "cluster_tube_manager";
-
- public static final String CLUSTER_TUBE_CLUSTER_ID = "cluster_tube_clusterId";
-
- public static final String PULSAR_ADMINURL = "pulsar_adminUrl";
-
- public static final String PULSAR_SERVICEURL = "pulsar_serviceUrl";
-
- public static final String TUBE_MASTER_URL = "tube_masterUrl";
-
- public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
index 34388004c..59c9146b2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorDetailTaskEntityStatus.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
/**
* Entity status enum
*/
+@Deprecated
public enum DBCollectorDetailTaskEntityStatus {
INIT(0, "first inserted"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
index 1ba3fcd8d..db43a0439 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskConstant.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
/**
* Constant for db collector task
*/
+@Deprecated
public class DBCollectorTaskConstant {
public static final String INTERFACE_VERSION = "1.0";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
index 796672458..5539990df 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DBCollectorTaskReturnCode.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.enums;
/**
* Operation type
*/
+@Deprecated
public enum DBCollectorTaskReturnCode {
SUCC(0, "success"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ff2ba5ec5..c16ceb3d5 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -23,6 +23,10 @@ public enum ErrorCodeEnum {
PERMISSION_REQUIRED(2003, "The current user does not have operation authority"),
AUTHENTICATION_REQUIRED(2004, "Authentication failed"),
+ ID_IS_EMPTY(101, "Primary key is empty"),
+ GROUP_ID_IS_EMPTY(102, "Inlong group id is empty"),
+ STREAM_ID_IS_EMPTY(103, "Inlong stream id is empty"),
+ REQUEST_IS_EMPTY(104, "Request is empty"),
USER_IS_NOT_MANAGER(110, "%s is not the manager, please contact %s"),
GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
@@ -65,13 +69,8 @@ public enum ErrorCodeEnum {
SOURCE_ALREADY_EXISTS(1304, "Source already exist with the groupId and streamId"),
SOURCE_SAVE_FAILED(1305, "Failed to save or update source info"),
SOURCE_OPT_NOT_ALLOWED(1306, "Current status does not allow add/modification/delete source info"),
-
- SOURCE_DUPLICATE(1301, "Stream source already exists"),
- SOURCE_BASIC_NOT_FOUND(1302, "The basic information of the stream source does not exist"),
- SOURCE_DETAIL_NOT_FOUND(1303, "Stream source info does not exist"),
- SOURCE_TYPE_NOT_SUPPORTED(1304, "Source type is not supported"),
- SOURCE_BASIC_DELETE_HAS_DETAIL(1305,
- "The stream source contains detailed info and is not allowed to be deleted"),
+ SOURCE_TYPE_NOT_SAME(1307, "Expected source type is %s, but found %s"),
+ SOURCE_NAME_IS_NULL(1308, "Source name is null"),
HIVE_OPERATION_FAILED(1311, "Hive operation failed"),
@@ -90,7 +89,11 @@ public enum ErrorCodeEnum {
PARTITION_FIELD_NAME_IS_EMPTY(1412, "Partition field name cannot be empty"),
PARTITION_FIELD_NOT_FOUND(1413, "Sink partition field [%s] not found in sink field list"),
PARTITION_FIELD_NO_SOURCE_FIELD(1414, "Sink partition field [%s] must have a related source field name"),
+ SINK_TYPE_NOT_SAME(1415, "Expected sink type is %s, but found %s"),
+ SINK_NAME_IS_NULL(1416, "Sink name is null"),
+ TRANSFORM_TYPE_IS_NULL(1500, "Transform type is null"),
+ TRANSFORM_NAME_IS_NULL(1501, "Transform name is null"),
WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
index 855f74035..847652d44 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
@@ -39,6 +39,6 @@ public enum MQType {
return mqType;
}
}
- throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+ throw new IllegalArgumentException(String.format("Unsupported queue=%s for Inlong", type));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
index 3eea7f22a..1c51ef520 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java
@@ -28,9 +28,6 @@ public enum SinkType {
public static final String SINK_ICEBERG = "ICEBERG";
public static final String SINK_CLICKHOUSE = "CLICKHOUSE";
- public static final String SINK_TYPE_IS_EMPTY = "Sink type is empty";
- public static final String SINK_TYPE_NOT_SAME = "Expected sink type is %s, but found %s";
-
/**
* Get the SinkType enum via the given sinkType string
*/
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 9d6415ca9..2ea9726fa 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -39,9 +39,6 @@ public enum SourceType {
public static final String SOURCE_BINLOG = "BINLOG";
public static final String SOURCE_KAFKA = "KAFKA";
- public static final String SOURCE_TYPE_IS_EMPTY = "Source type is empty";
- public static final String SOURCE_TYPE_NOT_SAME = "Expected source type is %s, but found %s";
-
@Getter
private final String type;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
similarity index 55%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
index 855f74035..b4d326b91 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/TransformType.java
@@ -19,26 +19,49 @@ package org.apache.inlong.manager.common.enums;
import lombok.Getter;
-public enum MQType {
+import java.util.Locale;
- PULSAR("PULSAR"),
- TUBE("TUBE"),
- TDMQ_PULSAR("TDMQ_PULSAR"),
- NONE("NONE");
+public enum TransformType {
+
+ /**
+ * Replace string field based on regex
+ */
+ STRING_REPLACER("string_replacer"),
+
+ /**
+ * Split field by separator
+ */
+ SPLITTER("splitter"),
+
+ /**
+ * Filter stream records on given regulations
+ */
+ FILTER("filter"),
+
+ /**
+ * Remove duplication records on given fields
+ */
+ DE_DUPLICATION("de_duplication"),
+
+ /**
+ * Joins different sources in one stream
+ */
+ JOINER("joiner");
@Getter
private String type;
- MQType(String type) {
+ TransformType(String type) {
this.type = type;
}
- public static MQType forType(String type) {
- for (MQType mqType : values()) {
- if (mqType.getType().equals(type)) {
- return mqType;
+ public static TransformType forType(String type) {
+ for (TransformType transformType : values()) {
+ if (transformType.getType().equals(type) || transformType.getType().toUpperCase(Locale.ROOT).equals(type)) {
+ return transformType;
}
}
- throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+ throw new IllegalArgumentException(String.format("Unsupported transform=%s for Inlong", type));
}
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index 1cc9bf76f..6cb66e5e8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -22,8 +22,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import javax.validation.constraints.NotNull;
+import java.util.List;
/**
* Request of source
@@ -69,4 +71,7 @@ public class SourceRequest {
@ApiModelProperty("Version")
private Integer version;
+ @ApiModelProperty("Field list, only support when inlong group in light weight mode")
+ private List<InlongStreamFieldInfo> fieldList;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 9c09ae51b..75b177a6a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -21,8 +21,10 @@ import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import java.util.Date;
+import java.util.List;
/**
* Response of the stream source
@@ -84,4 +86,7 @@ public class SourceResponse {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
+ @ApiModelProperty(value = "Field list")
+ private List<InlongStreamFieldInfo> fieldList;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index b23e42db8..ba68f6d43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -83,6 +83,9 @@ public class InlongStreamRequest extends InlongStreamBaseInfo {
@ApiModelProperty(value = "Names of responsible persons, separated by commas")
private String inCharges;
+ @ApiModelProperty(value = "StreamPipeline snapshot of stream,, string in JSON format")
+ private String tempView;
+
@ApiModelProperty(value = "Field list")
private List<InlongStreamFieldInfo> fieldList;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
similarity index 82%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
index 3f487d0a5..b6a641d81 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamField.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.common.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@@ -30,6 +30,9 @@ import org.apache.inlong.manager.common.enums.FieldType;
@ApiModel("Stream field configuration")
public class StreamField {
+ public static final StreamField PROCESSING_TIME = new StreamField(100, FieldType.BIGINT, "PROCESSING_TIME",
+ null, null, 1);
+
public StreamField(int index, FieldType fieldType, String fieldName, String fieldComment, String fieldValue) {
this.id = index;
this.fieldType = fieldType;
@@ -38,6 +41,12 @@ public class StreamField {
this.fieldValue = fieldValue;
}
+ public StreamField(int index, FieldType fieldType, String fieldName, String fieldComment, String fieldValue,
+ Integer isMetaField) {
+ this(index, fieldType, fieldName, fieldComment, fieldValue);
+ this.isMetaField = isMetaField;
+ }
+
@ApiModelProperty("Field index")
private Integer id;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
similarity index 52%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
index 0c18cd558..bcea68d3c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNode.java
@@ -15,30 +15,37 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.common.pojo.stream;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
+import com.google.common.collect.Sets;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.util.Preconditions;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
@Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public class StreamNode {
- @ApiModelProperty(value = "DataSink name", required = true)
- private String sinkName;
+ protected Set<String> preNodes;
- @ApiModelProperty("Other properties if need")
- private Map<String, Object> properties;
+ protected Set<String> postNodes;
- public abstract SinkType getSinkType();
+ protected List<StreamField> fields;
- public abstract List<SinkField> getSinkFields();
-
- public abstract DataFormat getDataFormat();
+ public void addPre(String pre) {
+ Preconditions.checkNotEmpty(pre, "Pre node should not be empty");
+ if (preNodes == null) {
+ preNodes = Sets.newHashSet();
+ }
+ preNodes.add(pre);
+ }
+ public void addPost(String post) {
+ Preconditions.checkNotEmpty(post, "Post node should not be empty");
+ if (postNodes == null) {
+ postNodes = Sets.newHashSet();
+ }
+ postNodes.add(post);
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
new file mode 100644
index 000000000..280e6c0e4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamNodeRelationship.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Sets;
+import lombok.Data;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.Set;
+
+@Data
+public class StreamNodeRelationship {
+
+ private Set<String> inputNodes;
+
+ private Set<String> outputNodes;
+
+ public StreamNodeRelationship() {
+ this(Sets.newHashSet(), Sets.newHashSet());
+ }
+
+ public StreamNodeRelationship(Set<String> inputNodes, Set<String> outputNodes) {
+ this.inputNodes = inputNodes;
+ this.outputNodes = outputNodes;
+ }
+
+ public void addInputNode(String inputNode) {
+ Preconditions.checkNotEmpty(inputNode, "Input node should not be empty");
+ inputNodes.add(inputNode);
+ }
+
+ public void addOutputNode(String outputNode) {
+ Preconditions.checkNotEmpty(outputNode, "Input node should not be empty");
+ outputNodes.add(outputNode);
+ }
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
new file mode 100644
index 000000000..6b12abb69
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/StreamPipeline.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import lombok.Data;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+@Data
+public class StreamPipeline {
+
+ private List<StreamNodeRelationship> pipeline;
+
+ public StreamPipeline() {
+ this(Lists.newArrayList());
+ }
+
+ public StreamPipeline(List<StreamNodeRelationship> pipeline) {
+ Preconditions.checkNotNull(pipeline, "Pipeline should not be null");
+ this.pipeline = pipeline;
+ }
+
+ public void addRelationShip(StreamNodeRelationship relationship) {
+ pipeline.add(relationship);
+ }
+
+ /**
+ * Check if pipeline has circle
+ * If has one, return circled node names;
+ *
+ * @return
+ */
+ public Pair<Boolean, Pair<String, String>> hasCircle() {
+ Map<String, Set<String>> priorityMap = Maps.newHashMap();
+ for (StreamNodeRelationship relationship : pipeline) {
+ Set<String> inputNodes = relationship.getInputNodes();
+ Set<String> outputNodes = relationship.getOutputNodes();
+ for (String inputNode : inputNodes) {
+ for (String outputNode : outputNodes) {
+ priorityMap.computeIfAbsent(inputNode, key -> Sets.newHashSet()).add(outputNode);
+ if (CollectionUtils.isEmpty(priorityMap.get(outputNode))) {
+ continue;
+ }
+ Set<String> priorityNodesOfOutput = priorityMap.get(outputNode);
+ if (priorityNodesOfOutput.contains(inputNode)) {
+ return Pair.of(true, Pair.of(inputNode, outputNode));
+ } else {
+ if (isReach(priorityMap, priorityNodesOfOutput, inputNode)) {
+ return Pair.of(true, Pair.of(inputNode, outputNode));
+ }
+ }
+ }
+ }
+ }
+ return Pair.of(false, null);
+ }
+
+ private boolean isReach(Map<String, Set<String>> paths, Set<String> inputs, String output) {
+ Queue<String> queue = new LinkedList<>();
+ queue.addAll(inputs);
+ Set<String> preNodes = new HashSet<>(inputs);
+ while (!queue.isEmpty()) {
+ String node = queue.remove();
+ if (paths.get(node) == null) {
+ continue;
+ }
+ Set<String> postNodes = paths.get(node);
+ if (postNodes.contains(output)) {
+ return true;
+ }
+ for (String postNode : postNodes) {
+ if (!inputs.contains(postNode)) {
+ preNodes.add(postNode);
+ queue.add(postNode);
+ }
+ }
+ }
+ return false;
+ }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
similarity index 60%
copy from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
index 5022d48d4..263f17a3f 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinition.java
@@ -15,18 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service;
+package org.apache.inlong.manager.common.pojo.transform;
-import org.apache.inlong.manager.test.BaseTest;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.test.context.SpringBootTest;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
-@SpringBootApplication
-@SpringBootTest(classes = ServiceBaseTest.class)
-public class ServiceBaseTest extends BaseTest {
+@Data
+public abstract class TransformDefinition {
- public final String globalGroupId = "b_group1";
- public final String globalStreamId = "stream1";
- public final String globalOperator = "admin";
+ protected TransformType transformType;
+ @JsonFormat
+ public enum OperationType {
+ lt, le, eq, ne, ge, gt, is_null, not_null
+ }
+
+ @JsonFormat
+ public enum ScriptType {
+ PYTHON, JAVA
+ }
+
+ @JsonFormat
+ public enum RuleRelation {
+ AND, OR
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
similarity index 53%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
index 1cc9bf76f..79b83c5c9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformRequest.java
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.source;
+package org.apache.inlong.manager.common.pojo.transform;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@@ -26,14 +24,13 @@ import lombok.Data;
import javax.validation.constraints.NotNull;
/**
- * Request of source
+ * Request of transform
*/
@Data
-@ApiModel("Request of source")
-@JsonTypeInfo(use = Id.NAME, visible = true, property = "sourceType")
-public class SourceRequest {
+@ApiModel("Request of stream transform")
+public class TransformRequest {
- private Integer id;
+ private int id;
@NotNull
@ApiModelProperty("Inlong group id")
@@ -44,29 +41,26 @@ public class SourceRequest {
private String inlongStreamId;
@NotNull
- @ApiModelProperty("Source type, including: FILE, KAFKA, etc.")
- private String sourceType;
+ @ApiModelProperty("Transform name, unique in one stream")
+ private String transformName;
@NotNull
- @ApiModelProperty("Source name, unique in one stream")
- private String sourceName;
+ @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+ private String transformType;
- @ApiModelProperty("Mac uuid of the agent running the task")
- private String uuid;
-
- @ApiModelProperty("Id of the source server")
- private Integer serverId;
-
- @ApiModelProperty("Id of the cluster that collected this source")
- private Integer clusterId;
+ @NotNull
+ @ApiModelProperty("Pre node names of transform in this stream, join by ','")
+ private String preNodeNames = "";
- @ApiModelProperty("Serialization type, support: csv, json, canal, avro, etc")
- private String serializationType;
+ @NotNull
+ @ApiModelProperty("Post node names of transform in this stream, join by ','")
+ private String postNodeNames = "";
- @ApiModelProperty("Snapshot of the source task")
- private String snapshot;
+ @NotNull
+ @ApiModelProperty("Transform definition in json type")
+ private String transformDefinition;
- @ApiModelProperty("Version")
+ @ApiModelProperty("Version of transform")
private Integer version;
-
}
+
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java
new file mode 100644
index 000000000..abb17b654
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/TransformResponse.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+/**
+ * Response of transform
+ */
+@Data
+@ApiModel("Response of the stream transform")
+public class TransformResponse {
+
+ private int id;
+
+ @ApiModelProperty("Inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty("Inlong stream id")
+ private String inlongStreamId;
+
+ @ApiModelProperty("Transform name, unique in one stream")
+ private String transformName;
+
+ @ApiModelProperty("Transform type, including: splitter, filter, joiner, etc.")
+ private String transformType;
+
+ @ApiModelProperty("Pre node names of transform in this stream")
+ private String preNodeNames;
+
+ @ApiModelProperty("Post node names of transform in this stream")
+ private String postNodeNames;
+
+ @ApiModelProperty("Transform definition in json type")
+ private String transformDefinition;
+
+ @ApiModelProperty("Version of transform")
+ private Integer version;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java
new file mode 100644
index 000000000..fbb83f85c
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/deduplication/DeDuplicationDefinition.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.deduplication;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A class to define operation to duplicate message in a time duration.
+ * DupFields is needed to judge whether stream records is duplicate.
+ * TimingField is eventTime of stream records.
+ * Interval and timeunit is required to modify a time interval,
+ * during which duplicate records is operated;
+ */
+@Data
+@Builder
+public class DeDuplicationDefinition extends TransformDefinition {
+
+ public DeDuplicationDefinition(List<StreamField> dupFields,
+ StreamField timingField,
+ long interval,
+ TimeUnit timeUnit,
+ DeDuplicationStrategy deDuplicationStrategy) {
+ this.transformType = TransformType.DE_DUPLICATION;
+ this.dupFields = dupFields;
+ this.timingField = timingField;
+ this.interval = interval;
+ this.timeUnit = timeUnit;
+ this.deDuplicationStrategy = deDuplicationStrategy;
+ }
+
+ /**
+ * Duplicate fields for de_duplication transform
+ */
+ private List<StreamField> dupFields;
+
+ /**
+ * Event time field for de_duplication transform
+ */
+ private StreamField timingField;
+
+ /**
+ * Time interval for de_duplication transform
+ */
+ private long interval;
+
+ /**
+ * TimeUnit for de_duplication transform
+ */
+ private TimeUnit timeUnit;
+
+ @JsonFormat
+ public enum DeDuplicationStrategy {
+ REMOVE_ALL, RESERVE_FIRST, RESERVE_LAST
+ }
+
+ /**
+ * Strategy for de_duplication operation
+ */
+ private DeDuplicationStrategy deDuplicationStrategy;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
new file mode 100644
index 000000000..983b976c8
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/filter/FilterDefinition.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.filter;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operation to filter stream records by different modes.
+ * Rule mode is more recommended then script mode
+ */
+@Data
+public class FilterDefinition extends TransformDefinition {
+
+ public FilterDefinition(FilterStrategy filterStrategy, List<FilterRule> filterRules) {
+ this.transformType = TransformType.FILTER;
+ this.filterStrategy = filterStrategy;
+ this.filterMode = FilterMode.RULE;
+ this.filterRules = filterRules;
+ }
+
+ public FilterDefinition(FilterStrategy filterStrategy, ScriptBase scriptBase) {
+ this.transformType = TransformType.FILTER;
+ this.filterStrategy = filterStrategy;
+ this.filterMode = FilterMode.SCRIPT;
+ this.scriptBase = scriptBase;
+ }
+
+ @JsonFormat
+ public enum FilterStrategy {
+ RETAIN, REMOVE
+ }
+
+ @JsonFormat
+ public enum FilterMode {
+ RULE, SCRIPT
+ }
+
+ /**
+ * Strategy for Filter transform
+ */
+ private FilterStrategy filterStrategy;
+
+ /**
+ * Mode for Filter transform
+ */
+ private FilterMode filterMode;
+
+ @Data
+ @AllArgsConstructor
+ public static class TargetValue {
+
+ /**
+ * If target value is constant, set targetConstant, or set targetField if not;
+ */
+ private boolean isConstant;
+
+ private StreamField targetField;
+
+ private String targetConstant;
+ }
+
+ /**
+ * Filter rule is about relationship between sourceField and targetValue;
+ * such as 'a >= b' or 'a is not null'
+ */
+ @Data
+ @AllArgsConstructor
+ public static class FilterRule {
+
+ private StreamField sourceField;
+
+ private OperationType operationType;
+
+ private TargetValue targetValue;
+
+ private RuleRelation relationWithPost;
+ }
+
+ private List<FilterRule> filterRules;
+
+ @Data
+ @AllArgsConstructor
+ public static class ScriptBase {
+
+ private ScriptType scriptType;
+
+ private String script;
+ }
+
+ private ScriptBase scriptBase;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java
new file mode 100644
index 000000000..a682c5ed7
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/joiner/JoinerDefinition.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.joiner;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operations to join two streamNode in one with relation defined.
+ */
+@Data
+@Builder
+public class JoinerDefinition extends TransformDefinition {
+
+ public JoinerDefinition(StreamNode leftNode,
+ StreamNode rightNode,
+ List<StreamField> leftJoinFields,
+ List<StreamField> rightJoinFields,
+ JoinMode joinMode) {
+ this.transformType = TransformType.JOINER;
+ this.leftNode = leftNode;
+ this.rightNode = rightNode;
+ this.leftJoinFields = leftJoinFields;
+ this.rightJoinFields = rightJoinFields;
+ this.joinMode = joinMode;
+ }
+
+ /**
+ * Left node for join
+ */
+ private StreamNode leftNode;
+
+ /**
+ * Right node for join
+ */
+ private StreamNode rightNode;
+
+ /**
+ * Join streamFields from left node
+ */
+ private List<StreamField> leftJoinFields;
+
+ /**
+ * Join streamFields from right node
+ */
+ private List<StreamField> rightJoinFields;
+
+ @JsonFormat
+ public enum JoinMode {
+ LEFT_JOIN, RIGHT_JOIN, INNER_JOIN
+ }
+
+ /**
+ * Join mode for join transform
+ */
+ private JoinMode joinMode;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java
new file mode 100644
index 000000000..8bcde2005
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/replacer/StringReplacerDefinition.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.replacer;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operation to replace stream fields in stream records by ReplaceRule defined.
+ */
+@Data
+@Builder
+public class StringReplacerDefinition extends TransformDefinition {
+
+ public StringReplacerDefinition(List<ReplaceRule> replaceRules) {
+ this.transformType = TransformType.STRING_REPLACER;
+ this.replaceRules = replaceRules;
+ }
+
+ public enum ReplaceMode {
+ RELACE_ALL, RELACE_FIRST
+ }
+
+ /**
+ * ReplaceRule is aim to define a replace action to string fields;
+ * If field value match regex, will be replaced by targetValue in REPLACE_ALL/REPLACE_FIRST mode;
+ */
+ @Data
+ @AllArgsConstructor
+ public static class ReplaceRule {
+
+ private StreamField sourceField;
+
+ private String regex;
+
+ private String targetValue;
+
+ private ReplaceMode mode;
+
+ private boolean isCaseSensitive;
+ }
+
+ private List<ReplaceRule> replaceRules;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
new file mode 100644
index 000000000..5d27de018
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/transform/splitter/SplitterDefinition.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+
+import java.util.List;
+
+/**
+ * A class to define operation to split fields according to SplitRule defined.
+ */
+@Data
+@Builder
+public class SplitterDefinition extends TransformDefinition {
+
+ public SplitterDefinition(List<SplitRule> splitRules) {
+ this.transformType = TransformType.SPLITTER;
+ this.splitRules = splitRules;
+ }
+
+ /**
+ * SplitterRule is aim to define a splitter action below:
+ * SourceField will be splitted to targetFields by seperator
+ */
+ @Data
+ @AllArgsConstructor
+ public static class SplitRule {
+
+ /**
+ * Field to split;
+ */
+ private StreamField sourceField;
+
+ /**
+ * String seperator to split sourceField;
+ */
+ private String seperator;
+
+ /**
+ * Fields generated when sourceField is splitted
+ * Use sourceName_0, sourceName_1, sourceName_2 if not set
+ */
+ private List<String> targetFields;
+ }
+
+ /**
+ * Split rules for transform;
+ */
+ private List<SplitRule> splitRules;
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index 2de3aa22b..a2717f9e7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -19,73 +19,56 @@ package org.apache.inlong.manager.common.settings;
public class InlongGroupSettings {
- public static String PULSAR_ADMIN_URL = "pulsar.adminUrl";
+ public static final String DATA_FLOW_GROUP_ID_KEY = "inlong.group.id";
- public static String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
-
- public static String PULSAR_AUTHENTICATION = "pulsar.authentication";
-
- public static String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
-
- public static String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
-
- public static String TUBE_MANAGER_URL = "tube.manager.url";
-
- public static String TUBE_MASTER_URL = "tube.master.url";
-
- public static String TUBE_CLUSTER_ID = "tube.cluster.id";
+ public static final String DATA_FLOW = "dataFlow";
/**
- * oceanus need param start
+ * config of pulsar
*/
- public static String SORT_JOB_ID = "sort.job.id";
-
- public static String ENDPOINT = "endpoint";
-
- public static String DATA_FLOW = "dataFlow";
-
- public static String SECRET_ID = "secretId";
-
- public static String SECRET_KEY = "secretKey";
+ public static final String PULSAR_ADMIN_URL = "pulsar.adminUrl";
- public static String REGION = "region";
+ public static final String PULSAR_SERVICE_URL = "pulsar.serviceUrl";
- public static String CLUSTER_ID = "clusterId";
+ public static final String PULSAR_AUTHENTICATION = "pulsar.authentication";
- public static String FS_ABSTRACT_FILE_SYSTEM_COSN_IMPL = "fs.AbstractFileSystem.cosn.impl";
+ public static final String PULSAR_AUTHENTICATION_TYPE = "pulsar.authentication.type";
- public static String FS_COSN_IMPL = "fs.cosn.impl";
+ public static final String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
- public static String FS_COSN_BUCKET_REGION = "fs.cosn.bucket.region";
-
- public static String FS_COSN_USERINFO_APPID = "fs.cosn.userinfo.appid";
-
- public static String FS_COSN_USERINFO_SECRET_ID = "fs.cosn.userinfo.secretId";
-
- public static String FS_COSN_USERINFO_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+ /**
+ * config of tube mq
+ */
+ public static final String TUBE_MANAGER_URL = "tube.manager.url";
- public static String FS_ABSTRACT_FILE_SYSTEM_OFS_IMPL = "fs.AbstractFileSystem.ofs.impl";
+ public static final String TUBE_MASTER_URL = "tube.master.url";
- public static String FS_OFS_IMPL = "fs.ofs.impl";
+ public static final String TUBE_CLUSTER_ID = "tube.cluster.id";
- public static String FS_OFS_TMP_CACHE_DIR = "fs.ofs.tmp.cache.dir";
+ /**
+ * config of dataproxy
+ */
+ public static final String CLUSTER_DATA_PROXY = "DATA_PROXY";
- public static String FS_OFS_USER_APPID = "fs.ofs.user.appid";
+ /**
+ * config of sort
+ */
+ public static final String SORT_JOB_ID = "sort.job.id";
- public static String SORT_TYPE = "sort.type";
+ public static final String SORT_TYPE = "sort.type";
- public static String DEFAULT_SORT_TYPE = "flink";
+ public static final String DEFAULT_SORT_TYPE = "flink";
- public static String SORT_NAME = "sort.name";
+ public static final String SORT_NAME = "sort.name";
- public static String SORT_URL = "sort.url";
+ public static final String SORT_URL = "sort.url";
- public static String SORT_AUTHENTICATION = "sort.authentication";
+ public static final String SORT_AUTHENTICATION = "sort.authentication";
- public static String SORT_AUTHENTICATION_TYPE = "sort.authentication.type";
+ public static final String SORT_AUTHENTICATION_TYPE = "sort.authentication.type";
- public static String DEFAULT_SORT_AUTHENTICATION_TYPE = "secret_and_token";
+ public static final String DEFAULT_SORT_AUTHENTICATION_TYPE = "secret_and_token";
- public static String SORT_PROPERTIES = "sort.properties";
+ public static final String SORT_PROPERTIES = "sort.properties";
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java
new file mode 100644
index 000000000..dc7c3de86
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/TransformDefinitionUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.util;
+
+import com.google.gson.Gson;
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.replacer.StringReplacerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.splitter.SplitterDefinition;
+
+public class TransformDefinitionUtils {
+
+ private static Gson gson = new Gson();
+
+ public static TransformDefinition parseTransformDefinition(String transformDefinition,
+ TransformType transformType) {
+ switch (transformType) {
+ case FILTER:
+ return gson.fromJson(transformDefinition, FilterDefinition.class);
+ case JOINER:
+ return gson.fromJson(transformDefinition, JoinerDefinition.class);
+ case SPLITTER:
+ return gson.fromJson(transformDefinition, SplitterDefinition.class);
+ case DE_DUPLICATION:
+ return gson.fromJson(transformDefinition, DeDuplicationDefinition.class);
+ case STRING_REPLACER:
+ return gson.fromJson(transformDefinition, StringReplacerDefinition.class);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported transformType for %s", transformType));
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
new file mode 100644
index 000000000..456f67578
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/stream/StreamPipelineTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.stream;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StreamPipelineTest {
+
+ @Test
+ public void testCheckHasCircle() {
+ StreamPipeline streamPipeline = new StreamPipeline();
+ streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("A", "B"), Sets.newHashSet("C")));
+ streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("C"), Sets.newHashSet("D")));
+ streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("D"), Sets.newHashSet("E", "F")));
+ streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("F"), Sets.newHashSet("G")));
+ streamPipeline.addRelationShip(new StreamNodeRelationship(Sets.newHashSet("E"), Sets.newHashSet("H", "C")));
+ Pair<Boolean, Pair<String, String>> circleState = streamPipeline.hasCircle();
+ Assert.assertTrue(circleState.getLeft());
+ Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getLeft()));
+ Assert.assertTrue(Sets.newHashSet("E","C").contains(circleState.getRight().getRight()));
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java
new file mode 100644
index 000000000..763ac04d5
--- /dev/null
+++ b/inlong-manager/manager-common/src/test/java/org/apache/inlong/manager/common/pojo/transform/TransformDefinitionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.transform;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.stream.StreamNode;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition.OperationType;
+import org.apache.inlong.manager.common.pojo.transform.TransformDefinition.RuleRelation;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition;
+import org.apache.inlong.manager.common.pojo.transform.deduplication.DeDuplicationDefinition.DeDuplicationStrategy;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterRule;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.FilterStrategy;
+import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition.TargetValue;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition;
+import org.apache.inlong.manager.common.pojo.transform.joiner.JoinerDefinition.JoinMode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TransformDefinitionTest {
+
+ public class BlankStreamNode extends StreamNode {
+
+ }
+
+ public static Gson gson = new Gson();
+
+ @Test
+ public void testParseDeDuplicationDefinition() {
+ List<StreamField> streamFields = createStreamFields();
+ StreamField timingField = new StreamField(2, FieldType.TIMESTAMP, "event_time", null, null);
+ DeDuplicationDefinition deDuplicationDefinition = new DeDuplicationDefinition(streamFields, timingField, 100,
+ TimeUnit.MICROSECONDS, DeDuplicationStrategy.RESERVE_FIRST);
+ String definitionJson = gson.toJson(deDuplicationDefinition);
+ DeDuplicationDefinition parsedDefinition = gson.fromJson(definitionJson, DeDuplicationDefinition.class);
+ Assert.assertEquals(deDuplicationDefinition, parsedDefinition);
+ }
+
+ @Test
+ public void testParseFilterDefinition() {
+ List<FilterRule> filterRules = createFilterRule();
+ FilterDefinition filterDefinition = new FilterDefinition(FilterStrategy.RETAIN, filterRules);
+ String definitionJson = gson.toJson(filterDefinition);
+ FilterDefinition parsedDefinition = gson.fromJson(definitionJson, FilterDefinition.class);
+ Assert.assertEquals(filterDefinition, parsedDefinition);
+ }
+
+ @Test
+ public void testJoinerDefinition() {
+ List<StreamField> streamFields = createStreamFields();
+ StreamNode leftNode = new BlankStreamNode();
+ leftNode.setFields(streamFields);
+ StreamNode rightNode = new BlankStreamNode();
+ rightNode.setFields(streamFields);
+ JoinerDefinition joinerDefinition = new JoinerDefinition(leftNode, rightNode, streamFields, streamFields,
+ JoinMode.INNER_JOIN);
+ String definitionJson = gson.toJson(joinerDefinition);
+ JoinerDefinition parsedDefinition = gson.fromJson(definitionJson, JoinerDefinition.class);
+ Assert.assertEquals(joinerDefinition, parsedDefinition);
+ }
+
+ private List<StreamField> createStreamFields() {
+ List<StreamField> streamFieldList = Lists.newArrayList();
+ streamFieldList.add(new StreamField(0, FieldType.STRING, "name", null, null));
+ streamFieldList.add(new StreamField(1, FieldType.INT, "age", null, null));
+ return streamFieldList;
+ }
+
+ private List<FilterRule> createFilterRule() {
+ List<FilterRule> filterRules = Lists.newArrayList();
+ filterRules.add(new FilterRule(new StreamField(0, FieldType.STRING, "name", null, null),
+ OperationType.not_null, null, RuleRelation.OR));
+ filterRules.add(new FilterRule(new StreamField(1, FieldType.INT, "age", null, null),
+ OperationType.gt, new TargetValue(true, null, "50"), null));
+ return filterRules;
+ }
+
+}
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
similarity index 58%
copy from inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
index 21bb90d30..a9d3f9f60 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceFieldEntity.java
@@ -15,23 +15,39 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.cli;
+package org.apache.inlong.manager.dao.entity;
import lombok.Data;
-import org.apache.inlong.manager.client.api.InlongGroupConf;
-import org.apache.inlong.manager.client.api.InlongStreamConf;
-import org.apache.inlong.manager.client.api.StreamField;
-import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSource;
-import java.util.List;
+import java.io.Serializable;
@Data
-public class CreateGroupConf {
-
- private InlongGroupConf groupConf;
- private InlongStreamConf streamConf;
- private List<StreamField> streamFieldList;
- private StreamSource streamSource;
- private StreamSink streamSink;
-}
+public class StreamSourceFieldEntity implements Serializable {
+ private Integer id;
+
+ private String inlongGroupId;
+
+ private String inlongStreamId;
+
+ private Integer sourceId;
+
+ private String sourceType;
+
+ private String fieldName;
+
+ private String fieldValue;
+
+ private String preExpression;
+
+ private String fieldType;
+
+ private String fieldComment;
+
+ private Short isMetaField;
+
+ private String fieldFormat;
+
+ private Short rankNum;
+
+ private Integer isDeleted;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
similarity index 55%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
index 0c18cd558..b6efc4d60 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamTransformEntity.java
@@ -15,30 +15,43 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.dao.entity;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.SinkType;
-import java.util.List;
-import java.util.Map;
+import java.io.Serializable;
+import java.util.Date;
@Data
-@ApiModel("Stream sink configuration")
-public abstract class StreamSink {
+public class StreamTransformEntity implements Serializable {
- @ApiModelProperty(value = "DataSink name", required = true)
- private String sinkName;
+ private static final long serialVersionUID = 1L;
- @ApiModelProperty("Other properties if need")
- private Map<String, Object> properties;
+ private int id;
- public abstract SinkType getSinkType();
+ private String inlongGroupId;
- public abstract List<SinkField> getSinkFields();
+ private String inlongStreamId;
- public abstract DataFormat getDataFormat();
+ private String transformName;
-}
+ private String transformType;
+
+ private String preNodeNames;
+
+ private String postNodeNames;
+
+ private String transformDefinition;
+
+ private Integer version;
+
+ private Integer isDeleted;
+
+ private String creator;
+
+ private String modifier;
+
+ private Date createTime;
+
+ private Date modifyTime;
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
index 95f3827d7..c9a607823 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonDbServerEntityMapper.java
@@ -24,6 +24,7 @@ import org.apache.inlong.manager.common.pojo.commonserver.CommonDbServerPageRequ
import org.apache.inlong.manager.dao.entity.CommonDbServerEntity;
import org.springframework.stereotype.Repository;
+@Deprecated
@Repository
public interface CommonDbServerEntityMapper {
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
index b99090bb8..6192420a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.springframework.stereotype.Repository;
+@Deprecated
@Repository
public interface CommonFileServerEntityMapper {
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
index 9c6d1d81e..c0b1661a2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
@@ -21,6 +21,7 @@ import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.dao.entity.DBCollectorDetailTaskEntity;
import org.springframework.stereotype.Repository;
+@Deprecated
@Repository
public interface DBCollectorDetailTaskMapper {
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
similarity index 51%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
index b99090bb8..340a93160 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/CommonFileServerEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java
@@ -17,31 +17,46 @@
package org.apache.inlong.manager.dao.mapper;
-import org.apache.inlong.manager.common.pojo.commonserver.CommonFileServerPageRequest;
-import org.apache.inlong.manager.dao.entity.CommonFileServerEntity;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
+import org.springframework.stereotype.Repository;
import java.util.List;
-import org.springframework.stereotype.Repository;
-
@Repository
-public interface CommonFileServerEntityMapper {
+public interface StreamSourceFieldEntityMapper {
int deleteByPrimaryKey(Integer id);
- int insert(CommonFileServerEntity record);
+ int insert(StreamSourceFieldEntity record);
- int insertSelective(CommonFileServerEntity record);
+ int insertSelective(StreamSourceFieldEntity record);
- CommonFileServerEntity selectByPrimaryKey(Integer id);
+ /**
+ * Selete undeleted source field by source id.
+ *
+ * @param sourceId
+ * @return
+ */
+ List<StreamSourceFieldEntity> selectBySourceId(@Param("sourceId") Integer sourceId);
- int updateByPrimaryKeySelective(CommonFileServerEntity record);
+ int updateByPrimaryKeySelective(StreamSourceFieldEntity record);
- int updateByPrimaryKey(CommonFileServerEntity record);
+ int updateByPrimaryKey(StreamSourceFieldEntity record);
- List<CommonFileServerEntity> selectByUsernameAndIpPort(String username, String ip, int port);
+ /**
+ * Insert all field list
+ *
+ * @param fieldList
+ */
+ void insertAll(@Param("list") List<StreamSourceFieldEntity> fieldList);
- List<CommonFileServerEntity> selectAll();
+ /**
+ * Delete all field list by sourceId
+ *
+ * @param sourceId
+ * @return
+ */
+ int deleteAll(@Param("sourceId") Integer sourceId);
- List<CommonFileServerEntity> selectByCondition(CommonFileServerPageRequest request);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
similarity index 60%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
index 9c6d1d81e..88edd5e1d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DBCollectorDetailTaskMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamTransformEntityMapper.java
@@ -18,16 +18,26 @@
package org.apache.inlong.manager.dao.mapper;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.DBCollectorDetailTaskEntity;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
import org.springframework.stereotype.Repository;
+import java.util.List;
+
@Repository
-public interface DBCollectorDetailTaskMapper {
+public interface StreamTransformEntityMapper {
+
+ int deleteById(Integer id);
+
+ int insert(StreamTransformEntity record);
+
+ int insertSelective(StreamTransformEntity record);
+
+ StreamTransformEntity selectById(Integer id);
- DBCollectorDetailTaskEntity selectOneByState(int state);
+ List<StreamTransformEntity> selectByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
+ @Param("transformName") String transformName);
- DBCollectorDetailTaskEntity selectByTaskId(int taskId);
+ int updateByIdSelective(StreamTransformEntity record);
- int changeState(@Param("id") int id, @Param("offset") int offset,
- @Param("oldState") int oldState, @Param("newState") int newState);
+ int updateById(StreamTransformEntity record);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 1aae414bb..2378d1507 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -67,6 +67,12 @@
</javaClientGenerator>
<!-- Which entities to generate -->
+ <table tableName="stream_source_field" domainObjectName="StreamSourceFieldEntity"
+ enableSelectByPrimaryKey="true" enableUpdateByPrimaryKey="true"
+ enableDeleteByPrimaryKey="true" enableInsert="true"
+ enableCountByExample="false" enableDeleteByExample="false"
+ enableSelectByExample="false" enableUpdateByExample="false">
+ </table>
<!--<table tableName="user" domainObjectName="UserEntity"
enableSelectByPrimaryKey="true"
enableUpdateByPrimaryKey="true"
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
new file mode 100644
index 000000000..123384e72
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml
@@ -0,0 +1,245 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+ <result column="source_id" jdbcType="INTEGER" property="sourceId"/>
+ <result column="source_type" jdbcType="VARCHAR" property="sourceType"/>
+ <result column="field_name" jdbcType="VARCHAR" property="fieldName"/>
+ <result column="field_value" jdbcType="VARCHAR" property="fieldValue"/>
+ <result column="pre_expression" jdbcType="VARCHAR" property="preExpression"/>
+ <result column="field_type" jdbcType="VARCHAR" property="fieldType"/>
+ <result column="field_comment" jdbcType="VARCHAR" property="fieldComment"/>
+ <result column="is_meta_field" jdbcType="SMALLINT" property="isMetaField"/>
+ <result column="field_format" jdbcType="VARCHAR" property="fieldFormat"/>
+ <result column="rank_num" jdbcType="SMALLINT" property="rankNum"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id
+ , inlong_group_id, inlong_stream_id, source_id, source_type, field_name, field_value,
+ pre_expression, field_type, field_comment, is_meta_field, field_format, rank_num,
+ is_deleted
+ </sql>
+ <select id="selectBySourceId" resultType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_source_field
+ where source_id = #{sourceId,jdbcType=INTEGER}
+ and is_deleted = 0
+ </select>
+ <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
+ delete
+ from stream_source_field
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
+ source_id, source_type, field_name,
+ field_value, pre_expression, field_type,
+ field_comment, is_meta_field, field_format,
+ rank_num, is_deleted)
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
+ #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
+ #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ </insert>
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ insert into stream_source_field
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id,
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id,
+ </if>
+ <if test="sourceId != null">
+ source_id,
+ </if>
+ <if test="sourceType != null">
+ source_type,
+ </if>
+ <if test="fieldName != null">
+ field_name,
+ </if>
+ <if test="fieldValue != null">
+ field_value,
+ </if>
+ <if test="preExpression != null">
+ pre_expression,
+ </if>
+ <if test="fieldType != null">
+ field_type,
+ </if>
+ <if test="fieldComment != null">
+ field_comment,
+ </if>
+ <if test="isMetaField != null">
+ is_meta_field,
+ </if>
+ <if test="fieldFormat != null">
+ field_format,
+ </if>
+ <if test="rankNum != null">
+ rank_num,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="inlongGroupId != null">
+ #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="sourceId != null">
+ #{sourceId,jdbcType=INTEGER},
+ </if>
+ <if test="sourceType != null">
+ #{sourceType,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldName != null">
+ #{fieldName,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldValue != null">
+ #{fieldValue,jdbcType=VARCHAR},
+ </if>
+ <if test="preExpression != null">
+ #{preExpression,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldType != null">
+ #{fieldType,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldComment != null">
+ #{fieldComment,jdbcType=VARCHAR},
+ </if>
+ <if test="isMetaField != null">
+ #{isMetaField,jdbcType=SMALLINT},
+ </if>
+ <if test="fieldFormat != null">
+ #{fieldFormat,jdbcType=VARCHAR},
+ </if>
+ <if test="rankNum != null">
+ #{rankNum,jdbcType=SMALLINT},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByPrimaryKeySelective"
+ parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ update stream_source_field
+ <set>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="sourceId != null">
+ source_id = #{sourceId,jdbcType=INTEGER},
+ </if>
+ <if test="sourceType != null">
+ source_type = #{sourceType,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldName != null">
+ field_name = #{fieldName,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldValue != null">
+ field_value = #{fieldValue,jdbcType=VARCHAR},
+ </if>
+ <if test="preExpression != null">
+ pre_expression = #{preExpression,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldType != null">
+ field_type = #{fieldType,jdbcType=VARCHAR},
+ </if>
+ <if test="fieldComment != null">
+ field_comment = #{fieldComment,jdbcType=VARCHAR},
+ </if>
+ <if test="isMetaField != null">
+ is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+ </if>
+ <if test="fieldFormat != null">
+ field_format = #{fieldFormat,jdbcType=VARCHAR},
+ </if>
+ <if test="rankNum != null">
+ rank_num = #{rankNum,jdbcType=SMALLINT},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ update stream_source_field
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ source_id = #{sourceId,jdbcType=INTEGER},
+ source_type = #{sourceType,jdbcType=VARCHAR},
+ field_name = #{fieldName,jdbcType=VARCHAR},
+ field_value = #{fieldValue,jdbcType=VARCHAR},
+ pre_expression = #{preExpression,jdbcType=VARCHAR},
+ field_type = #{fieldType,jdbcType=VARCHAR},
+ field_comment = #{fieldComment,jdbcType=VARCHAR},
+ is_meta_field = #{isMetaField,jdbcType=SMALLINT},
+ field_format = #{fieldFormat,jdbcType=VARCHAR},
+ rank_num = #{rankNum,jdbcType=SMALLINT},
+ is_deleted = #{isDeleted,jdbcType=INTEGER}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+
+ <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity">
+ insert into stream_source_field (id, inlong_group_id, inlong_stream_id,
+ source_id, source_type, field_name,
+ field_value, pre_expression, field_type,
+ field_comment, is_meta_field, field_format,
+ rank_num, is_deleted)
+ values
+ <foreach collection="list" index="index" item="item" separator=",">
+ (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{sourceId,jdbcType=INTEGER}, #{sourceType,jdbcType=VARCHAR}, #{fieldName,jdbcType=VARCHAR},
+ #{fieldValue,jdbcType=VARCHAR}, #{preExpression,jdbcType=VARCHAR}, #{fieldType,jdbcType=VARCHAR},
+ #{fieldComment,jdbcType=VARCHAR}, #{isMetaField,jdbcType=SMALLINT}, #{fieldFormat,jdbcType=VARCHAR},
+ #{rankNum,jdbcType=SMALLINT}, #{isDeleted,jdbcType=INTEGER})
+ </foreach>
+ </insert>
+
+ <delete id="deleteAll">
+ delete
+ from stream_source_field
+ where source_id = #{sourceId,jdbcType=INTEGER}
+ </delete>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
new file mode 100644
index 000000000..6cddcec46
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamTransformEntityMapper.xml
@@ -0,0 +1,234 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ <id column="id" jdbcType="INTEGER" property="id" />
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId" />
+ <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId" />
+ <result column="transform_name" jdbcType="VARCHAR" property="transformName" />
+ <result column="transform_type" jdbcType="VARCHAR" property="transformType" />
+ <result column="pre_node_names" jdbcType="VARCHAR" property="preNodeNames" />
+ <result column="post_node_names" jdbcType="VARCHAR" property="postNodeNames" />
+ <result column="transform_definition" jdbcType="VARCHAR" property="transformDefinition" />
+ <result column="version" jdbcType="INTEGER" property="version" />
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted" />
+ <result column="creator" jdbcType="VARCHAR" property="creator" />
+ <result column="modifier" jdbcType="VARCHAR" property="modifier" />
+ <result column="create_time" jdbcType="TIMESTAMP" property="createTime" />
+ <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime" />
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, inlong_group_id, inlong_stream_id, transform_name, transform_type, pre_node_names,
+ post_node_names, transform_definition, version, is_deleted, creator, modifier, create_time,
+ modify_time
+ </sql>
+ <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List" />
+ from stream_transform
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByRelatedId" resultType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from stream_transform
+ <where>
+ is_deleted = 0
+ and inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ <if test="transformName != null and transformName != ''">
+ and transform_name = #{transformName, jdbcType=VARCHAR}
+ </if>
+ </where>
+ </select>
+ <delete id="deleteById" parameterType="java.lang.Integer">
+ delete from stream_transform
+ where id = #{id,jdbcType=INTEGER}
+ </delete>
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ insert into stream_transform (id, inlong_group_id, inlong_stream_id,
+ transform_name, transform_type, pre_node_names,
+ post_node_names, transform_definition, version,
+ is_deleted, creator, modifier,
+ create_time, modify_time)
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ #{transformName,jdbcType=VARCHAR}, #{transformType,jdbcType=VARCHAR}, #{preNodeNames,jdbcType=VARCHAR},
+ #{postNodeNames,jdbcType=VARCHAR}, #{transformDefinition,jdbcType=VARCHAR}, #{version,jdbcType=INTEGER},
+ #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
+ #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
+ </insert>
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ insert into stream_transform
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id,
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id,
+ </if>
+ <if test="transformName != null">
+ transform_name,
+ </if>
+ <if test="transformType != null">
+ transform_type,
+ </if>
+ <if test="preNodeNames != null">
+ pre_node_names,
+ </if>
+ <if test="postNodeNames != null">
+ post_node_names,
+ </if>
+ <if test="transformDefinition != null">
+ transform_definition,
+ </if>
+ <if test="version != null">
+ version,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
+ <if test="creator != null">
+ creator,
+ </if>
+ <if test="modifier != null">
+ modifier,
+ </if>
+ <if test="createTime != null">
+ create_time,
+ </if>
+ <if test="modifyTime != null">
+ modify_time,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="inlongGroupId != null">
+ #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="transformName != null">
+ #{transformName,jdbcType=VARCHAR},
+ </if>
+ <if test="transformType != null">
+ #{transformType,jdbcType=VARCHAR},
+ </if>
+ <if test="preNodeNames != null">
+ #{preNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="postNodeNames != null">
+ #{postNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="transformDefinition != null">
+ #{transformDefinition,jdbcType=VARCHAR},
+ </if>
+ <if test="version != null">
+ #{version,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="creator != null">
+ #{creator,jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ update stream_transform
+ <set>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongStreamId != null">
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ </if>
+ <if test="transformName != null">
+ transform_name = #{transformName,jdbcType=VARCHAR},
+ </if>
+ <if test="transformType != null">
+ transform_type = #{transformType,jdbcType=VARCHAR},
+ </if>
+ <if test="preNodeNames != null">
+ pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="postNodeNames != null">
+ post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+ </if>
+ <if test="transformDefinition != null">
+ transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+ </if>
+ <if test="version != null">
+ version = #{version,jdbcType=INTEGER},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ <if test="creator != null">
+ creator = #{creator,jdbcType=VARCHAR},
+ </if>
+ <if test="modifier != null">
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.StreamTransformEntity">
+ update stream_transform
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ transform_name = #{transformName,jdbcType=VARCHAR},
+ transform_type = #{transformType,jdbcType=VARCHAR},
+ pre_node_names = #{preNodeNames,jdbcType=VARCHAR},
+ post_node_names = #{postNodeNames,jdbcType=VARCHAR},
+ transform_definition = #{transformDefinition,jdbcType=VARCHAR},
+ version = #{version,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index e446d7db7..15879aa30 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -19,47 +19,26 @@ package org.apache.inlong.manager.service;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.source.SourceResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.thirdparty.sort.util.FieldInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SinkInfoUtils;
-import org.apache.inlong.manager.service.thirdparty.sort.util.SourceInfoUtils;
-import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.FieldInfo;
-import org.apache.inlong.sort.protocol.sink.SinkInfo;
-import org.apache.inlong.sort.protocol.source.SourceInfo;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
-import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
-import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,12 +50,6 @@ public class CommonOperateService {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonOperateService.class);
- @Autowired
- private ClusterBean clusterBean;
- @Autowired
- private InlongStreamService streamService;
- @Autowired
- private StreamSourceService streamSourceService;
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
@@ -95,14 +68,14 @@ public class CommonOperateService {
Map<String, String> params;
switch (key) {
- case Constant.PULSAR_SERVICEURL: {
+ case InlongGroupSettings.PULSAR_SERVICE_URL: {
clusterEntity = getMQCluster(MQType.PULSAR);
if (clusterEntity != null) {
result = clusterEntity.getUrl();
}
break;
}
- case Constant.PULSAR_ADMINURL: {
+ case InlongGroupSettings.PULSAR_ADMIN_URL: {
clusterEntity = getMQCluster(MQType.PULSAR);
if (clusterEntity != null) {
params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
@@ -110,12 +83,12 @@ public class CommonOperateService {
}
break;
}
- case Constant.CLUSTER_TUBE_MANAGER:
- case Constant.CLUSTER_TUBE_CLUSTER_ID:
- case Constant.TUBE_MASTER_URL: {
+ case InlongGroupSettings.TUBE_MANAGER_URL:
+ case InlongGroupSettings.TUBE_CLUSTER_ID:
+ case InlongGroupSettings.TUBE_MASTER_URL: {
clusterEntity = getMQCluster(MQType.TUBE);
if (clusterEntity != null) {
- if (key.equals(Constant.TUBE_MASTER_URL)) {
+ if (key.equals(InlongGroupSettings.TUBE_MASTER_URL)) {
result = clusterEntity.getUrl();
} else {
params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
@@ -136,7 +109,8 @@ public class CommonOperateService {
* TODO Add data_proxy_cluster_name for query.
*/
private ThirdPartyClusterEntity getMQCluster(MQType type) {
- List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+ List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(
+ InlongGroupSettings.CLUSTER_DATA_PROXY);
if (CollectionUtils.isEmpty(clusterList)) {
LOGGER.warn("no data proxy cluster found");
return null;
@@ -166,7 +140,7 @@ public class CommonOperateService {
Map<String, String> configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
- String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
+ String adminUrl = configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
pulsarClusterInfo.setAdminUrl(adminUrl);
pulsarClusterInfo.setType(clusterEntity.getType());
@@ -196,58 +170,4 @@ public class CommonOperateService {
return inlongGroupEntity;
}
-
- /**
- * Create dataflow info for sort.
- */
- public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
- String groupId = sinkResponse.getInlongGroupId();
- String streamId = sinkResponse.getInlongStreamId();
- List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
- if (CollectionUtils.isEmpty(sourceList)) {
- throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
- groupId, streamId));
- }
-
- // Get all field info
- List<FieldInfo> sourceFields = new ArrayList<>();
- List<FieldInfo> sinkFields = new ArrayList<>();
-
- // TODO Support more than one source and one sink
- final SourceResponse sourceResponse = sourceList.get(0);
- boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
-
- List<FieldMappingUnit> mappingUnitList;
- InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
- if (isAllMigration) {
- mappingUnitList = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
- } else {
- mappingUnitList = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
- sinkResponse.getFieldList(), sourceFields, sinkFields);
- }
-
- FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
-
- // Get source info
- String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
- PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
- SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
- groupInfo, streamInfo, sourceResponse, sourceFields);
-
- // Get sink info
- SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse, sinkFields);
-
- // Get transformation info
- TransformationInfo transInfo = new TransformationInfo(fieldMappingRule);
-
- // Get properties
- Map<String, Object> properties = new HashMap<>();
- if (MapUtils.isNotEmpty(sinkResponse.getProperties())) {
- properties.putAll(sinkResponse.getProperties());
- }
- properties.put(Constant.DATA_FLOW_GROUP_ID_KEY, groupId);
-
- return new DataFlowInfo(sinkResponse.getId(), sourceInfo, transInfo, sinkInfo, properties);
- }
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
index 51579cb5c..80c00d900 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DBCollectorTaskServiceImpl.java
@@ -36,6 +36,7 @@ import java.util.Objects;
@Service
@Slf4j
+@Deprecated
public class DBCollectorTaskServiceImpl implements DBCollectorTaskService {
private static final Logger LOGGER = LoggerFactory.getLogger(DBCollectorTaskServiceImpl.class);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index eefec8892..e4d248861 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -42,6 +42,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupTopicResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -174,7 +175,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public InlongGroupInfo get(String groupId) {
LOGGER.debug("begin to get inlong group info by groupId={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
LOGGER.error("inlong group not found by groupId={}", groupId);
@@ -200,7 +201,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
// For approved inlong group, encapsulate the cluster address of the middleware
if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
if (mqType == MQType.TUBE) {
- groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
+ groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL));
} else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType.name());
groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
@@ -253,7 +254,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.debug("begin to update inlong group={}", groupRequest);
Preconditions.checkNotNull(groupRequest, "inlong group is empty");
String groupId = groupRequest.getInlongGroupId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
@@ -328,7 +329,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
propagation = Propagation.REQUIRES_NEW)
public boolean updateStatus(String groupId, Integer status, String operator) {
LOGGER.info("begin to update group status to [{}] by groupId={}, username={}", status, groupId, operator);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupIdForUpdate(groupId);
if (entity == null) {
LOGGER.error("inlong group not found by groupId={}", groupId);
@@ -352,7 +353,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public boolean delete(String groupId, String operator) {
LOGGER.debug("begin to delete inlong group, groupId={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
@@ -401,7 +402,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
@Override
public boolean exist(String groupId) {
LOGGER.debug("begin to check inlong group, groupId={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
Integer count = groupMapper.selectIdentifierExist(groupId);
LOGGER.info("success to check inlong group");
@@ -441,12 +442,12 @@ public class InlongGroupServiceImpl implements InlongGroupService {
if (mqType == MQType.TUBE) {
// Tube Topic corresponds to inlong group one-to-one
topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
- topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
+ topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL));
} else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
// Pulsar's topic corresponds to the inlong stream one-to-one
topicVO.setDsTopicList(streamService.getTopicList(groupId));
- topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
- topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
+ topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.PULSAR_ADMIN_URL));
+ topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(InlongGroupSettings.PULSAR_SERVICE_URL));
} else {
LOGGER.error("middleware type={} not supported", mqType);
throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED);
@@ -465,7 +466,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
// Save the dataSchema, Topic and other information of the inlong group
Preconditions.checkNotNull(approveInfo, "InlongGroupApproveRequest is empty");
String groupId = approveInfo.getInlongGroupId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
String middlewareType = approveInfo.getMiddlewareType();
Preconditions.checkNotNull(middlewareType, "Middleware type is empty");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index d0a31d5b1..b13d63f7e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -22,7 +22,6 @@ import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -35,10 +34,10 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamTopicResponse;
import org.apache.inlong.manager.common.pojo.stream.StreamBriefResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -91,8 +90,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
Preconditions.checkNotNull(request, "inlong stream info is empty");
String groupId = request.getInlongGroupId();
String streamId = request.getInlongStreamId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be added
checkBizIsTempStatus(groupId);
@@ -122,8 +121,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public InlongStreamInfo get(String groupId, String streamId) {
LOGGER.debug("begin to get inlong stream by groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
if (streamEntity == null) {
@@ -141,7 +140,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public Boolean exist(String groupId, String streamId) {
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
return streamEntity != null;
}
@@ -199,9 +198,9 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.debug("begin to update inlong stream info={}", request);
Preconditions.checkNotNull(request, "inlong stream request is empty");
String groupId = request.getInlongGroupId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
String streamId = request.getInlongStreamId();
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be modified
InlongGroupEntity inlongGroupEntity = this.checkBizIsTempStatus(groupId);
@@ -232,8 +231,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public Boolean delete(String groupId, String streamId, String operator) {
LOGGER.debug("begin to delete inlong stream, groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
this.checkBizIsTempStatus(groupId);
@@ -274,7 +273,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public Boolean logicDeleteAll(String groupId, String operator) {
LOGGER.debug("begin to delete all inlong stream by groupId={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
this.checkBizIsTempStatus(groupId);
@@ -304,7 +303,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public List<StreamBriefResponse> getBriefList(String groupId) {
LOGGER.debug("begin to get inlong stream brief list by groupId={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<InlongStreamEntity> entityList = streamMapper.selectByGroupId(groupId);
List<StreamBriefResponse> briefInfoList = CommonBeanUtils
@@ -400,7 +399,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
LOGGER.debug("begin to list full inlong stream page by {}", request);
}
Preconditions.checkNotNull(request, "request is empty");
- Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
// 1. Query all valid data sources under groupId
String groupId = request.getInlongGroupId();
@@ -459,7 +458,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
@Override
public List<InlongStreamTopicResponse> getTopicList(String groupId) {
LOGGER.debug("begin bo get topic list by group id={}", groupId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<InlongStreamTopicResponse> topicList = streamMapper.selectTopicList(groupId);
LOGGER.debug("success to get topic list by groupId={}", groupId);
@@ -565,7 +564,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
for (InlongStreamFieldEntity entity : list) {
entity.setInlongGroupId(groupId);
entity.setInlongStreamId(streamId);
- entity.setIsDeleted(Constant.UN_DELETED);
}
streamFieldMapper.insertAll(list);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
index 5b6726b41..2a8b899fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StreamConfigLogServiceImpl.java
@@ -20,11 +20,7 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Date;
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogRequest;
@@ -38,6 +34,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.List;
+
@Service
public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogEntity>
implements StreamConfigLogService {
@@ -63,7 +64,7 @@ public class StreamConfigLogServiceImpl extends AbstractService<StreamConfigLogE
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to list source page by " + request);
}
- Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
PageHelper.startPage(request.getPageNum(), request.getPageSize());
if (request.getReportTime() == null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index b3fe99eb8..9ebcce253 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterResponse;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.InLongStringUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -95,7 +96,6 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
}
Preconditions.checkNotNull(entity.getCreator(), "cluster creator is empty");
entity.setCreateTime(new Date());
- entity.setIsDeleted(Constant.UN_DELETED);
thirdPartyClusterMapper.insert(entity);
LOGGER.info("success to add a cluster");
return entity.getId();
@@ -184,9 +184,10 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
if (StringUtils.isNotBlank(clusterName)) {
entity = thirdPartyClusterMapper.selectByName(clusterName);
} else {
- List<ThirdPartyClusterEntity> list = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+ List<ThirdPartyClusterEntity> list = thirdPartyClusterMapper.selectByType(
+ InlongGroupSettings.CLUSTER_DATA_PROXY);
if (CollectionUtils.isEmpty(list)) {
- LOGGER.warn("data proxy cluster not found by type=" + Constant.CLUSTER_DATA_PROXY);
+ LOGGER.warn("data proxy cluster not found by type=" + InlongGroupSettings.CLUSTER_DATA_PROXY);
return null;
}
entity = list.get(0);
@@ -196,7 +197,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
LOGGER.warn("data proxy cluster not found by name={}", clusterName);
return null;
}
- if (!Constant.CLUSTER_DATA_PROXY.equals(entity.getType())) {
+ if (!InlongGroupSettings.CLUSTER_DATA_PROXY.equals(entity.getType())) {
LOGGER.warn("expected cluster type is DATA_PROXY, but found {}", entity.getType());
return null;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index f49f12508..e53b94246 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,29 +25,21 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SinkType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkApproveDTO;
import org.apache.inlong.manager.common.pojo.sink.SinkBriefResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
import org.apache.inlong.manager.service.CommonOperateService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.stream.CreateStreamWorkflowDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -89,8 +81,6 @@ public class StreamSinkServiceImpl implements StreamSinkService {
private StreamSinkEntityMapper sinkMapper;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
- @Autowired
- private WorkflowService workflowService;
@Override
@Transactional(rollbackFor = Throwable.class)
@@ -114,9 +104,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// If the inlong group status is [Configuration Successful], then asynchronously initiate
// the [Single inlong stream Resource Creation] workflow
- if (GroupState.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
- executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
- }
+// if (GroupState.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+// executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
+// }
LOGGER.info("success to save sink info: {}", request);
return id;
@@ -139,7 +129,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<SinkResponse> listSink(String groupId, String streamId) {
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
@@ -153,8 +143,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public List<SinkBriefResponse> listBrief(String groupId, String streamId) {
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Query all sink information and encapsulate it in the result set
List<SinkBriefResponse> summaryList = sinkMapper.selectSummary(groupId, streamId);
@@ -165,7 +155,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public PageInfo<? extends SinkListResponse> listByCondition(SinkPageRequest request) {
- Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSinkEntity> entityPage = sinkMapper.selectByCondition(request);
Map<SinkType, Page<StreamSinkEntity>> sinkMap = Maps.newHashMap();
@@ -192,7 +182,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
public Boolean update(SinkRequest request, String operator) {
LOGGER.info("begin to update sink info: {}", request);
this.checkParams(request);
- Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
// Check if it can be modified
String groupId = request.getInlongGroupId();
@@ -206,9 +196,9 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// The inlong group status is [Configuration successful], then asynchronously initiate
// the [Single inlong stream resource creation] workflow
- if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
- executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
- }
+// if (EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+// executorService.execute(new WorkflowStartRunnable(operator, groupEntity, streamId));
+// }
LOGGER.info("success to update sink info: {}", request);
return true;
}
@@ -228,7 +218,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Override
public Boolean delete(Integer id, String sinkType, String operator) {
LOGGER.info("begin to delete sink by id={}, sinkType={}", id, sinkType);
- Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
+ Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
// Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
@@ -251,8 +241,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Transactional(rollbackFor = Throwable.class)
public Boolean logicDeleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to logic delete all sink info by groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
@@ -281,8 +271,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Transactional(rollbackFor = Throwable.class)
public Boolean deleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to delete all sink by groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
@@ -333,7 +323,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
for (SinkApproveDTO dto : approveList) {
// According to the sink type, save sink information
String sinkType = dto.getSinkType();
- Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
StreamSinkEntity entity = new StreamSinkEntity();
entity.setId(dto.getId());
@@ -351,13 +341,15 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
private void checkParams(SinkRequest request) {
- Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String groupId = request.getInlongGroupId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
String streamId = request.getInlongStreamId();
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
String sinkType = request.getSinkType();
- Preconditions.checkNotNull(sinkType, SinkType.SINK_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sinkType, ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
+ String sinkName = request.getSinkName();
+ Preconditions.checkNotNull(sinkName, ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage());
}
/**
@@ -365,39 +357,39 @@ public class StreamSinkServiceImpl implements StreamSinkService {
*
* @see CreateStreamWorkflowDefinition
*/
- class WorkflowStartRunnable implements Runnable {
-
- private final String operator;
- private final InlongGroupEntity inlongGroupEntity;
- private final String streamId;
-
- public WorkflowStartRunnable(String operator, InlongGroupEntity inlongGroupEntity, String streamId) {
- this.operator = operator;
- this.inlongGroupEntity = inlongGroupEntity;
- this.streamId = streamId;
- }
-
- @Override
- public void run() {
- String groupId = inlongGroupEntity.getInlongGroupId();
- LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
-
- InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
- GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
-
- workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
- LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
- }
-
- /**
- * Generate [Group Resource] form
- */
- private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo, String streamId) {
- GroupResourceProcessForm form = new GroupResourceProcessForm();
- form.setGroupInfo(groupInfo);
- form.setInlongStreamId(streamId);
- return form;
- }
- }
+// class WorkflowStartRunnable implements Runnable {
+//
+// private final String operator;
+// private final InlongGroupEntity inlongGroupEntity;
+// private final String streamId;
+//
+// public WorkflowStartRunnable(String operator, InlongGroupEntity inlongGroupEntity, String streamId) {
+// this.operator = operator;
+// this.inlongGroupEntity = inlongGroupEntity;
+// this.streamId = streamId;
+// }
+//
+// @Override
+// public void run() {
+// String groupId = inlongGroupEntity.getInlongGroupId();
+// LOGGER.info("begin start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
+//
+// InlongGroupInfo groupInfo = CommonBeanUtils.copyProperties(inlongGroupEntity, InlongGroupInfo::new);
+// GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, streamId);
+//
+// workflowService.start(ProcessName.CREATE_STREAM_RESOURCE, operator, form);
+// LOGGER.info("success start inlong stream workflow for groupId={}, streamId={}", groupId, streamId);
+// }
+//
+// /**
+// * Generate [Group Resource] form
+// */
+// private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo, String streamId) {
+// GroupResourceProcessForm form = new GroupResourceProcessForm();
+// form.setGroupInfo(groupInfo);
+// form.setInlongStreamId(streamId);
+// return form;
+// }
+// }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
index 6aff31f3c..142be98eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperation.java
@@ -142,7 +142,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, existType));
SinkResponse response = this.getFromEntity(entity, ClickHouseSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,7 +162,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, existType));
ClickHouseSinkDTO dto = ClickHouseSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,7 +183,7 @@ public class ClickHouseSinkOperation implements StreamSinkOperation {
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
Preconditions.checkTrue(SinkType.SINK_CLICKHOUSE.equals(sinkType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_CLICKHOUSE, sinkType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_CLICKHOUSE, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
index dc1b3aa09..c09ee5edf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/hive/HiveSinkOperation.java
@@ -142,7 +142,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, existType));
SinkResponse response = this.getFromEntity(entity, HiveSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -162,7 +162,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_HIVE.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, existType));
HiveSinkDTO dto = HiveSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -183,7 +183,7 @@ public class HiveSinkOperation implements StreamSinkOperation {
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
Preconditions.checkTrue(SinkType.SINK_HIVE.equals(sinkType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_HIVE, sinkType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_HIVE, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
index d09fa3ea3..9fd715491 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/iceberg/IcebergSinkOperation.java
@@ -136,7 +136,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, existType));
SinkResponse response = this.getFromEntity(entity, IcebergSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
@@ -156,7 +156,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, existType));
IcebergSinkDTO dto = IcebergSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,7 +177,7 @@ public class IcebergSinkOperation implements StreamSinkOperation {
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
Preconditions.checkTrue(SinkType.SINK_ICEBERG.equals(sinkType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_ICEBERG, sinkType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_ICEBERG, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
index e8a4a169c..e72251868 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperation.java
@@ -140,7 +140,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, existType));
SinkResponse response = this.getFromEntity(entity, KafkaSinkResponse::new);
List<StreamSinkFieldEntity> entities = sinkFieldMapper.selectBySinkId(id);
List<SinkFieldResponse> infos = CommonBeanUtils.copyListProperties(entities, SinkFieldResponse::new);
@@ -156,7 +156,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
}
String existType = entity.getSinkType();
Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(existType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, existType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, existType));
KafkaSinkDTO dto = KafkaSinkDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
@@ -177,7 +177,7 @@ public class KafkaSinkOperation implements StreamSinkOperation {
public void updateOpt(SinkRequest request, String operator) {
String sinkType = request.getSinkType();
Preconditions.checkTrue(SinkType.SINK_KAFKA.equals(sinkType),
- String.format(SinkType.SINK_TYPE_NOT_SAME, SinkType.SINK_KAFKA, sinkType));
+ String.format(ErrorCodeEnum.SINK_TYPE_NOT_SAME.getMessage(), SinkType.SINK_KAFKA, sinkType));
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
index e99c00421..5d33e2581 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperation.java
@@ -18,18 +18,20 @@
package org.apache.inlong.manager.service.source;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SourceState;
-import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -38,6 +40,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -49,6 +52,8 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSourceOperation.class);
@Autowired
protected StreamSourceEntityMapper sourceMapper;
+ @Autowired
+ protected StreamSourceFieldEntityMapper sourceFieldMapper;
/**
* Setting the parameters of the latest entity.
@@ -91,15 +96,16 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
} else {
entity.setStatus(SourceState.SOURCE_NEW.getCode());
}
- entity.setIsDeleted(Constant.UN_DELETED);
entity.setCreator(operator);
entity.setModifier(operator);
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);
+ entity.setIsDeleted(0);
// get the ext params
setTargetEntity(request, entity);
sourceMapper.insert(entity);
+ saveFieldOpt(entity, request.getFieldList());
return entity.getId();
}
@@ -110,8 +116,14 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
- return this.getFromEntity(entity, this::getResponse);
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
+
+ SourceResponse sourceResponse = this.getFromEntity(entity, this::getResponse);
+ List<StreamSourceFieldEntity> sourceFieldEntities = sourceFieldMapper.selectBySourceId(id);
+ List<InlongStreamFieldInfo> fieldInfos = CommonBeanUtils.copyListProperties(sourceFieldEntities,
+ InlongStreamFieldInfo::new);
+ sourceResponse.setFieldList(fieldInfos);
+ return sourceResponse;
}
@Override
@@ -129,6 +141,7 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
entity.setModifier(operator);
entity.setModifyTime(new Date());
sourceMapper.updateByPrimaryKeySelective(entity);
+ updateFieldOpt(entity, request.getFieldList());
LOGGER.info("success to update source of type={}", request.getSourceType());
}
@@ -189,5 +202,50 @@ public abstract class AbstractSourceOperation implements StreamSourceOperation {
curEntity.setIsDeleted(id);
curEntity.setModifyTime(new Date());
sourceMapper.updateByPrimaryKeySelective(curEntity);
+ sourceFieldMapper.deleteAll(id);
+ LOGGER.info("success to delete source={}", request);
+ }
+
+ private void updateFieldOpt(StreamSourceEntity entity, List<InlongStreamFieldInfo> fieldInfos) {
+ Integer sourceId = entity.getId();
+ if (CollectionUtils.isEmpty(fieldInfos)) {
+ return;
+ }
+
+ // First physically delete the existing fields
+ sourceFieldMapper.deleteAll(sourceId);
+ // Then batch save the source fields
+ this.saveFieldOpt(entity, fieldInfos);
+
+ LOGGER.info("success to update field");
+ }
+
+ private void saveFieldOpt(StreamSourceEntity entity, List<InlongStreamFieldInfo> fieldInfos) {
+ LOGGER.info("begin to save field={}", fieldInfos);
+ if (CollectionUtils.isEmpty(fieldInfos)) {
+ return;
+ }
+
+ int size = fieldInfos.size();
+ List<StreamSourceFieldEntity> entityList = new ArrayList<>(size);
+ String groupId = entity.getInlongGroupId();
+ String streamId = entity.getInlongStreamId();
+ String sourceType = entity.getSourceType();
+ Integer sourceId = entity.getId();
+ for (InlongStreamFieldInfo fieldInfo : fieldInfos) {
+ StreamSourceFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo,
+ StreamSourceFieldEntity::new);
+ if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+ fieldEntity.setFieldComment(fieldEntity.getFieldName());
+ }
+ fieldEntity.setInlongGroupId(groupId);
+ fieldEntity.setInlongStreamId(streamId);
+ fieldEntity.setSourceId(sourceId);
+ fieldEntity.setSourceType(sourceType);
+ entityList.add(fieldEntity);
+ }
+
+ sourceFieldMapper.insertAll(entityList);
+ LOGGER.info("success to save source fields");
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 76d9b9f5e..89e0db727 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SourceState;
@@ -104,7 +103,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public List<SourceResponse> listSource(String groupId, String streamId) {
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
List<StreamSourceEntity> entityList = sourceMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isEmpty(entityList)) {
return Collections.emptyList();
@@ -117,7 +116,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Override
public PageInfo<? extends SourceListResponse> listByCondition(SourcePageRequest request) {
- Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getInlongGroupId(), ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
PageHelper.startPage(request.getPageNum(), request.getPageSize());
List<StreamSourceEntity> entityList = sourceMapper.selectByCondition(request);
@@ -146,7 +145,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
public boolean update(SourceRequest request, String operator) {
LOGGER.info("begin to update source info: {}", request);
this.checkParams(request);
- Preconditions.checkNotNull(request.getId(), Constant.ID_IS_EMPTY);
+ Preconditions.checkNotNull(request.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
// Check if it can be modified
String groupId = request.getInlongGroupId();
@@ -175,7 +174,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
isolation = Isolation.READ_COMMITTED)
public boolean delete(Integer id, String sourceType, String operator) {
LOGGER.info("begin to delete source by id={}, sourceType={}", id, sourceType);
- Preconditions.checkNotNull(id, Constant.ID_IS_EMPTY);
+ Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
@@ -231,8 +230,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
isolation = Isolation.READ_COMMITTED)
public boolean logicDeleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to logic delete all source info by groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
@@ -266,8 +265,8 @@ public class StreamSourceServiceImpl implements StreamSourceService {
isolation = Isolation.READ_COMMITTED)
public boolean deleteAll(String groupId, String streamId, String operator) {
LOGGER.info("begin to delete all source by groupId={}, streamId={}", groupId, streamId);
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
commonOperateService.checkGroupStatus(groupId, operator);
@@ -289,13 +288,15 @@ public class StreamSourceServiceImpl implements StreamSourceService {
}
private void checkParams(SourceRequest request) {
- Preconditions.checkNotNull(request, Constant.REQUEST_IS_EMPTY);
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
String groupId = request.getInlongGroupId();
- Preconditions.checkNotNull(groupId, Constant.GROUP_ID_IS_EMPTY);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
String streamId = request.getInlongStreamId();
- Preconditions.checkNotNull(streamId, Constant.STREAM_ID_IS_EMPTY);
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
String sourceType = request.getSourceType();
- Preconditions.checkNotNull(sourceType, SourceType.SOURCE_TYPE_IS_EMPTY);
+ Preconditions.checkNotNull(sourceType, ErrorCodeEnum.SOURCE_TYPE_IS_NULL.getMessage());
+ String sourceName = request.getSourceName();
+ Preconditions.checkNotNull(sourceName, ErrorCodeEnum.SOURCE_NAME_IS_NULL.getMessage());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
index cea98743f..af4028388 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/autopush/AutoPushSourceOperation.java
@@ -79,7 +79,7 @@ public class AutoPushSourceOperation extends AbstractSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
AutoPushSourceDTO dto = AutoPushSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
index 3f9c35923..68e06760f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperation.java
@@ -92,7 +92,7 @@ public class BinlogSourceOperation extends AbstractSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
index 46e24fcc1..79e078be1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperation.java
@@ -90,7 +90,7 @@ public class FileSourceOperation extends AbstractSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
FileSourceDTO dto = FileSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
index 3fd771d74..3d1521ad8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperation.java
@@ -92,7 +92,7 @@ public class KafkaSourceOperation extends AbstractSourceOperation {
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
- String.format(SourceType.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
+ String.format(ErrorCodeEnum.SOURCE_TYPE_NOT_SAME.getMessage(), getSourceType(), existType));
KafkaSourceDTO dto = KafkaSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
index a77d499eb..6133d4f50 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreateTubeGroupTaskListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ReTryConfigBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -64,7 +64,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
InlongGroupInfo groupInfo = groupService.get(groupId);
String topicName = groupInfo.getMqResourceObj();
- int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID));
+ int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID));
QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
.topicName(topicName).clusterId(clusterId)
.user(groupInfo.getCreator()).build();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
index 124c30a32..e800fa0a1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeMqOptService.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.thirdparty.mq;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.HttpUtils;
import org.apache.inlong.manager.service.CommonOperateService;
import org.springframework.beans.factory.annotation.Autowired;
@@ -57,13 +57,13 @@ public class TubeMqOptService {
throw new Exception("topic cannot be empty");
}
AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
- String clusterIdStr = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_CLUSTER_ID);
+ String clusterIdStr = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID);
int clusterId = Integer.parseInt(clusterIdStr);
QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
.topicName(addTopicTasksBean.getTopicName()).clusterId(clusterId)
.user(request.getUser()).build();
- String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+ String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
TubeManagerResponse response = httpUtils
.request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
@@ -91,7 +91,7 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Content-Type", "application/json");
try {
- String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+ String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
log.info("create tube consumer group {} on {} ", GSON.toJson(request),
tubeManager + "/v1/task?method=addTopicTask");
TubeManagerResponse rsp = httpUtils.request(tubeManager + "/v1/group?method=add",
@@ -113,7 +113,7 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Content-Type", "application/json");
try {
- String tubeManager = commonOperateService.getSpecifiedParam(Constant.CLUSTER_TUBE_MANAGER);
+ String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
if (response.getErrCode() == 0) { // topic already exists
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 9b7afd472..5f03986b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -31,8 +31,8 @@ import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.UpdateGroupProcessForm.OperateType;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.util.DataFlowUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -56,10 +56,10 @@ public class CreateSortConfigListener implements SortOperateListener {
private static final Logger LOGGER = LoggerFactory.getLogger(CreateSortConfigListener.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
- @Autowired
- private CommonOperateService commonOperateService;
@Autowired
private StreamSinkService streamSinkService;
+ @Autowired
+ private DataFlowUtils dataFlowUtils;
@Override
public TaskEvent event() {
@@ -93,7 +93,7 @@ public class CreateSortConfigListener implements SortOperateListener {
try {
// TODO Support more than one sinks under a stream
Map<String, DataFlowInfo> dataFlowInfoMap = sinkResponseList.stream().map(sink -> {
- DataFlowInfo flowInfo = commonOperateService.createDataFlow(groupInfo, sink);
+ DataFlowInfo flowInfo = dataFlowUtils.createDataFlow(groupInfo, sink);
return Pair.of(sink.getInlongStreamId(), flowInfo);
}
).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index 16ccb9d05..334427afc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -27,10 +27,10 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
-import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.apache.inlong.manager.service.thirdparty.sort.util.DataFlowUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
@@ -55,8 +55,6 @@ public class PushSortConfigListener implements SortOperateListener {
private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
- @Autowired
- private CommonOperateService commonOperateService;
@Autowired
private ClusterBean clusterBean;
@Autowired
@@ -65,6 +63,8 @@ public class PushSortConfigListener implements SortOperateListener {
private InlongStreamService streamService;
@Autowired
private StreamSinkService streamSinkService;
+ @Autowired
+ private DataFlowUtils dataFlowUtils;
@Override
public TaskEvent event() {
@@ -94,7 +94,7 @@ public class PushSortConfigListener implements SortOperateListener {
LOGGER.debug("sink info: {}", sinkResponse);
}
- DataFlowInfo dataFlowInfo = commonOperateService.createDataFlow(groupInfo, sinkResponse);
+ DataFlowInfo dataFlowInfo = dataFlowUtils.createDataFlow(groupInfo, sinkResponse);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("try to push config to sort: {}", JsonUtils.toJson(dataFlowInfo));
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java
new file mode 100644
index 000000000..9108afd43
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/DataFlowUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdparty.sort.util;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.sort.protocol.DataFlowInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule;
+import org.apache.inlong.sort.protocol.transformation.FieldMappingRule.FieldMappingUnit;
+import org.apache.inlong.sort.protocol.transformation.TransformationInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class DataFlowUtils {
+
+ @Autowired
+ private ClusterBean clusterBean;
+ @Autowired
+ private CommonOperateService commonOperateService;
+ @Autowired
+ private StreamSourceService streamSourceService;
+ @Autowired
+ private InlongStreamService streamService;
+
+ /**
+ * Create dataflow info for sort.
+ */
+ public DataFlowInfo createDataFlow(InlongGroupInfo groupInfo, SinkResponse sinkResponse) {
+ String groupId = sinkResponse.getInlongGroupId();
+ String streamId = sinkResponse.getInlongStreamId();
+ List<SourceResponse> sourceList = streamSourceService.listSource(groupId, streamId);
+ if (CollectionUtils.isEmpty(sourceList)) {
+ throw new WorkflowListenerException(String.format("Source not found by groupId=%s and streamId=%s",
+ groupId, streamId));
+ }
+
+ // Get all field info
+ List<FieldInfo> sourceFields = new ArrayList<>();
+ List<FieldInfo> sinkFields = new ArrayList<>();
+
+ // TODO Support more than one source and one sink
+ final SourceResponse sourceResponse = sourceList.get(0);
+ boolean isAllMigration = SourceInfoUtils.isBinlogAllMigration(sourceResponse);
+
+ List<FieldMappingUnit> mappingUnitList;
+ InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
+ if (isAllMigration) {
+ mappingUnitList = FieldInfoUtils.setAllMigrationFieldMapping(sourceFields, sinkFields);
+ } else {
+ mappingUnitList = FieldInfoUtils.createFieldInfo(streamInfo.getFieldList(),
+ sinkResponse.getFieldList(), sourceFields, sinkFields);
+ }
+
+ FieldMappingRule fieldMappingRule = new FieldMappingRule(mappingUnitList.toArray(new FieldMappingUnit[0]));
+
+ // Get source info
+ String masterAddress = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MASTER_URL);
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
+ SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
+ groupInfo, streamInfo, sourceResponse, sourceFields);
+
+ // Get sink info
+ SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse, sinkFields);
+
+ // Get transformation info
+ TransformationInfo transInfo = new TransformationInfo(fieldMappingRule);
+
+ // Get properties
+ Map<String, Object> properties = new HashMap<>();
+ if (MapUtils.isNotEmpty(sinkResponse.getProperties())) {
+ properties.putAll(sinkResponse.getProperties());
+ }
+ properties.put(InlongGroupSettings.DATA_FLOW_GROUP_ID_KEY, groupId);
+
+ return new DataFlowInfo(sinkResponse.getId(), sourceInfo, transInfo, sinkInfo, properties);
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
new file mode 100644
index 000000000..245bef130
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+
+import java.util.List;
+
+/**
+ * Service layer interface for stream transform
+ */
+public interface StreamTransformService {
+
+ /**
+ * Save the transform information.
+ *
+ * @param transformRequest
+ * @param operator
+ * @return transform id after saving
+ */
+ Integer save(TransformRequest transformRequest, String operator);
+
+ /**
+ * Query transform information based on inlong group id and inlong stream id.
+ *
+ * @param groupId
+ * @param streamId
+ * @return TransformResponse
+ */
+ List<TransformResponse> listTransform(String groupId, String streamId);
+
+ /**
+ * Modify data transform information.
+ *
+ * @param transformRequest
+ * @param operator
+ * @return Whether succeed
+ */
+ boolean update(TransformRequest transformRequest, String operator);
+
+ /**
+ * Delete the stream transform by the given id.
+ *
+ * @param id
+ * @param operator
+ * @return Whether succeed
+ */
+ boolean delete(String groupId, String streamId, String transformName, String operator);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
new file mode 100644
index 000000000..8c1082e6a
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.service.CommonOperateService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of transform service interface
+ */
+@Service
+@Slf4j
+public class StreamTransformServiceImpl implements StreamTransformService {
+
+ @Autowired
+ protected StreamTransformEntityMapper transformEntityMapper;
+ @Autowired
+ protected CommonOperateService commonOperateService;
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+ public Integer save(TransformRequest transformRequest, String operator) {
+ log.info("begin to save transform info: {}", transformRequest);
+ this.checkParams(transformRequest);
+
+ // Check if can be added
+ final String groupId = transformRequest.getInlongGroupId();
+ final String streamId = transformRequest.getInlongStreamId();
+ final String transformName = transformRequest.getTransformName();
+ commonOperateService.checkGroupStatus(groupId, operator);
+
+ List<StreamTransformEntity> transformEntities = transformEntityMapper.selectByRelatedId(groupId,
+ streamId, transformName);
+ if (CollectionUtils.isNotEmpty(transformEntities)) {
+ String err = "stream transform already exists with groupId=%s, streamId=%s, transformName=%s";
+ throw new BusinessException(String.format(err, groupId, streamId, transformName));
+ }
+ StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+ StreamTransformEntity::new);
+ transformEntity.setCreator(operator);
+ transformEntity.setModifier(operator);
+ Date now = new Date();
+ transformEntity.setCreateTime(now);
+ transformEntity.setModifyTime(now);
+ transformEntityMapper.insertSelective(transformEntity);
+ return transformEntity.getId();
+ }
+
+ @Override
+ public List<TransformResponse> listTransform(String groupId, String streamId) {
+ log.info("begin to fetch transform info by groupId={} and streamId={} ", groupId, streamId);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ List<StreamTransformEntity> transformEntities = transformEntityMapper.selectByRelatedId(groupId, streamId,
+ null);
+ if (CollectionUtils.isEmpty(transformEntities)) {
+ return Collections.emptyList();
+ }
+ List<TransformResponse> transformResponses = transformEntities.stream()
+ .map(transformEntity -> CommonBeanUtils.copyProperties(transformEntity,
+ TransformResponse::new))
+ .collect(Collectors.toList());
+ return transformResponses;
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+ public boolean update(TransformRequest transformRequest, String operator) {
+ log.info("begin to update transform info: {}", transformRequest);
+ this.checkParams(transformRequest);
+ // Check if can be modified
+ String groupId = transformRequest.getInlongGroupId();
+ commonOperateService.checkGroupStatus(groupId, operator);
+ Preconditions.checkNotNull(transformRequest.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+ StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+ StreamTransformEntity::new);
+ transformEntity.setModifier(operator);
+ transformEntity.setVersion(transformEntity.getVersion() + 1);
+ Date now = new Date();
+ transformEntity.setModifyTime(now);
+ return transformEntityMapper.updateByIdSelective(transformEntity) == transformEntity.getId();
+ }
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
+ public boolean delete(String groupId, String streamId, String transformName, String operator) {
+ log.info("begin to delete source by groupId={} streamId={}, transformName={}", groupId, streamId,
+ transformName);
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ commonOperateService.checkGroupStatus(groupId, operator);
+ Date now = new Date();
+ List<StreamTransformEntity> entityList = transformEntityMapper.selectByRelatedId(groupId, streamId,
+ transformName);
+ if (CollectionUtils.isNotEmpty(entityList)) {
+ for (StreamTransformEntity entity : entityList) {
+ Integer id = entity.getId();
+ entity.setVersion(entity.getVersion() + 1);
+ entity.setIsDeleted(id);
+ entity.setModifier(operator);
+ entity.setModifyTime(now);
+ transformEntityMapper.updateByIdSelective(entity);
+ }
+ }
+ log.info("success to logic delete transform by groupId={}, streamId={}, transformName={}", groupId, streamId,
+ transformName);
+ return true;
+ }
+
+ private void checkParams(TransformRequest request) {
+ Preconditions.checkNotNull(request, ErrorCodeEnum.REQUEST_IS_EMPTY.getMessage());
+ String groupId = request.getInlongGroupId();
+ Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
+ String streamId = request.getInlongStreamId();
+ Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
+ String transformType = request.getTransformType();
+ Preconditions.checkNotNull(transformType, ErrorCodeEnum.TRANSFORM_TYPE_IS_NULL.getMessage());
+ String transformName = request.getTransformName();
+ Preconditions.checkNotNull(transformName, ErrorCodeEnum.TRANSFORM_NAME_IS_NULL.getMessage());
+ }
+}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 5022d48d4..2a656d8fe 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -25,8 +25,8 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = ServiceBaseTest.class)
public class ServiceBaseTest extends BaseTest {
- public final String globalGroupId = "b_group1";
- public final String globalStreamId = "stream1";
- public final String globalOperator = "admin";
+ public static final String GLOBAL_GROUP_ID = "b_group1";
+ public static final String GLOBAL_STREAM_ID = "stream1";
+ public static final String GLOBAL_OPERATOR = "admin";
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
index e387afef2..48946c2af 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java
@@ -44,14 +44,14 @@ public class AgentServiceTest extends ServiceBaseTest {
private InlongStreamServiceTest streamServiceTest;
public Integer saveSource() {
- streamServiceTest.saveInlongStream(globalGroupId, globalStreamId, globalOperator);
+ streamServiceTest.saveInlongStream(GLOBAL_GROUP_ID, GLOBAL_STREAM_ID, GLOBAL_OPERATOR);
BinlogSourceRequest sourceInfo = new BinlogSourceRequest();
- sourceInfo.setInlongGroupId(globalGroupId);
- sourceInfo.setInlongStreamId(globalStreamId);
+ sourceInfo.setInlongGroupId(GLOBAL_GROUP_ID);
+ sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID);
sourceInfo.setSourceType(SourceType.BINLOG.getType());
- return sourceService.save(sourceInfo, globalOperator);
+ return sourceService.save(sourceInfo, GLOBAL_OPERATOR);
}
@Test
@@ -70,7 +70,7 @@ public class AgentServiceTest extends ServiceBaseTest {
Boolean result = agentService.reportSnapshot(request);
Assert.assertTrue(result);
- sourceService.delete(id, SourceType.BINLOG.getType(), globalOperator);
+ sourceService.delete(id, SourceType.BINLOG.getType(), GLOBAL_OPERATOR);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
index c18304795..64394c678 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongClusterServiceTest.java
@@ -17,9 +17,9 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.pojo.dataproxy.DataProxyResponse;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.core.ThirdPartyClusterService;
import org.junit.Assert;
@@ -46,17 +46,17 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
request.setType(type);
request.setIp(ip);
request.setPort(port);
- request.setInCharges(globalOperator);
- return clusterService.save(request, globalOperator);
+ request.setInCharges(GLOBAL_OPERATOR);
+ return clusterService.save(request, GLOBAL_OPERATOR);
}
public Boolean deleteOpt(Integer id) {
- return clusterService.delete(id, globalOperator);
+ return clusterService.delete(id, GLOBAL_OPERATOR);
}
@Test
public void testSaveAndDelete() {
- Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, CLUSTER_IP, CLUSTER_PORT);
+ Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, CLUSTER_IP, CLUSTER_PORT);
Assert.assertNotNull(id);
Boolean success = this.deleteOpt(id);
@@ -69,7 +69,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Integer p1 = 46800;
Integer p2 = 46801;
String url = "127.0.0.1:" + p1 + ",127.0.0.2";
- Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, url, p2);
+ Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, url, p2);
Assert.assertNotNull(id);
// Get the data proxy cluster ip list, the first port should is p1, second port is p2
@@ -86,7 +86,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
// Simulate saving and parsing dirty url without port, default port is p1
Integer p1 = 46801;
String url = ":,,, :127.0 .0.1:,: ,,,";
- Integer id = this.saveOpt(CLUSTER_NAME, Constant.CLUSTER_DATA_PROXY, url, p1);
+ Integer id = this.saveOpt(CLUSTER_NAME, InlongGroupSettings.CLUSTER_DATA_PROXY, url, p1);
List<DataProxyResponse> ipList = clusterService.getIpList(CLUSTER_NAME);
// The result port is p1
Assert.assertEquals(p1, ipList.get(0).getPort());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java
new file mode 100644
index 000000000..a03fbf0e6
--- /dev/null
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/transform/StreamTransformServiceTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.transform;
+
+import org.apache.inlong.manager.common.enums.TransformType;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
+import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
+import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import springfox.boot.starter.autoconfigure.OpenApiAutoConfiguration;
+
+import java.util.Date;
+import java.util.List;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@EnableAutoConfiguration(exclude = OpenApiAutoConfiguration.class)
+public class StreamTransformServiceTest extends ServiceBaseTest {
+
+ public static final String TRANSFORM_NAME = "test_transform";
+
+ @Autowired
+ protected StreamTransformService streamTransformService;
+ @Autowired
+ protected StreamTransformEntityMapper transformEntityMapper;
+
+ @Test
+ public void testSaveStreamTransform() {
+ TransformRequest transformRequest = new TransformRequest();
+ transformRequest.setTransformName(TRANSFORM_NAME);
+ transformRequest.setTransformType(TransformType.FILTER.getType());
+ transformRequest.setTransformDefinition("{}");
+ transformRequest.setInlongStreamId(GLOBAL_STREAM_ID);
+ transformRequest.setInlongGroupId(GLOBAL_GROUP_ID);
+ StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
+ StreamTransformEntity::new);
+ transformEntity.setCreator(GLOBAL_OPERATOR);
+ transformEntity.setModifier(GLOBAL_OPERATOR);
+ Date now = new Date();
+ transformEntity.setCreateTime(now);
+ transformEntity.setModifyTime(now);
+ int index = transformEntityMapper.insertSelective(transformEntity);
+ Assert.assertTrue(index == 1);
+
+ List<TransformResponse> transformResponses = streamTransformService.listTransform(GLOBAL_GROUP_ID,
+ GLOBAL_STREAM_ID);
+ Assert.assertTrue(transformResponses.size() == 1);
+
+ }
+
+}
diff --git a/inlong-manager/manager-test/src/main/resources/application-test.properties b/inlong-manager/manager-test/src/main/resources/application-test.properties
index 26a274dd7..471175492 100644
--- a/inlong-manager/manager-test/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-test/src/main/resources/application-test.properties
@@ -100,3 +100,4 @@ common.http-client.validateAfterInactivity=5000
common.http-client.connectionTimeout=3000
common.http-client.readTimeout=10000
common.http-client.connectionRequestTimeout=3000
+spring.main.allow-circular-references=true
\ No newline at end of file
diff --git a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
index 4322e502f..a2b05e03c 100644
--- a/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/sql/apache_inlong_manager.sql
@@ -477,6 +477,29 @@ CREATE TABLE `stream_source`
KEY `source_agent_ip_idx` (`agent_ip`, `is_deleted`)
);
+-- ----------------------------
+-- Table structure for stream_transform
+-- ----------------------------
+CREATE TABLE `stream_transform`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `transform_name` varchar(128) NOT NULL COMMENT 'Transform name, unique in one stream',
+ `transform_type` varchar(20) NOT NULL COMMENT 'Transform type, including: splitter, filter, joiner, etc.',
+ `pre_node_names` text NOT NULL COMMENT 'Pre node names of transform in this stream',
+ `post_node_names` text COMMENT 'Post node names of transform in this stream',
+ `transform_definition` text NOT NULL COMMENT 'Transform definition in json type',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Stream transform version',
+ `is_deleted` int(11) NOT NULL DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT '' COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+);
+
-- ----------------------------
-- Table structure for stream_sink
-- ----------------------------
@@ -520,6 +543,31 @@ CREATE TABLE `stream_sink_ext`
KEY `index_sink_id` (`sink_id`)
);
+-- ----------------------------
+-- Table structure for stream_source_field
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_source_field`;
+CREATE TABLE `stream_source_field`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `source_id` int(11) NOT NULL COMMENT 'Sink id',
+ `source_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `field_name` varchar(20) NOT NULL COMMENT 'field name',
+ `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
+ `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
+ `field_type` varchar(20) NOT NULL COMMENT 'field type',
+ `field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ PRIMARY KEY (`id`),
+ KEY `index_source_id` (`source_id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+
-- ----------------------------
-- Table structure for stream_sink_field
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 0a2f70b1d..8c55cc281 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -502,6 +502,30 @@ CREATE TABLE `stream_source`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source table';
+-- ----------------------------
+-- Table structure for stream_transform
+-- ----------------------------
+CREATE TABLE `stream_transform`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `transform_name` varchar(128) NOT NULL COMMENT 'Transform name, unique in one stream',
+ `transform_type` varchar(20) NOT NULL COMMENT 'Transform type, including: splitter, filter, joiner, etc.',
+ `pre_node_names` text NOT NULL COMMENT 'Pre node names of transform in this stream',
+ `post_node_names` text COMMENT 'Post node names of transform in this stream',
+ `transform_definition` text NOT NULL COMMENT 'Transform definition in json type',
+ `version` int(11) NOT NULL DEFAULT '1' COMMENT 'Stream transform version',
+ `is_deleted` int(11) NOT NULL DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT '' COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_transform_name` (`inlong_group_id`, `inlong_stream_id`, `transform_name`, `is_deleted`) USING BTREE
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Stream transform table';
+
-- ----------------------------
-- Table structure for stream_sink
-- ----------------------------
@@ -547,6 +571,31 @@ CREATE TABLE `stream_sink_ext`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Stream sink extension table';
+-- ----------------------------
+-- Table structure for stream_source_field
+-- ----------------------------
+DROP TABLE IF EXISTS `stream_source_field`;
+CREATE TABLE `stream_source_field`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+ `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+ `source_id` int(11) NOT NULL COMMENT 'Sink id',
+ `source_type` varchar(15) NOT NULL COMMENT 'Sink type',
+ `field_name` varchar(20) NOT NULL COMMENT 'field name',
+ `field_value` varchar(128) DEFAULT NULL COMMENT 'Field value, required if it is a predefined field',
+ `pre_expression` varchar(256) DEFAULT NULL COMMENT 'Pre-defined field value expression',
+ `field_type` varchar(20) NOT NULL COMMENT 'field type',
+ `field_comment` varchar(50) DEFAULT NULL COMMENT 'Field description',
+ `is_meta_field` smallint(3) DEFAULT '0' COMMENT 'Is this field a meta field? 0: no, 1: yes',
+ `field_format` varchar(50) DEFAULT NULL COMMENT 'Field format, including: MICROSECONDS, MILLISECONDS, SECONDS, SQL, ISO_8601 and custom such as yyyy-MM-dd HH:mm:ss',
+ `rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
+ `is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+ PRIMARY KEY (`id`),
+ KEY `index_source_id` (`source_id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Stream source field table';
+
-- ----------------------------
-- Table structure for stream_sink_field
-- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
new file mode 100644
index 000000000..c0e201960
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamTransformController.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.LoginUserUtils;
+import org.apache.inlong.manager.service.core.operationlog.OperationLog;
+import org.apache.inlong.manager.service.transform.StreamTransformService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+/**
+ * Stream transform control layer
+ */
+@RestController
+@RequestMapping("/transform")
+@Api(tags = "Stream transform config")
+@Slf4j
+public class StreamTransformController {
+
+ @Autowired
+ protected StreamTransformService streamTransformService;
+
+ @RequestMapping(value = "/save", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE)
+ @ApiOperation(value = "Save stream transform")
+ public Response<Integer> save(@Validated @RequestBody TransformRequest request) {
+ return Response.success(
+ streamTransformService.save(request, LoginUserUtils.getLoginUserDetail().getUserName()));
+ }
+
+ @RequestMapping(value = "/list", method = RequestMethod.GET)
+ @ApiOperation(value = "Query stream transform list")
+ public Response<List<TransformResponse>> list(@RequestParam("inlongGroupId") String groupId,
+ @RequestParam("inlongStreamId") String streamId) {
+ return Response.success(streamTransformService.listTransform(groupId, streamId));
+ }
+
+ @RequestMapping(value = "/update", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Modify stream source")
+ public Response<Boolean> update(@Validated @RequestBody TransformRequest request) {
+ return Response.success(
+ streamTransformService.update(request, LoginUserUtils.getLoginUserDetail().getUserName()));
+ }
+
+ @RequestMapping(value = "/delete", method = RequestMethod.DELETE)
+ @OperationLog(operation = OperationType.UPDATE)
+ @ApiOperation(value = "Modify stream source")
+ public Response<Boolean> delete(@RequestParam("inlongGroupId") String groupId,
+ @RequestParam("inlongStreamId") String streamId, @RequestParam("transformName") String transformName) {
+ return Response.success(
+ streamTransformService.delete(groupId, streamId, transformName,
+ LoginUserUtils.getLoginUserDetail().getUserName()));
+ }
+}