You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/02/25 12:00:16 UTC
[incubator-inlong] branch master updated: [INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 958b182 [INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)
958b182 is described below
commit 958b18211b39dd5f8dfe7d37cafbcf4683ca551c
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Fri Feb 25 20:00:11 2022 +0800
[INLONG-2717][Manager][Sort] Support none of middleware and Add DataTypeEnum (#2718)
---
.../apache/inlong/common/enums/DataTypeEnum.java | 36 ++++++++------
.../manager/client/api/FlinkSortBaseConf.java | 5 +-
.../inlong/manager/client/api/MqBaseConf.java | 10 +++-
.../inlong/manager/client/api/SortBaseConf.java | 3 +-
.../api/{MqBaseConf.java => TubeBaseConf.java} | 28 +++++++----
.../{MqBaseConf.java => UserDefinedSortConf.java} | 20 ++++----
.../client/api/util/InlongGroupTransfer.java | 56 +++++++++++++++++++++-
.../inlong/manager/common/enums/Constant.java | 2 +
.../common/settings/InlongGroupSettings.java | 10 ++++
.../thirdparty/sort/ZkDisabledEventSelector.java | 7 ++-
.../thirdparty/sort/ZkEnabledEventSelector.java | 3 +-
.../deserialization/DeserializationInfo.java | 10 ++--
12 files changed, 141 insertions(+), 49 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
similarity index 57%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index b44b162..7d32aa3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -15,24 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.common.enums;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
-import lombok.Data;
+import lombok.Getter;
-@Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+public enum DataTypeEnum {
+ CSV("csv"),
+ AVRO("avro"),
+ JSON("json"),
+ CANAL("canal"),
+ DEBEZIUM_JSON("debezium_json");
- public enum MqType {
- PULSAR,
- TUBE;
- }
+ @Getter
+ private String name;
- @ApiModelProperty("The number of partitions of Topic, 1-20")
- private int topicPartitionNum = 3;
+ /**
+  * Creates an enum constant bound to the given lower-case name, which is the value
+  * {@code forName} compares against.
+  *
+  * @param name textual name of the data type, e.g. "csv"
+  */
+ DataTypeEnum(String name) {
+ this.name = name;
+ }
- public abstract MqType getType();
+    /**
+     * Looks up the data type whose textual name equals the given value exactly.
+     *
+     * @param name textual name to resolve, e.g. "json"
+     * @return the matching enum constant
+     * @throws IllegalArgumentException if no constant carries that name
+     */
+    public static DataTypeEnum forName(String name) {
+        DataTypeEnum[] candidates = values();
+        for (int i = 0; i < candidates.length; i++) {
+            DataTypeEnum candidate = candidates[i];
+            if (!candidate.getName().equals(name)) {
+                continue;
+            }
+            return candidate;
+        }
+        // Nothing matched: report the offending value to the caller.
+        String message = String.format("Unsupport dataType for Inlong:%s", name);
+        throw new IllegalArgumentException(message);
+    }
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
index 706420a..c077966 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/FlinkSortBaseConf.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api;
+import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
@@ -39,6 +40,6 @@ public class FlinkSortBaseConf extends SortBaseConf {
@ApiModelProperty("Region for flink cluster, for example:ap-beijing|ap-chengdu|ap-chongqing or null if not exists")
private String region;
- @ApiModelProperty("Other properties if need")
- private Map<String, String> properties;
+ @ApiModelProperty("Other properties if needed")
+ private Map<String, String> properties = Maps.newHashMap();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index b44b162..8a500b3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -26,9 +26,17 @@ import lombok.Data;
@ApiModel("Base configuration for message queue")
public abstract class MqBaseConf implements Serializable {
+ /**
+  * Shared placeholder configuration for groups that use no message queue middleware:
+  * an anonymous subclass whose only override reports {@link MqType#NONE}.
+  */
+ public static final MqBaseConf BLANK_MQ_CONF = new MqBaseConf() {
+ @Override
+ public MqType getType() {
+ return MqType.NONE;
+ }
+ };
+
public enum MqType {
PULSAR,
- TUBE;
+ TUBE,
+ NONE;
}
@ApiModelProperty("The number of partitions of Topic, 1-20")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
index eb3b6b3..b670f5a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
@@ -26,7 +26,8 @@ public abstract class SortBaseConf {
public enum SortType {
FLINK,
- LOCAL;
+ LOCAL,
+ USER_DEFINED;
}
public abstract SortType getType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
similarity index 62%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
index b44b162..274fb87 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
@@ -19,20 +19,28 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
+import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.NoArgsConstructor;
@Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+@AllArgsConstructor
+@NoArgsConstructor
+@ApiModel("Base configuration for Tube")
+public class TubeBaseConf extends MqBaseConf {
- public enum MqType {
- PULSAR,
- TUBE;
- }
+ @ApiModelProperty("Message queue type")
+ private MqType type = MqType.TUBE;
- @ApiModelProperty("The number of partitions of Topic, 1-20")
- private int topicPartitionNum = 3;
+ @ApiModelProperty("Tube manager URL")
+ private String tubeManagerUrl;
- public abstract MqType getType();
+ @ApiModelProperty("Tube master URL")
+ private String tubeMasterUrl;
+
+ @ApiModelProperty("Tube Cluster Id")
+ private int tubeClusterId = 1;
+
+ @ApiModelProperty("GroupName for tube producer")
+ private String groupName;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
similarity index 65%
copy from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
copy to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
index b44b162..3b60272 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/UserDefinedSortConf.java
@@ -17,22 +17,22 @@
package org.apache.inlong.manager.client.api;
+import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.io.Serializable;
+import java.util.Map;
import lombok.Data;
@Data
-@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+@ApiModel("Base configuration for user defined sort functions")
+public class UserDefinedSortConf extends SortBaseConf {
- public enum MqType {
- PULSAR,
- TUBE;
- }
+ @ApiModelProperty(value = "Sort type")
+ private SortType type = SortType.USER_DEFINED;
- @ApiModelProperty("The number of partitions of Topic, 1-20")
- private int topicPartitionNum = 3;
+ @ApiModelProperty("Name for user defined sort functions")
+ private String sortName;
- public abstract MqType getType();
+ @ApiModelProperty("Properties for user defined sort functions if needed")
+ private Map<String, String> properties = Maps.newHashMap();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 079e46f..b614030 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -27,6 +27,8 @@ import org.apache.inlong.manager.client.api.MqBaseConf.MqType;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SortBaseConf;
import org.apache.inlong.manager.client.api.SortBaseConf.SortType;
+import org.apache.inlong.manager.client.api.TubeBaseConf;
+import org.apache.inlong.manager.client.api.UserDefinedSortConf;
import org.apache.inlong.manager.client.api.auth.Authentication;
import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
@@ -66,8 +68,12 @@ public class InlongGroupTransfer {
List<InlongGroupExtInfo> extInfos = createPulsarExtInfo(pulsarBaseConf);
groupInfo.getExtList().addAll(extInfos);
groupInfo.setTopicPartitionNum(pulsarBaseConf.getTopicPartitionNum());
- } else {
- // todo tubemq
+ } else if (mqType == MqType.TUBE) {
+ TubeBaseConf tubeBaseConf = (TubeBaseConf) mqConf;
+ List<InlongGroupExtInfo> extInfos = createTubeExtInfo(tubeBaseConf);
+ groupInfo.setMqResourceObj(tubeBaseConf.getGroupName());
+ groupInfo.getExtList().addAll(extInfos);
+ groupInfo.setTopicPartitionNum(tubeBaseConf.getTopicPartitionNum());
}
SortBaseConf sortBaseConf = groupConf.getSortBaseConf();
SortType sortType = sortBaseConf.getType();
@@ -75,6 +81,10 @@ public class InlongGroupTransfer {
FlinkSortBaseConf flinkSortBaseConf = (FlinkSortBaseConf) sortBaseConf;
List<InlongGroupExtInfo> sortExtInfos = createFlinkExtInfo(flinkSortBaseConf);
groupInfo.getExtList().addAll(sortExtInfos);
+ } else if (sortType == SortType.USER_DEFINED) {
+ UserDefinedSortConf udf = (UserDefinedSortConf) sortBaseConf;
+ List<InlongGroupExtInfo> sortExtInfos = createUserDefinedSortExtInfo(udf);
+ groupInfo.getExtList().addAll(sortExtInfos);
} else {
//todo local
}
@@ -128,8 +138,35 @@ public class InlongGroupTransfer {
return extInfos;
}
+    /**
+     * Converts a Tube configuration into the extension entries attached to an inlong group.
+     *
+     * <p>An entry is emitted only for values that are actually set: the manager URL and the
+     * master URL when they are non-empty, and the cluster id when it is greater than zero.
+     *
+     * @param tubeBaseConf Tube configuration supplied by the client
+     * @return the extension entries in the order manager URL, master URL, cluster id;
+     *         empty when none of the values is set
+     */
+    public static List<InlongGroupExtInfo> createTubeExtInfo(TubeBaseConf tubeBaseConf) {
+        List<InlongGroupExtInfo> extInfos = new ArrayList<>();
+        // Guard on the manager URL itself. Testing the master URL here would drop a configured
+        // manager URL when the master URL is empty, and would store a null/empty manager URL
+        // when only the master URL is set.
+        if (StringUtils.isNotEmpty(tubeBaseConf.getTubeManagerUrl())) {
+            InlongGroupExtInfo tubeManagerUrl = new InlongGroupExtInfo();
+            tubeManagerUrl.setKeyName(InlongGroupSettings.TUBE_MANAGER_URL);
+            tubeManagerUrl.setKeyValue(tubeBaseConf.getTubeManagerUrl());
+            extInfos.add(tubeManagerUrl);
+        }
+        if (StringUtils.isNotEmpty(tubeBaseConf.getTubeMasterUrl())) {
+            InlongGroupExtInfo tubeMasterUrl = new InlongGroupExtInfo();
+            tubeMasterUrl.setKeyName(InlongGroupSettings.TUBE_MASTER_URL);
+            tubeMasterUrl.setKeyValue(tubeBaseConf.getTubeMasterUrl());
+            extInfos.add(tubeMasterUrl);
+        }
+        // Only positive cluster ids are persisted.
+        if (tubeBaseConf.getTubeClusterId() > 0) {
+            InlongGroupExtInfo tubeClusterId = new InlongGroupExtInfo();
+            tubeClusterId.setKeyName(InlongGroupSettings.TUBE_CLUSTER_ID);
+            tubeClusterId.setKeyValue(String.valueOf(tubeBaseConf.getTubeClusterId()));
+            extInfos.add(tubeClusterId);
+        }
+        return extInfos;
+    }
+
public static List<InlongGroupExtInfo> createFlinkExtInfo(FlinkSortBaseConf flinkSortBaseConf) {
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
+ InlongGroupExtInfo sortType = new InlongGroupExtInfo();
+ sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
+ sortType.setKeyValue(InlongGroupSettings.DEFAULT_SORT_TYPE);
+ extInfos.add(sortType);
if (flinkSortBaseConf.getAuthentication() != null) {
Authentication authentication = flinkSortBaseConf.getAuthentication();
AuthType authType = authentication.getAuthType();
@@ -159,4 +196,19 @@ public class InlongGroupTransfer {
}
return extInfos;
}
+
+    /**
+     * Converts a user-defined sort configuration into the extension entries attached to an
+     * inlong group.
+     *
+     * <p>The sort-type entry is always emitted and carries the configured sort name; the
+     * properties entry is added only when the property map is non-empty, serialized as JSON.
+     *
+     * @param userDefinedSortConf configuration of the user-defined sort function
+     * @return the extension entries: the sort type first, then the optional properties
+     */
+    public static List<InlongGroupExtInfo> createUserDefinedSortExtInfo(UserDefinedSortConf userDefinedSortConf) {
+        List<InlongGroupExtInfo> entries = new ArrayList<>();
+
+        InlongGroupExtInfo typeEntry = new InlongGroupExtInfo();
+        typeEntry.setKeyValue(userDefinedSortConf.getSortName());
+        typeEntry.setKeyName(InlongGroupSettings.SORT_TYPE);
+        entries.add(typeEntry);
+
+        // Without properties there is nothing further to record.
+        if (!MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
+            return entries;
+        }
+
+        InlongGroupExtInfo propertiesEntry = new InlongGroupExtInfo();
+        propertiesEntry.setKeyValue(JsonUtils.toJson(userDefinedSortConf.getProperties()));
+        propertiesEntry.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
+        entries.add(propertiesEntry);
+        return entries;
+    }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 2617c8e..dce0bf8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -60,6 +60,8 @@ public class Constant {
public static final String MIDDLEWARE_PULSAR = "PULSAR";
+ public static final String MIDDLEWARE_NONE = "NONE";
+
public static final String SCHEMA_M0_DAY = "m0_day";
public static final String CLUSTER_HIVE_TOPO = "HIVE_TOPO";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index 76c0d8c..dd51641 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -29,6 +29,12 @@ public class InlongGroupSettings {
public static String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
+ public static String TUBE_MANAGER_URL = "tube.manager.url";
+
+ public static String TUBE_MASTER_URL = "tube.master.url";
+
+ public static String TUBE_CLUSTER_ID = "tube.cluster.id";
+
/**
* oceanus need param start
*/
@@ -66,6 +72,10 @@ public class InlongGroupSettings {
public static String FS_OFS_USER_APPID = "fs.ofs.user.appid";
+ public static String SORT_TYPE = "sort.type";
+
+ public static String DEFAULT_SORT_TYPE = "flink";
+
public static String SORT_URL = "sort.url";
public static String SORT_AUTHENTICATION = "sort.authentication";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index f1d237e..3c8be51 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -36,11 +37,13 @@ public class ZkDisabledEventSelector implements EventSelector {
if (processForm instanceof GroupResourceProcessForm) {
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupRequest groupInfo = groupResourceForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 0;
+ return groupInfo.getZookeeperEnabled() == 0
+ && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
} else if (processForm instanceof UpdateGroupProcessForm) {
UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
InlongGroupRequest groupInfo = updateGroupProcessForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 0;
+ return groupInfo.getZookeeperEnabled() == 0
+ && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
} else {
return false;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index 88dca09..4818156 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -37,7 +38,7 @@ public class ZkEnabledEventSelector implements EventSelector {
}
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupRequest groupInfo = groupResourceForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 1;
+ return groupInfo.getZookeeperEnabled() == 1 && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
}
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index 85732fa..1ab13bc 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -36,11 +36,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
@Type(value = JsonDeserializationInfo.class, name = "json"),
@Type(value = CanalDeserializationInfo.class, name = "canal"),
@Type(value = DebeziumDeserializationInfo.class, name = "debezium_json"),
- @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlongmsg_csv"),
- @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlongmsg_csv2"),
- @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlongmsg_kv"),
- @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlongmsg_tlog_csv"),
- @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlongmsg_tlog_kv")
+ @Type(value = InLongMsgCsvDeserializationInfo.class, name = "inlong_msg_csv"),
+ @Type(value = InLongMsgCsv2DeserializationInfo.class, name = "inlong_msg_csv2"),
+ @Type(value = InLongMsgKvDeserializationInfo.class, name = "inlong_msg_kv"),
+ @Type(value = InLongMsgTlogCsvDeserializationInfo.class, name = "inlong_msg_tlog_csv"),
+ @Type(value = InLongMsgTlogKvDeserializationInfo.class, name = "inlong_msg_tlog_kv")
})
public interface DeserializationInfo extends Serializable {