You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/01 08:52:25 UTC
[incubator-inlong] branch master updated: [INlONG-2791][Manager] Optimize manager client APIs (#2792)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 76b8b30 [INlONG-2791][Manager] Optimize manager client APIs (#2792)
76b8b30 is described below
commit 76b8b30eb0bc195d55680a5187af3239e73bf5dc
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Tue Mar 1 16:52:16 2022 +0800
[INlONG-2791][Manager] Optimize manager client APIs (#2792)
* [INlONG-2791][Manager] Optimize manager client APIs
* [INlONG-2791][Manager] Use enums of common in manager client
* [INlONG-2791][Manager] Fix sourceType and sinkType conflicts
* [INlONG-2791][Manager] Add errorMsg for InlongGroup
---
.../inlong/manager/client/api/InlongClient.java | 10 +-
.../inlong/manager/client/api/InlongGroup.java | 8 +-
.../inlong/manager/client/api/InlongGroupInfo.java | 15 +--
.../inlong/manager/client/api/MqBaseConf.java | 9 ++
.../inlong/manager/client/api/SortBaseConf.java | 26 +++-
.../inlong/manager/client/api/StreamSink.java | 14 +-
.../inlong/manager/client/api/StreamSource.java | 15 +--
.../manager/client/api/auth/Authentication.java | 9 ++
.../manager/client/api/impl/BlankInlongGroup.java | 63 +++++++++
.../api/impl/DefaultInlongStreamBuilder.java | 6 +-
.../manager/client/api/impl/InlongClientImpl.java | 30 ++++-
.../manager/client/api/impl/InlongGroupImpl.java | 14 +-
.../inlong/manager/client/api/sink/HiveSink.java | 1 +
.../manager/client/api/source/KafkaSource.java | 1 +
.../client/api/source/MySQLBinlogSource.java | 3 +-
.../manager/client/api/source/MySQLSource.java | 3 +-
.../client/api/util/InlongGroupTransfer.java | 146 ++++++++++++++++++++-
.../api/util/InlongStreamSourceTransfer.java | 6 +-
.../client/api/util/InlongStreamTransfer.java | 4 +-
.../inlong/manager/common/enums/SourceType.java | 23 ++--
.../manager/common/pojo/sink/SinkPageRequest.java | 3 +-
.../common/pojo/source/SourcePageRequest.java | 3 +-
.../common/settings/InlongGroupSettings.java | 2 +
.../dao/mapper/StreamSourceEntityMapper.java | 6 +-
.../resources/mappers/StreamSinkEntityMapper.xml | 4 +-
.../resources/mappers/StreamSourceEntityMapper.xml | 6 +-
.../service/core/impl/AgentTaskServiceImpl.java | 18 ++-
.../service/sink/StreamSinkServiceImpl.java | 41 +++++-
.../service/source/StreamSourceServiceImpl.java | 30 ++++-
29 files changed, 421 insertions(+), 98 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
index f844701..18a03ef 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java
@@ -17,10 +17,8 @@
package org.apache.inlong.manager.client.api;
-import com.github.pagehelper.PageInfo;
+import java.util.List;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
/**
* An interface to manipulate Inlong Cluster
@@ -73,15 +71,15 @@ public interface InlongClient {
* @return the list
* @throws Exception the exception
*/
- PageInfo<InlongGroupListResponse> listGroup(String keyword, int status, int pageNum, int pageSize) throws Exception;
+ List<InlongGroup> listGroup(String expr, int status, int pageNum, int pageSize) throws Exception;
/**
* Gets group.
*
- * @param groupId the group name
+ * @param groupName the group name
* @return the group
* @throws Exception the exception
*/
- InlongGroupRequest getGroup(String groupId) throws Exception;
+ InlongGroup getGroup(String groupName) throws Exception;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
index ed85871..b57dd2c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroup.java
@@ -29,6 +29,13 @@ public interface InlongGroup {
InlongStreamBuilder createStream(InlongStreamConf streamConf) throws Exception;
/**
+ * Create snapshot for Inlong group
+ * @return
+ * @throws Exception
+ */
+ InlongGroupInfo snapshot() throws Exception;
+
+ /**
* Init inlong group.
* This operation will init all physical resources needed to start a stream group
* Must be operated after all inlong streams were created;
@@ -64,5 +71,4 @@ public interface InlongGroup {
* @return inlong stream contained in this group
*/
List<InlongStream> listStreams() throws Exception;
-
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
index 0ab09e7..883f0f5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupInfo.java
@@ -17,16 +17,14 @@
package org.apache.inlong.manager.client.api;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.Serializable;
+import java.util.Map;
import lombok.Data;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.util.AssertUtil;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
@Data
public class InlongGroupInfo implements Serializable {
@@ -38,7 +36,8 @@ public class InlongGroupInfo implements Serializable {
private Map<String, InlongStream> inlongStreamMap;
- private List<String> errMsg;
+ //k->taskName v->errorMsg
+ private Map<String, String> errMsg;
private InlongGroupState state;
@@ -49,14 +48,14 @@ public class InlongGroupInfo implements Serializable {
this.groupName = groupInfo.getName();
this.groupConf = streamGroupConf;
this.inlongStreamMap = groupContext.getStreamMap();
- this.errMsg = Lists.newArrayList();
+ this.errMsg = Maps.newHashMap();
this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
}
public enum InlongGroupState {
INIT, FAIL, START, SUSPEND, RESTART, DELETE;
- // Reference to org.apache.inlong.manager.common.enums.EntityStatus code
+ // Reference to rg.apache.inlong.manager.common.enums.GroupState code
public static InlongGroupState parseByBizStatus(int bizCode) {
switch (bizCode) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
index 8a500b3..73c0e2b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
@@ -37,6 +37,15 @@ public abstract class MqBaseConf implements Serializable {
PULSAR,
TUBE,
NONE;
+
+ public static MqType forType(String type) {
+ for (MqType mqType : values()) {
+ if (mqType.name().equals(type)) {
+ return mqType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupport queue=%s for Inlong", type));
+ }
}
@ApiModelProperty("The number of partitions of Topic, 1-20")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
index b670f5a..608870d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/SortBaseConf.java
@@ -25,9 +25,29 @@ import lombok.Data;
public abstract class SortBaseConf {
public enum SortType {
- FLINK,
- LOCAL,
- USER_DEFINED;
+ FLINK("flink"),
+ LOCAL("local"),
+ USER_DEFINED("user_defined");
+
+ private String type;
+
+ public String getType() {
+ return this.type;
+ }
+
+ SortType(String type) {
+ this.type = type;
+ }
+
+ public static SortType forType(String type) {
+ for (SortType sortType : values()) {
+ if (sortType.getType().equals(type)) {
+ return sortType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupport type=%s for Inlong", type));
+ }
+
}
public abstract SortType getType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
index 689ed7c..fd25024 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSink.java
@@ -20,24 +20,12 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import java.util.List;
import lombok.Data;
+import org.apache.inlong.manager.common.enums.SinkType;
@Data
@ApiModel("Stream sink configuration")
public abstract class StreamSink {
- public enum SinkType {
- HIVE, ES, KAFKA;
-
- public static SinkType forType(String type) {
- for (SinkType sinkType : values()) {
- if (sinkType.name().equals(type)) {
- return sinkType;
- }
- }
- throw new IllegalArgumentException(String.format("Illegal sink type=%s for Inlong", type));
- }
- }
-
public abstract SinkType getSinkType();
public abstract List<StreamField> getStreamFields();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
index b588e66..80f09a9 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java
@@ -19,25 +19,12 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import lombok.Data;
+import org.apache.inlong.manager.common.enums.SourceType;
@Data
@ApiModel("Stream source configuration")
public abstract class StreamSource {
- public enum SourceType {
- FILE, KAFKA, DB, BINLOG;
-
- public static SourceType forType(String type) {
- for (SourceType sourceType : values()) {
- if (sourceType.name().equals(type)) {
- return sourceType;
- }
- }
- throw new IllegalArgumentException(
- String.format("Unsupport source type=%s for Inlong", type));
- }
- }
-
public enum SyncType {
FULL, INCREMENT
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
index 71433fe..443237a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/auth/Authentication.java
@@ -32,6 +32,15 @@ public interface Authentication {
public String toString() {
return this.name().toLowerCase(Locale.ROOT);
}
+
+ public static AuthType forType(String type) {
+ for (AuthType authType : values()) {
+ if (authType.name().equals(type.toUpperCase())) {
+ return authType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupported authType=%s for Inlong", type));
+ }
}
AuthType getAuthType();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
new file mode 100644
index 0000000..5c0c9b6
--- /dev/null
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/BlankInlongGroup.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import java.util.List;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupInfo;
+import org.apache.inlong.manager.client.api.InlongStream;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.api.InlongStreamConf;
+
+public class BlankInlongGroup implements InlongGroup {
+
+ @Override
+ public InlongStreamBuilder createStream(InlongStreamConf streamConf) throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public InlongGroupInfo snapshot() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public InlongGroupInfo init() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public InlongGroupInfo suspend() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public InlongGroupInfo restart() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public InlongGroupInfo delete() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+
+ @Override
+ public List<InlongStream> listStreams() throws Exception {
+ throw new UnsupportedOperationException("Inlong group is not exists");
+ }
+}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index bcd1819..e491c44 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -27,15 +27,15 @@ import org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSink.SinkType;
import org.apache.inlong.manager.client.api.StreamSource;
-import org.apache.inlong.manager.client.api.StreamSource.SourceType;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
import org.apache.inlong.manager.client.api.util.GsonUtil;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
+import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
@@ -137,7 +137,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
private int initOrUpdateSource(SourceRequest sourceRequest) {
String sourceType = sourceRequest.getSourceType();
- if (SourceType.KAFKA.name().equals(sourceType) || SourceType.BINLOG.name().equals(sourceType)) {
+ if (SourceType.KAFKA.name().equals(sourceType) || SourceType.DB_BINLOG.name().equals(sourceType)) {
List<SourceListResponse> responses = managerClient.listSources(sourceRequest.getInlongGroupId(),
sourceRequest.getInlongStreamId(), sourceRequest.getSourceType());
if (CollectionUtils.isEmpty(responses)) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 59f1f62..75a619b 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -19,18 +19,23 @@ package org.apache.inlong.manager.client.api.impl;
import com.github.pagehelper.PageInfo;
import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupConf;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
@@ -74,16 +79,33 @@ public class InlongClientImpl implements InlongClient {
}
@Override
- public PageInfo<InlongGroupListResponse> listGroup(String keyword, int status,
+ public List<InlongGroup> listGroup(String expr, int status,
int pageNum, int pageSize) throws Exception {
InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
- return managerClient.listGroupInfo(keyword, status, pageNum, pageSize);
+ PageInfo<InlongGroupListResponse> responsePageInfo = managerClient.listGroupInfo(expr, status, pageNum,
+ pageSize);
+ if (CollectionUtils.isEmpty(responsePageInfo.getList())) {
+ return Lists.newArrayList();
+ } else {
+ return responsePageInfo.getList().stream().map(response -> {
+ String groupId = response.getInlongGroupId();
+ InlongGroupRequest request = managerClient.getGroupInfo(groupId);
+ InlongGroupConf groupConf = InlongGroupTransfer.parseGroupRequest(request);
+ return new InlongGroupImpl(groupConf, this);
+ }).collect(Collectors.toList());
+ }
}
@Override
- public InlongGroupRequest getGroup(String groupId) throws Exception {
+ public InlongGroup getGroup(String groupName) throws Exception {
InnerInlongManagerClient managerClient = new InnerInlongManagerClient(this);
- return managerClient.getGroupInfo(groupId);
+ final String groupId = "b_" + groupName;
+ InlongGroupRequest groupRequest = managerClient.getGroupInfo(groupId);
+ if (groupRequest == null) {
+ return new BlankInlongGroup();
+ }
+ InlongGroupConf groupConf = InlongGroupTransfer.parseGroupRequest(groupRequest);
+ return new InlongGroupImpl(groupConf, this);
}
private boolean checkConnectivity(String host, int port, int connectTimeout) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index dba26be..0c4624d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -43,6 +44,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -79,6 +81,11 @@ public class InlongGroupImpl implements InlongGroup {
}
@Override
+ public InlongGroupInfo snapshot() throws Exception {
+ return generateSnapshot(groupContext.getGroupRequest());
+ }
+
+ @Override
public InlongGroupInfo init() throws Exception {
WorkflowResult initWorkflowResult = managerClient.initInlongGroup(this.groupContext.getGroupRequest());
List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
@@ -147,7 +154,12 @@ public class InlongGroupImpl implements InlongGroup {
String inlongGroupId = currentBizInfo.getInlongGroupId();
List<InlongStream> dataStreams = fetchDataStreams(inlongGroupId);
dataStreams.stream().forEach(dataStream -> groupContext.setStream(dataStream));
- return new InlongGroupInfo(groupContext, groupConf);
+ InlongGroupInfo groupInfo = new InlongGroupInfo(groupContext, groupConf);
+ List<EventLogView> logViews = managerClient.getInlongGroupError(inlongGroupId);
+ Map<String, String> errMsgs = logViews.stream().collect(
+ Collectors.toMap(EventLogView::getEvent, EventLogView::getException));
+ groupInfo.setErrMsg(errMsgs);
+ return groupInfo;
}
private List<InlongStream> fetchDataStreams(String groupId) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index df1667c..f93adfb 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -32,6 +32,7 @@ import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamSink;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.SinkType;
@Data
@EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
index f447f9d..45b3249 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/KafkaSource.java
@@ -24,6 +24,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
@Data
@AllArgsConstructor
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
index be68273..0b3743a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLBinlogSource.java
@@ -25,6 +25,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
@Data
@AllArgsConstructor
@@ -33,7 +34,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
public class MySQLBinlogSource extends StreamSource {
@ApiModelProperty(value = "DataSource type", required = true)
- private SourceType sourceType = SourceType.BINLOG;
+ private SourceType sourceType = SourceType.DB_BINLOG;
@ApiModelProperty("SyncType for MySQL")
private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
index 7395293..2846154 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/source/MySQLSource.java
@@ -24,6 +24,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
+import org.apache.inlong.manager.common.enums.SourceType;
@Data
@AllArgsConstructor
@@ -32,7 +33,7 @@ import org.apache.inlong.manager.client.api.StreamSource;
public class MySQLSource extends StreamSource {
@ApiModelProperty(value = "DataSource type", required = true)
- private SourceType sourceType = SourceType.DB;
+ private SourceType sourceType = SourceType.DB_SQL;
@ApiModelProperty("SyncType for MySQL")
private SyncType syncType;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index b614030..2e8a29c 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -18,6 +18,11 @@
package org.apache.inlong.manager.client.api.util;
import com.google.common.collect.Lists;
+import com.google.gson.reflect.TypeToken;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
@@ -34,16 +39,143 @@ import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupMqExtBase;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.JsonUtils;
-import java.util.ArrayList;
-import java.util.List;
-
public class InlongGroupTransfer {
+ public static InlongGroupConf parseGroupRequest(InlongGroupRequest groupRequest) {
+ InlongGroupConf inlongGroupConf = new InlongGroupConf();
+ inlongGroupConf.setGroupName(groupRequest.getName());
+ inlongGroupConf.setDescription(groupRequest.getDescription());
+ inlongGroupConf.setCnName(groupRequest.getCnName());
+ inlongGroupConf.setZookeeperEnabled(groupRequest.getZookeeperEnabled() == 1);
+ inlongGroupConf.setDailyRecords(Long.valueOf(groupRequest.getDailyRecords()));
+ inlongGroupConf.setPeakRecords(Long.valueOf(groupRequest.getPeakRecords()));
+ inlongGroupConf.setMqBaseConf(parseMqBaseConf(groupRequest));
+ inlongGroupConf.setSortBaseConf(parseSortBaseConf(groupRequest));
+ return inlongGroupConf;
+ }
+
+ public static MqBaseConf parseMqBaseConf(InlongGroupRequest inlongGroupRequest) {
+ InlongGroupMqExtBase mqExtBase = inlongGroupRequest.getMqExtInfo();
+ String middleWare = mqExtBase.getMiddlewareType();
+ MqType mqType = MqType.forType(middleWare);
+ switch (mqType) {
+ case NONE:
+ return MqBaseConf.BLANK_MQ_CONF;
+ case PULSAR:
+ return parsePulsarConf(inlongGroupRequest);
+ case TUBE:
+ return parseTubeConf(inlongGroupRequest);
+ default:
+ throw new RuntimeException(String.format("Illegal mqType=%s for Inlong", mqType));
+ }
+ }
+
+ public static SortBaseConf parseSortBaseConf(InlongGroupRequest groupRequest) {
+ List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+ if (CollectionUtils.isEmpty(groupExtInfos)) {
+ return null;
+ }
+ String type = null;
+ for (InlongGroupExtInfo extInfo : groupExtInfos) {
+ if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_TYPE)) {
+ type = extInfo.getKeyValue();
+ break;
+ }
+ }
+ if (type == null) {
+ return null;
+ }
+ SortType sortType = SortType.forType(type);
+ switch (sortType) {
+ case FLINK:
+ return parseFlinkSortConf(groupExtInfos);
+ case USER_DEFINED:
+ return parseUdf(groupExtInfos);
+ default:
+ throw new IllegalArgumentException(String.format("Unsupport sort type=%s for Inlong", sortType));
+ }
+ }
+
+ private static FlinkSortBaseConf parseFlinkSortConf(List<InlongGroupExtInfo> groupExtInfos) {
+ FlinkSortBaseConf sortBaseConf = new FlinkSortBaseConf();
+ for (InlongGroupExtInfo extInfo : groupExtInfos) {
+ if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_URL)) {
+ sortBaseConf.setServiceUrl(extInfo.getKeyValue());
+ }
+ if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_PROPERTIES)) {
+ Map<String, String> properties = GsonUtil.fromJson(extInfo.getKeyValue(),
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ sortBaseConf.setProperties(properties);
+ }
+ }
+ return sortBaseConf;
+ }
+
+ private static UserDefinedSortConf parseUdf(List<InlongGroupExtInfo> groupExtInfos) {
+ UserDefinedSortConf sortConf = new UserDefinedSortConf();
+ for (InlongGroupExtInfo extInfo : groupExtInfos) {
+ if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_NAME)) {
+ sortConf.setSortName(extInfo.getKeyValue());
+ }
+ if (extInfo.getKeyName().equals(InlongGroupSettings.SORT_PROPERTIES)) {
+ Map<String, String> properties = GsonUtil.fromJson(extInfo.getKeyValue(),
+ new TypeToken<Map<String, String>>() {
+ }.getType());
+ sortConf.setProperties(properties);
+ }
+ }
+ return sortConf;
+ }
+
+ private static PulsarBaseConf parsePulsarConf(InlongGroupRequest groupRequest) {
+ PulsarBaseConf pulsarBaseConf = new PulsarBaseConf();
+ pulsarBaseConf.setNamespace(groupRequest.getMqResourceObj());
+ InlongGroupPulsarInfo inlongGroupPulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
+ pulsarBaseConf.setAckQuorum(inlongGroupPulsarInfo.getAckQuorum());
+ pulsarBaseConf.setWriteQuorum(inlongGroupPulsarInfo.getWriteQuorum());
+ pulsarBaseConf.setEnsemble(inlongGroupPulsarInfo.getEnsemble());
+ pulsarBaseConf.setTtl(inlongGroupPulsarInfo.getTtl());
+ pulsarBaseConf.setRetentionTime(inlongGroupPulsarInfo.getRetentionTime());
+ pulsarBaseConf.setRetentionSize(inlongGroupPulsarInfo.getRetentionSize());
+ pulsarBaseConf.setRetentionSizeUnit(inlongGroupPulsarInfo.getRetentionSizeUnit());
+ pulsarBaseConf.setRetentionTimeUnit(inlongGroupPulsarInfo.getRetentionTimeUnit());
+ List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+ for (InlongGroupExtInfo extInfo : groupExtInfos) {
+ if (extInfo.getKeyName().equals(InlongGroupSettings.PULSAR_ADMIN_URL)) {
+ pulsarBaseConf.setPulsarAdminUrl(extInfo.getKeyValue());
+ }
+ if (extInfo.getKeyName().equals(InlongGroupSettings.PULSAR_SERVICE_URL)) {
+ pulsarBaseConf.setPulsarServiceUrl(extInfo.getKeyValue());
+ }
+ }
+ return pulsarBaseConf;
+ }
+
+ private static TubeBaseConf parseTubeConf(InlongGroupRequest groupRequest) {
+ TubeBaseConf tubeBaseConf = new TubeBaseConf();
+ tubeBaseConf.setGroupName(groupRequest.getMqResourceObj());
+ List<InlongGroupExtInfo> groupExtInfos = groupRequest.getExtList();
+ for (InlongGroupExtInfo extInfo : groupExtInfos) {
+ if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_CLUSTER_ID)) {
+ tubeBaseConf.setTubeClusterId(Integer.parseInt(extInfo.getKeyValue()));
+ }
+ if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_MANAGER_URL)) {
+ tubeBaseConf.setTubeManagerUrl(extInfo.getKeyValue());
+ }
+ if (extInfo.getKeyName().equals(InlongGroupSettings.TUBE_MASTER_URL)) {
+ tubeBaseConf.setTubeMasterUrl(extInfo.getKeyValue());
+ }
+ }
+ return tubeBaseConf;
+ }
+
public static InlongGroupRequest createGroupInfo(InlongGroupConf groupConf) {
InlongGroupRequest groupInfo = new InlongGroupRequest();
AssertUtil.hasLength(groupConf.getGroupName(), "GroupName should not be empty");
@@ -165,7 +297,7 @@ public class InlongGroupTransfer {
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
InlongGroupExtInfo sortType = new InlongGroupExtInfo();
sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
- sortType.setKeyValue(InlongGroupSettings.DEFAULT_SORT_TYPE);
+ sortType.setKeyValue(SortType.FLINK.getType());
extInfos.add(sortType);
if (flinkSortBaseConf.getAuthentication() != null) {
Authentication authentication = flinkSortBaseConf.getAuthentication();
@@ -201,8 +333,12 @@ public class InlongGroupTransfer {
List<InlongGroupExtInfo> extInfos = new ArrayList<>();
InlongGroupExtInfo sortType = new InlongGroupExtInfo();
sortType.setKeyName(InlongGroupSettings.SORT_TYPE);
- sortType.setKeyValue(userDefinedSortConf.getSortName());
+ sortType.setKeyValue(SortType.USER_DEFINED.getType());
extInfos.add(sortType);
+ InlongGroupExtInfo sortName = new InlongGroupExtInfo();
+ sortName.setKeyName(InlongGroupSettings.SORT_NAME);
+ sortName.setKeyValue(userDefinedSortConf.getSortName());
+ extInfos.add(sortName);
if (MapUtils.isNotEmpty(userDefinedSortConf.getProperties())) {
InlongGroupExtInfo flinkProperties = new InlongGroupExtInfo();
flinkProperties.setKeyName(InlongGroupSettings.SORT_PROPERTIES);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
index 273464c..0aacb72 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSourceTransfer.java
@@ -22,10 +22,10 @@ import com.google.common.base.Splitter;
import java.util.List;
import org.apache.inlong.manager.client.api.DataFormat;
import org.apache.inlong.manager.client.api.StreamSource;
-import org.apache.inlong.manager.client.api.StreamSource.SourceType;
import org.apache.inlong.manager.client.api.StreamSource.SyncType;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
@@ -41,7 +41,7 @@ public class InlongStreamSourceTransfer {
switch (sourceType) {
case KAFKA:
return createKafkaSourceRequest((KafkaSource) streamSource, streamInfo);
- case BINLOG:
+ case DB_BINLOG:
return createBinlogSourceRequest((MySQLBinlogSource) streamSource, streamInfo);
default:
throw new RuntimeException(String.format("Unsupport source=%s for Inlong", sourceType));
@@ -51,7 +51,7 @@ public class InlongStreamSourceTransfer {
public static StreamSource parseStreamSource(SourceListResponse sourceListResponse) {
String type = sourceListResponse.getSourceType();
SourceType sourceType = SourceType.forType(type);
- if (sourceType == SourceType.BINLOG) {
+ if (sourceType == SourceType.DB_BINLOG) {
return parseKafkaSource((KafkaSourceListResponse) sourceListResponse);
} else if (sourceType == SourceType.KAFKA) {
return parseMySQLBinlogSource((BinlogSourceListResponse) sourceListResponse);
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
index e3f4234..d8924c0 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransfer.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.client.api.util;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.StreamField;
import org.apache.inlong.manager.client.api.StreamField.FieldType;
import org.apache.inlong.manager.client.api.StreamSink;
-import org.apache.inlong.manager.client.api.StreamSink.SinkType;
import org.apache.inlong.manager.client.api.auth.DefaultAuthentication;
+import org.apache.inlong.manager.client.api.sink.HiveSink;
+import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkFieldRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
index 1b4f0b4..5468154 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceType.java
@@ -18,18 +18,25 @@
package org.apache.inlong.manager.common.enums;
import java.util.Locale;
+import lombok.Getter;
+import org.apache.inlong.common.enums.TaskTypeEnum;
public enum SourceType {
- FILE("FILE"),
- DB_SQL("DB_SQL"),
- DB_BINLOG("DB_BINLOG"),
- KAFKA("KAFKA");
+ FILE("FILE", TaskTypeEnum.FILE),
+ DB_SQL("DB_SQL", TaskTypeEnum.SQL),
+ DB_BINLOG("DB_BINLOG", TaskTypeEnum.BINLOG),
+ KAFKA("KAFKA", TaskTypeEnum.KAFKA);
+ @Getter
private final String type;
- SourceType(String type) {
+ @Getter
+ private final TaskTypeEnum taskType;
+
+ SourceType(String type, TaskTypeEnum taskType) {
this.type = type;
+ this.taskType = taskType;
}
/**
@@ -37,17 +44,13 @@ public enum SourceType {
*/
public static SourceType forType(String sourceType) {
for (SourceType type : values()) {
- if (type.name().equals(sourceType)) {
+ if (type.getType().equals(sourceType)) {
return type;
}
}
throw new IllegalArgumentException(String.format("Illegal sink type for %s", sourceType));
}
- public String forType() {
- return this.type;
- }
-
@Override
public String toString() {
return this.name().toUpperCase(Locale.ROOT);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
index e0895d9..ac368e4 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sink/SinkPageRequest.java
@@ -40,8 +40,7 @@ public class SinkPageRequest extends PageRequest {
@ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
- @ApiModelProperty(value = "Sink type, such as HIVE", required = true)
- @NotNull
+ @ApiModelProperty(value = "Sink type, such as HIVE")
private String sinkType;
@ApiModelProperty(value = "Key word")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
index 5d8a5f5..6db1b42 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/source/SourcePageRequest.java
@@ -40,8 +40,7 @@ public class SourcePageRequest extends PageRequest {
@ApiModelProperty(value = "Inlong stream id")
private String inlongStreamId;
- @NotNull
- @ApiModelProperty(value = "Source type, such as FILE", required = true)
+ @ApiModelProperty(value = "Source type, such as FILE")
private String sourceType;
@ApiModelProperty(value = "Key word")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
index dd51641..9e0d988 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/settings/InlongGroupSettings.java
@@ -76,6 +76,8 @@ public class InlongGroupSettings {
public static String DEFAULT_SORT_TYPE = "flink";
+ public static String SORT_NAME = "sort.name";
+
public static String SORT_URL = "sort.url";
public static String SORT_AUTHENTICATION = "sort.authentication";
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index eff8215..0631499 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -17,15 +17,13 @@
package org.apache.inlong.manager.dao.mapper;
+import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.springframework.stereotype.Repository;
-import java.util.List;
-
@Repository
public interface StreamSourceEntityMapper {
@@ -87,6 +85,6 @@ public interface StreamSourceEntityMapper {
int deleteByPrimaryKey(Integer id);
- List<DataConfig> selectAgentTaskDataConfig(TaskRequest taskRequest);
+ List<StreamSourceEntity> selectAgentTaskDataConfig(TaskRequest taskRequest);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index 3dc5433..a40f669 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -71,7 +71,9 @@
from stream_sink
<where>
is_deleted = 0
- and sink_type = #{request.sinkType, jdbcType=VARCHAR}
+ <if test="request.sinkType != null and request.sinkType != ''">
+ and sink_type = #{request.sinkType, jdbcType=VARCHAR}
+ </if>
and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
<if test="request.inlongStreamId != null and request.inlongStreamId != ''">
and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c7cbd26..317d3f9 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -232,7 +232,9 @@
from stream_source
<where>
is_deleted = 0
- and source_type = #{request.sourceType, jdbcType=VARCHAR}
+ <if test="request.sourceType != null and request.sourceType != ''">
+ and source_type = #{request.sourceType, jdbcType=VARCHAR}
+ </if>
and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
<if test="request.inlongStreamId != null and request.inlongStreamId != ''">
and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
@@ -404,7 +406,7 @@
detail.modify_time,
detail.uuid,
detail.agent_ip,
- detail.status mod 100 as op,
+ detail.status,
detail.id,
detail.source_type,
detail.snapshot,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
index 7746edc..02740a3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentTaskServiceImpl.java
@@ -33,6 +33,7 @@ import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.FileAgentDataGenerateRule;
+import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.agent.AgentStatusReportRequest;
import org.apache.inlong.manager.common.pojo.agent.CheckAgentTaskConfRequest;
import org.apache.inlong.manager.common.pojo.agent.ConfirmAgentIpRequest;
@@ -44,6 +45,7 @@ import org.apache.inlong.manager.common.pojo.agent.FileAgentTaskInfo;
import org.apache.inlong.manager.dao.entity.DataSourceCmdConfigEntity;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.SourceFileDetailEntity;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.DataSourceCmdConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
@@ -89,7 +91,21 @@ public class AgentTaskServiceImpl implements AgentTaskService {
}
private List<DataConfig> getAgentDataConfigs(TaskRequest taskRequest) {
- List<DataConfig> dataConfigs = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
+ List<StreamSourceEntity> sourceEntities = streamSourceMapper.selectAgentTaskDataConfig(taskRequest);
+ List<DataConfig> dataConfigs = sourceEntities.stream().map(sourceEntity -> {
+ DataConfig dataConfig = new DataConfig();
+ dataConfig.setOp(String.valueOf(sourceEntity.getStatus() % 100));
+ dataConfig.setJobId(sourceEntity.getId());
+ SourceType sourceType = SourceType.forType(sourceEntity.getSourceType());
+ dataConfig.setTaskType(sourceType.getTaskType().getType());
+ dataConfig.setInlongGroupId(sourceEntity.getInlongGroupId());
+ dataConfig.setInlongStreamId(sourceEntity.getInlongStreamId());
+ dataConfig.setIp(sourceEntity.getAgentIp());
+ dataConfig.setUuid(sourceEntity.getUuid());
+ dataConfig.setExtParams(sourceEntity.getExtParams());
+ dataConfig.setSnapshot(sourceEntity.getSnapshot());
+ return dataConfig;
+ }).collect(Collectors.toList());
//Forward Compatible File task type
return dataConfigs;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 5f8f2bf..b345460 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -34,6 +35,10 @@ import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
+import org.apache.inlong.manager.common.pojo.sink.ck.ClickHouseSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.iceberg.IcebergSinkListResponse;
+import org.apache.inlong.manager.common.pojo.sink.kafka.KafkaSinkListResponse;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -171,16 +176,42 @@ public class StreamSinkServiceImpl implements StreamSinkService {
LOGGER.debug("begin to list sink page by " + request);
}
Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
- String sinkType = request.getSinkType();
- Preconditions.checkNotNull(sinkType, Constant.SINK_TYPE_IS_EMPTY);
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) sinkMapper.selectByCondition(request);
+ List<SinkListResponse> sinkListResponses = Lists.newArrayList();
+ for (StreamSinkEntity entity : entityPage) {
+ SinkType sinkType = SinkType.forType(entity.getSinkType());
+ StreamSinkOperation operation = operationFactory.getInstance(sinkType);
+ switch (sinkType) {
+ case HIVE:
+ HiveSinkListResponse hiveSinkListResponse = operation.getFromEntity(entity,
+ HiveSinkListResponse::new);
+ sinkListResponses.add(hiveSinkListResponse);
+ break;
+ case CLICKHOUSE:
+ ClickHouseSinkListResponse clickHouseSinkListResponse = operation.getFromEntity(entity,
+ ClickHouseSinkListResponse::new);
+ sinkListResponses.add(clickHouseSinkListResponse);
+ break;
+ case ICEBERG:
+ IcebergSinkListResponse icebergSinkListResponse = operation.getFromEntity(entity,
+ IcebergSinkListResponse::new);
+ sinkListResponses.add(icebergSinkListResponse);
+ break;
+ case KAFKA:
+ KafkaSinkListResponse kafkaSinkListResponse = operation.getFromEntity(entity,
+ KafkaSinkListResponse::new);
+ sinkListResponses.add(kafkaSinkListResponse);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported sinkType=%s for Inlong", sinkType));
+ }
+ }
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
- StreamSinkOperation operation = operationFactory.getInstance(SinkType.forType(sinkType));
- PageInfo<? extends SinkListResponse> pageInfo = operation.getPageInfo(entityPage);
- pageInfo.setTotal(entityPage.getTotal());
+ PageInfo<? extends SinkListResponse> pageInfo = PageInfo.of(sinkListResponses);
LOGGER.debug("success to list sink page");
return pageInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 4979f18..07b7721 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -35,6 +36,8 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourcePageRequest;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
+import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceListResponse;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
@@ -127,17 +130,32 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.debug("begin to list source page by " + request);
}
Preconditions.checkNotNull(request.getInlongGroupId(), Constant.GROUP_ID_IS_EMPTY);
- String sourceType = request.getSourceType();
- Preconditions.checkNotNull(sourceType, Constant.SOURCE_TYPE_IS_EMPTY);
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<StreamSourceEntity> entityPage = (Page<StreamSourceEntity>) sourceMapper.selectByCondition(request);
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
- StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
- PageInfo<? extends SourceListResponse> pageInfo = operation.getPageInfo(entityPage);
- pageInfo.setTotal(entityPage.getTotal());
-
+ List<SourceListResponse> responses = Lists.newArrayList();
+ for (StreamSourceEntity entity : entityPage) {
+ SourceType sourceType = SourceType.forType(entity.getSourceType());
+ StreamSourceOperation operation = operationFactory.getInstance(sourceType);
+ switch (sourceType) {
+ case DB_BINLOG:
+ BinlogSourceListResponse binlogSourceListResponse = operation.getFromEntity(entity,
+ BinlogSourceListResponse::new);
+ responses.add(binlogSourceListResponse);
+ break;
+ case KAFKA:
+ KafkaSourceListResponse kafkaSourceListResponse = operation.getFromEntity(entity,
+ KafkaSourceListResponse::new);
+ responses.add(kafkaSourceListResponse);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unsupported sourceType=%s for Inlong", sourceType));
+ }
+ }
+ PageInfo<? extends SourceListResponse> pageInfo = PageInfo.of(responses);
LOGGER.debug("success to list source page");
return pageInfo;
}