You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/10 09:10:23 UTC
[incubator-inlong] branch master updated: [INLONG-3039][Manager] Add properties in dataflow info (#3040)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b4fea7e [INLONG-3039][Manager] Add properties in dataflow info (#3040)
b4fea7e is described below
commit b4fea7e576ff5e328dbc40f15b76cfcdeaacec31
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Thu Mar 10 16:53:05 2022 +0800
[INLONG-3039][Manager] Add properties in dataflow info (#3040)
---
.../inlong/manager/client/api/ClientConfiguration.java | 5 +++--
.../org/apache/inlong/manager/client/api/DataFormat.java | 4 +++-
.../org/apache/inlong/manager/client/api/DataSeparator.java | 6 +++---
.../apache/inlong/manager/client/api/FlinkSortBaseConf.java | 3 ++-
.../org/apache/inlong/manager/client/api/InlongClient.java | 5 +++--
.../inlong/manager/client/api/InlongGroupContext.java | 7 ++++---
.../inlong/manager/client/api/InlongStreamBuilder.java | 1 +
.../org/apache/inlong/manager/client/api/MqBaseConf.java | 3 ++-
.../org/apache/inlong/manager/client/api/StreamField.java | 3 ++-
.../org/apache/inlong/manager/client/api/StreamSink.java | 4 ++++
.../inlong/manager/client/api/UserDefinedSortConf.java | 3 ++-
.../manager/client/api/auth/DefaultAuthentication.java | 3 ++-
.../manager/client/api/auth/SecretAuthentication.java | 3 ++-
.../manager/client/api/auth/SecretTokenAuthentication.java | 3 ++-
.../inlong/manager/client/api/auth/TokenAuthentication.java | 3 ++-
.../inlong/manager/client/api/impl/BlankInlongGroup.java | 3 ++-
.../inlong/manager/client/api/impl/InlongClientImpl.java | 13 +++++++------
.../inlong/manager/client/api/impl/InlongGroupImpl.java | 7 ++++---
.../inlong/manager/client/api/impl/InlongStreamImpl.java | 7 ++++---
.../inlong/manager/client/api/inner/InnerGroupContext.java | 5 +++--
.../manager/client/api/inner/InnerInlongManagerClient.java | 3 ++-
.../inlong/manager/client/api/inner/InnerStreamContext.java | 5 +++--
.../inlong/manager/client/api/sink/ClickHouseSink.java | 4 ----
.../org/apache/inlong/manager/client/api/sink/HiveSink.java | 9 ++++++---
.../apache/inlong/manager/client/api/sink/KafkaSink.java | 4 ----
.../apache/inlong/manager/client/api/util/AssertUtil.java | 3 ++-
.../org/apache/inlong/manager/client/api/util/GsonUtil.java | 5 +++--
.../inlong/manager/client/api/util/InlongGroupTransfer.java | 7 ++++---
.../apache/inlong/manager/client/api/util/InlongParser.java | 9 +++++----
.../manager/client/api/util/InlongStreamSinkTransfer.java | 9 ++++++---
.../manager/client/api/util/InlongStreamTransfer.java | 5 +++--
.../inlong/manager/common/pojo/sink/SinkListResponse.java | 3 +++
.../apache/inlong/manager/common/pojo/sink/SinkRequest.java | 4 ++++
.../inlong/manager/common/pojo/sink/SinkResponse.java | 4 ++++
.../manager/common/pojo/sink/ck/ClickHouseSinkDTO.java | 5 +++++
.../inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java | 5 +++++
.../manager/common/pojo/sink/iceberg/IcebergSinkDTO.java | 5 +++++
.../inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java | 7 ++++++-
.../common/pojo/sink/kafka/KafkaSinkListResponse.java | 2 +-
.../manager/common/pojo/sink/kafka/KafkaSinkRequest.java | 4 +---
.../manager/common/pojo/sink/kafka/KafkaSinkResponse.java | 2 +-
.../inlong/manager/common/pojo/source/SourceRequest.java | 2 +-
.../inlong/manager/common/pojo/source/SourceResponse.java | 2 +-
.../manager/common/pojo/source/kafka/KafkaSourceDTO.java | 2 +-
.../common/pojo/source/kafka/KafkaSourceListResponse.java | 2 +-
.../service/thirdparty/sort/CreateSortConfigListener.java | 8 +++++++-
46 files changed, 137 insertions(+), 74 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
index 068fb69..4903841 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/ClientConfiguration.java
@@ -17,13 +17,14 @@
package org.apache.inlong.manager.client.api;
-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.auth.Authentication;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
/**
* A simple class to hold client configuration values.
*/
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
index 2d9960e..07ac2a9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataFormat.java
@@ -17,13 +17,15 @@
package org.apache.inlong.manager.client.api;
-import java.util.Locale;
import lombok.Getter;
+import java.util.Locale;
+
public enum DataFormat {
CSV("csv"),
AVRO("avro"),
CANAL("canal"),
+ JSON("json"),
NONE("none");
@Getter
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
index 7e0f99a..6b795a8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
@@ -23,9 +23,9 @@ public enum DataSeparator {
COLON(":", 58),
SEMICOLON(";", 59),
DASH("-", 45),
- SOH("\001",1),
- STX("\002",2),
- ETX("\003",3);
+ SOH("\001", 1),
+ STX("\002", 2),
+ ETX("\003", 3);
private String seperator;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
index c077966..0cced82 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
@@ -20,10 +20,11 @@ package org.apache.inlong.manager.client.api;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
import lombok.Data;
import org.apache.inlong.manager.client.api.auth.Authentication;
+import java.util.Map;
+
@Data
@ApiModel("Base configuration for flink cluster")
public class FlinkSortBaseConf extends SortBaseConf {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index 18a03ef..bc13b16 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -17,9 +17,10 @@
package org.apache.inlong.manager.client.api;
-import java.util.List;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import java.util.List;
+
/**
* An interface to manipulate Inlong Cluster
* <p/>
@@ -47,7 +48,7 @@ public interface InlongClient {
/**
* Create inlong client.
*
- * @param serviceUrl the service url
+ * @param serviceUrl the service url
* @param configuration the configuration
* @return the inlong client
*/
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index dd788f8..68789e1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -18,9 +18,6 @@
package org.apache.inlong.manager.client.api;
import com.google.common.collect.Maps;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
import lombok.Data;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
@@ -28,6 +25,10 @@ import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
@Data
public class InlongGroupContext implements Serializable {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
index 6e202b9..9972c29 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamBuilder.java
@@ -31,6 +31,7 @@ public abstract class InlongStreamBuilder {
/**
* create sink in stream.
* *
+ *
* @return inlong stream builder
*/
public abstract InlongStreamBuilder sink(StreamSink sink);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index 619906b..d3993b2 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -19,10 +19,11 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
import lombok.Data;
import org.apache.inlong.manager.common.enums.MqType;
+import java.io.Serializable;
+
@Data
@ApiModel("Base configuration for message queue")
public abstract class MqBaseConf implements Serializable {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
index 53d4363..cc1661a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamField.java
@@ -19,11 +19,12 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Locale;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import java.util.Locale;
+
@Data
@NoArgsConstructor
@AllArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index df6f610..0c18cd5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -23,6 +23,7 @@ import lombok.Data;
import org.apache.inlong.manager.common.enums.SinkType;
import java.util.List;
+import java.util.Map;
@Data
@ApiModel("Stream sink configuration")
@@ -31,6 +32,9 @@ public abstract class StreamSink {
@ApiModelProperty(value = "DataSink name", required = true)
private String sinkName;
+ @ApiModelProperty("Other properties if need")
+ private Map<String, Object> properties;
+
public abstract SinkType getSinkType();
public abstract List<SinkField> getSinkFields();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
index 3b60272..fd1756b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
@@ -20,9 +20,10 @@ package org.apache.inlong.manager.client.api;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Map;
import lombok.Data;
+import java.util.Map;
+
@Data
@ApiModel("Base configuration for user defined sort functions")
public class UserDefinedSortConf extends SortBaseConf {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
index c6fad8a..e6ddf66 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/DefaultAuthentication.java
@@ -18,11 +18,12 @@
package org.apache.inlong.manager.client.api.auth;
import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import java.util.Map;
+
@NoArgsConstructor
public class DefaultAuthentication implements Authentication {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
index 23d566f..6d46e6e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretAuthentication.java
@@ -18,11 +18,12 @@
package org.apache.inlong.manager.client.api.auth;
import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import java.util.Map;
+
@NoArgsConstructor
public class SecretAuthentication implements Authentication {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
index e0f8ddc..0837b02 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/SecretTokenAuthentication.java
@@ -18,11 +18,12 @@
package org.apache.inlong.manager.client.api.auth;
import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
+import java.util.Map;
+
@NoArgsConstructor
public class SecretTokenAuthentication extends SecretAuthentication {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
index c813943..cd63bed 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/TokenAuthentication.java
@@ -18,11 +18,12 @@
package org.apache.inlong.manager.client.api.auth;
import com.alibaba.fastjson.JSONObject;
-import java.util.Map;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.util.AssertUtil;
+import java.util.Map;
+
@NoArgsConstructor
public class TokenAuthentication implements Authentication {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
index c0e076d..37c62cb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -17,7 +17,6 @@
package org.apache.inlong.manager.client.api.impl;
-import java.util.List;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupConf;
import org.apache.inlong.manager.client.api.InlongGroupContext;
@@ -25,6 +24,8 @@ import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
+import java.util.List;
+
public class BlankInlongGroup implements InlongGroup {
@Override
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 50d30ac..017b95b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -20,12 +20,6 @@ package org.apache.inlong.manager.client.api.impl;
import com.github.pagehelper.PageInfo;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
@@ -39,6 +33,13 @@ import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
@Slf4j
public class InlongClientImpl implements InlongClient {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 58cf865..08947d0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.impl;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -53,6 +50,10 @@ import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
public class InlongGroupImpl implements InlongGroup {
private InlongGroupConf groupConf;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index ce89b8c..1228871 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -18,9 +18,6 @@
package org.apache.inlong.manager.client.api.impl;
import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@@ -40,6 +37,10 @@ import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index fbb8312..dfe0290 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -18,8 +18,6 @@
package org.apache.inlong.manager.client.api.inner;
import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.MapUtils;
@@ -31,6 +29,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import java.util.List;
+import java.util.Map;
+
@Data
@NoArgsConstructor
public class InnerGroupContext {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 5d4ce4a..eb674c9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -20,7 +20,6 @@ package org.apache.inlong.manager.client.api.inner;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.MediaType;
@@ -53,6 +52,8 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import java.util.List;
+
/**
* InnerInlongManagerClient is used to invoke http api of inlong manager.
*/
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
index 46979cf..5b85758 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerStreamContext.java
@@ -18,8 +18,6 @@
package org.apache.inlong.manager.client.api.inner;
import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -27,6 +25,9 @@ import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import java.util.List;
+import java.util.Map;
+
@Data
@NoArgsConstructor
public class InnerStreamContext {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
index 1bd24ad..b507b9e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/ClickHouseSink.java
@@ -30,7 +30,6 @@ import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.SinkType;
import java.util.List;
-import java.util.Map;
@Data
@EqualsAndHashCode(callSuper = true)
@@ -81,9 +80,6 @@ public class ClickHouseSink extends StreamSink {
@ApiModelProperty("Field definitions for clickhouse")
private List<SinkField> sinkFields;
- @ApiModelProperty("Other properties if need")
- private Map<String, String> properties;
-
@Override
public DataFormat getDataFormat() {
return DataFormat.NONE;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index d9c1dde..a5a96a0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -33,7 +33,6 @@ import org.apache.inlong.manager.common.enums.SinkType;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.Map;
@Data
@EqualsAndHashCode(callSuper = true)
@@ -68,18 +67,22 @@ public class HiveSink extends StreamSink {
@ApiModelProperty("Data separator, stored as ASCII code")
private DataSeparator dataSeparator = DataSeparator.SOH;
+
@ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
private FileFormat fileFormat;
+
@ApiModelProperty("Create table or not")
private boolean needCreated;
+
@ApiModelProperty("Primary partition field, default null")
private String primaryPartition;
+
@ApiModelProperty("Secondary partition field, default null")
private String secondaryPartition;
+
@ApiModelProperty("Field definitions for hive")
private List<SinkField> sinkFields;
- @ApiModelProperty("Other properties if need")
- private Map<String, String> properties;
+
@ApiModelProperty("Data format type for stream sink")
private DataFormat dataFormat;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
index 935f383..801d2d8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/KafkaSink.java
@@ -29,7 +29,6 @@ import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.common.enums.SinkType;
import java.util.List;
-import java.util.Map;
@Data
@EqualsAndHashCode(callSuper = true)
@@ -55,7 +54,4 @@ public class KafkaSink extends StreamSink {
@ApiModelProperty("Field definitions for kafka")
private List<SinkField> sinkFields;
-
- @ApiModelProperty("Other properties if need")
- private Map<String, String> properties;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
index ccd1806..86ac32d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/AssertUtil.java
@@ -17,9 +17,10 @@
package org.apache.inlong.manager.client.api.util;
+import org.apache.commons.lang3.StringUtils;
+
import java.util.Collection;
import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
public class AssertUtil {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
index b048b58..771bf3c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
@@ -26,6 +26,9 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -33,8 +36,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
@Slf4j
public class GsonUtil {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 32660d4..a79c36e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.util;
import com.google.common.collect.Lists;
import com.google.gson.reflect.TypeToken;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
@@ -46,6 +43,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.JsonUtils;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
public class InlongGroupTransfer {
public static InlongGroupConf parseGroupResponse(InlongGroupResponse groupResponse) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index e1ec229..21e288b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -17,15 +17,11 @@
package org.apache.inlong.manager.client.api.util;
-import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
-import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
-
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
-import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.Constant;
@@ -54,6 +50,11 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import java.util.List;
+
+import static org.apache.inlong.manager.common.enums.SourceType.BINLOG;
+import static org.apache.inlong.manager.common.enums.SourceType.KAFKA;
+
/**
* Parser for Inlong entity
*/
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 8755dfe..8c517c3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -103,10 +103,10 @@ public class InlongStreamSinkTransfer {
clickHouseSinkRequest.setKeyFieldNames(clickHouseSink.getKeyFieldNames());
clickHouseSinkRequest.setPartitionKey(clickHouseSink.getPartitionKey());
clickHouseSinkRequest.setPartitionStrategy(clickHouseSink.getPartitionStrategy());
- clickHouseSinkRequest.setPartitionKey(clickHouseSink.getPartitionKey());
clickHouseSinkRequest.setWriteMaxRetryTimes(clickHouseSink.getWriteMaxRetryTimes());
clickHouseSinkRequest.setInlongGroupId(streamInfo.getInlongGroupId());
clickHouseSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
+ clickHouseSinkRequest.setProperties(clickHouseSink.getProperties());
clickHouseSinkRequest.setEnableCreateResource(clickHouseSink.isNeedCreated() ? 1 : 0);
if (CollectionUtils.isNotEmpty(clickHouseSink.getSinkFields())) {
List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
@@ -138,7 +138,7 @@ public class InlongStreamSinkTransfer {
clickHouseSink.setWriteMaxRetryTimes(sinkResponse.getWriteMaxRetryTimes());
clickHouseSink.setDistributedTable(sinkResponse.getDistributedTable());
}
-
+ clickHouseSink.setProperties(sinkResponse.getProperties());
clickHouseSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
clickHouseSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
@@ -167,6 +167,7 @@ public class InlongStreamSinkTransfer {
kafkaSinkRequest.setInlongStreamId(streamInfo.getInlongStreamId());
kafkaSinkRequest.setSerializationType(kafkaSink.getDataFormat().name());
kafkaSinkRequest.setEnableCreateResource(kafkaSink.isNeedCreated() ? 1 : 0);
+ kafkaSinkRequest.setProperties(kafkaSink.getProperties());
if (CollectionUtils.isNotEmpty(kafkaSink.getSinkFields())) {
List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(kafkaSink.getSinkFields());
kafkaSinkRequest.setFieldList(fieldRequests);
@@ -190,6 +191,7 @@ public class InlongStreamSinkTransfer {
kafkaSink.setTopicName(sinkResponse.getTopicName());
kafkaSink.setDataFormat(DataFormat.forName(sinkResponse.getSerializationType()));
}
+ kafkaSink.setProperties(sinkResponse.getProperties());
kafkaSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
kafkaSink.setSinkFields(convertToSinkFields(sinkResponse.getFieldList()));
@@ -220,6 +222,7 @@ public class InlongStreamSinkTransfer {
hiveSinkRequest.setPassword(defaultAuthentication.getPassword());
hiveSinkRequest.setPrimaryPartition(hiveSink.getPrimaryPartition());
hiveSinkRequest.setSecondaryPartition(hiveSink.getSecondaryPartition());
+ hiveSinkRequest.setProperties(hiveSink.getProperties());
if (CollectionUtils.isNotEmpty(hiveSink.getSinkFields())) {
List<SinkFieldRequest> fieldRequests = createSinkFieldRequests(streamSink.getSinkFields());
hiveSinkRequest.setFieldList(fieldRequests);
@@ -275,7 +278,7 @@ public class InlongStreamSinkTransfer {
hiveSink.setSecondaryPartition(sinkResponse.getSecondaryPartition());
hiveSink.setPrimaryPartition(sinkResponse.getPrimaryPartition());
}
-
+ hiveSink.setProperties(sinkResponse.getProperties());
hiveSink.setSinkType(SinkType.HIVE);
hiveSink.setNeedCreated(sinkResponse.getEnableCreateResource() == 1);
if (CollectionUtils.isNotEmpty(sinkResponse.getFieldList())) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index 12988b3..522affc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -17,8 +17,6 @@
package org.apache.inlong.manager.client.api.util;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.InlongStreamConf;
@@ -27,6 +25,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import java.util.List;
+import java.util.stream.Collectors;
+
public class InlongStreamTransfer {
public static InlongStreamInfo createStreamInfo(InlongStreamConf streamConf, InlongGroupInfo groupInfo) {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
index cf218e2..4c46022 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkListResponse.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.common.pojo.sink;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.Data;
import java.util.Date;
@@ -59,4 +60,6 @@ public class SinkListResponse {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
+ @ApiModelProperty("Properties for sink")
+ private Map<String, Object> properties;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
index 34cec44..68bbc24 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkRequest.java
@@ -19,8 +19,10 @@ package org.apache.inlong.manager.common.pojo.sink;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.Data;
import javax.validation.constraints.NotNull;
@@ -61,4 +63,6 @@ public class SinkRequest {
@ApiModelProperty("Sink field list")
private List<SinkFieldRequest> fieldList;
+ @ApiModelProperty("Properties for sink")
+ private Map<String, Object> properties = Maps.newHashMap();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
index f4d3a1e..e0683fa 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkResponse.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.Data;
import java.util.Date;
@@ -79,4 +80,7 @@ public class SinkResponse {
@ApiModelProperty("Sink field list")
private List<SinkFieldResponse> fieldList;
+ @ApiModelProperty("Properties for sink")
+ private Map<String, Object> properties;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
index 93f7d9b..bce8e70 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.ck;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -76,6 +77,9 @@ public class ClickHouseSinkDTO {
@ApiModelProperty("Write max retry times")
private Integer writeMaxRetryTimes;
+ @ApiModelProperty("Properties for clickhouse")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -93,6 +97,7 @@ public class ClickHouseSinkDTO {
.flushInterval(request.getFlushInterval())
.flushRecordNumber(request.getFlushRecordNumber())
.writeMaxRetryTimes(request.getWriteMaxRetryTimes())
+ .properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
index ed5eef6..65f3ea8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/hive/HiveSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.hive;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -85,6 +86,9 @@ public class HiveSinkDTO {
@ApiModelProperty("Data field separator")
private String dataSeparator;
+ @ApiModelProperty("Properties for hive")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -105,6 +109,7 @@ public class HiveSinkDTO {
.fileFormat(request.getFileFormat())
.dataEncoding(request.getDataEncoding())
.dataSeparator(request.getDataSeparator())
+ .properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
index 0af7778..7d4737c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/iceberg/IcebergSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.iceberg;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -43,12 +44,16 @@ public class IcebergSinkDTO {
@ApiModelProperty("table Location like hdfs://")
private String tableLocation;
+ @ApiModelProperty("Properties for iceberg")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
public static IcebergSinkDTO getFromRequest(IcebergSinkRequest request) {
return IcebergSinkDTO.builder()
.tableLocation(request.getTableLocation())
+ .properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
index 58b54e4..54b4fc6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.common.pojo.sink.kafka;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -46,9 +47,12 @@ public class KafkaSinkDTO {
@ApiModelProperty("Kafka topicName")
private String topicName;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
+ @ApiModelProperty("Properties for kafka")
+ private Map<String, Object> properties;
+
/**
* Get the dto instance from the request
*/
@@ -57,6 +61,7 @@ public class KafkaSinkDTO {
.address(request.getAddress())
.topicName(request.getTopicName())
.serializationType(request.getSerializationType())
+ .properties(request.getProperties())
.build();
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
index 9aebfb9..eff2b46 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkListResponse.java
@@ -37,7 +37,7 @@ public class KafkaSinkListResponse extends SinkListResponse {
@ApiModelProperty("Kafka topicName")
private String topicName;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
index 98cefb5..74ca8e4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkRequest.java
@@ -42,8 +42,6 @@ public class KafkaSinkRequest extends SinkRequest {
@ApiModelProperty("Kafka topicName")
private String topicName;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
-
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
index 665a74a..b60e2b1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/kafka/KafkaSinkResponse.java
@@ -44,7 +44,7 @@ public class KafkaSinkResponse extends SinkResponse {
@ApiModelProperty("Kafka topicName")
private String topicName;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro")
private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
index ffd719f..069da43 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceRequest.java
@@ -72,6 +72,6 @@ public class SourceRequest {
@ApiModelProperty("Snapshot of the source task")
private String snapshot;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
index 67834d4..a35a36d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourceResponse.java
@@ -84,6 +84,6 @@ public class SourceResponse {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date modifyTime;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
index dffdda0..2b81870 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceDTO.java
@@ -61,7 +61,7 @@ public class KafkaSourceDTO {
notes = "For example, '0#100_1#10' means the offset of partition 0 is 100, the offset of partition 1 is 10")
private String topicPartitionOffset;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType;
/**
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
index 6c1c235..318eabb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/kafka/KafkaSourceListResponse.java
@@ -46,7 +46,7 @@ public class KafkaSourceListResponse extends SourceListResponse {
@ApiModelProperty("Limit the number of bytes read per second")
private String byteSpeedLimit;
- @ApiModelProperty("Data Serialization, support: Json, Canal, Avro, etc")
+ @ApiModelProperty("Data Serialization, support: json, canal, avro, etc")
private String serializationType = "none";
@ApiModelProperty(value = "Topic partition offset",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 969dd69..a752757 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -19,7 +19,9 @@ package org.apache.inlong.manager.service.thirdparty.sort;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
@@ -155,7 +157,11 @@ public class CreateSortConfigListener implements SortOperateListener {
// Get sink info
SinkInfo sinkInfo = SinkInfoUtils.createSinkInfo(sourceResponse, sinkResponse);
- return new DataFlowInfo(sinkId, sourceInfo, sinkInfo);
+ Map<String, Object> properties = Maps.newHashMap();
+ if (MapUtils.isNotEmpty(sinkResponse.getProperties())) {
+ properties.putAll(sinkResponse.getProperties());
+ }
+ return new DataFlowInfo(sinkId, sourceInfo, sinkInfo, properties);
}
private InlongGroupInfo getGroupInfo(ProcessForm processForm) {