You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 09:59:02 UTC
[incubator-inlong] branch master updated: [INLONG-2971][Manager] Support stream log collecting in manager client (#2975)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c17bc54 [INLONG-2971][Manager] Support stream log collecting in manager client (#2975)
c17bc54 is described below
commit c17bc546b2c6d8c7c1110a4baa6d13d6c8ead699
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 7 17:58:56 2022 +0800
[INLONG-2971][Manager] Support stream log collecting in manager client (#2975)
---
.../manager/client/api/InlongGroupContext.java | 13 ++++++---
.../manager/client/api/impl/InlongGroupImpl.java | 32 +++++++++++++++++++---
.../client/api/inner/InnerInlongManagerClient.java | 32 ++++++++++++++++++++++
.../manager/client/api/util/InlongParser.java | 9 ++++++
.../dao/mapper/StreamConfigLogEntityMapper.java | 2 ++
.../manager/service/core/impl/AbstractService.java | 6 +++-
6 files changed, 85 insertions(+), 9 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 370e082..2d2865d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -40,14 +40,19 @@ public class InlongGroupContext implements Serializable {
private Map<String, InlongStream> inlongStreamMap;
/**
- * Extension configuration for Inlong group
+ * Extension configuration for Inlong group.
*/
private Map<String, String> extensions;
/**
- * Error message for Inlong group, key: taskName, value: message
+ * Error message for Inlong group, taskName->logs.
*/
- private Map<String, String> errMsg;
+ private Map<String, List<String>> errMsgs;
+
+ /**
+ * Logs for each stream, key: streamName, value: componentName->logs
+ */
+ private Map<String, Map<String, List<String>>> streamLogs = Maps.newHashMap();
private InlongGroupState state;
@@ -58,7 +63,7 @@ public class InlongGroupContext implements Serializable {
this.groupName = groupInfo.getName();
this.groupConf = streamGroupConf;
this.inlongStreamMap = groupContext.getStreamMap();
- this.errMsg = Maps.newHashMap();
+ this.errMsgs = Maps.newHashMap();
this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
this.extensions = Maps.newHashMap();
List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index c0cbc73..f5d5db3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,10 +17,13 @@
package org.apache.inlong.manager.client.api.impl;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupConf;
@@ -43,6 +46,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
@@ -198,11 +202,31 @@ public class InlongGroupImpl implements InlongGroup {
InlongGroupContext inlongGroupContext = new InlongGroupContext(groupContext, groupConf);
List<EventLogView> logViews = managerClient.getInlongGroupError(inlongGroupId);
if (CollectionUtils.isNotEmpty(logViews)) {
- Map<String, String> errMsgs = logViews.stream()
- .filter(x -> null != x.getEvent() && null != x.getException())
- .collect(Collectors.toMap(EventLogView::getEvent, EventLogView::getException));
- inlongGroupContext.setErrMsg(errMsgs);
+ Map<String, List<String>> errMsgs = Maps.newHashMap();
+ logViews.stream()
+ .filter(x -> null != x.getElementName() && null != x.getException())
+ .forEach(eventLogView -> {
+ String taskName = eventLogView.getElementName();
+ errMsgs.computeIfAbsent(taskName, Lists::newArrayList).add(eventLogView.getException());
+ });
+ inlongGroupContext.setErrMsgs(errMsgs);
}
+ Map<String, InlongStream> streams = inlongGroupContext.getInlongStreamMap();
+ streams.keySet().stream().forEach(streamName -> {
+ String inlongStreamId = "b_" + streamName;
+ List<InlongStreamConfigLogListResponse> logList = managerClient.getStreamLogs(inlongGroupId,
+ inlongStreamId);
+ if (CollectionUtils.isNotEmpty(logList)) {
+ Map<String, List<String>> streamLogs = Maps.newHashMap();
+ logList.stream().filter(x -> StringUtils.isNotEmpty(x.getComponentName()))
+ .forEach(streamLog -> {
+ String componentName = streamLog.getComponentName();
+ String log = GsonUtil.toJson(streamLog);
+ streamLogs.computeIfAbsent(componentName, Lists::newArrayList).add(log);
+ });
+ inlongGroupContext.getStreamLogs().put(streamName, streamLogs);
+ }
+ });
return inlongGroupContext;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index a934b59..5d4ce4a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -48,6 +48,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -658,6 +659,37 @@ public class InnerInlongManagerClient {
}
}
+ /**
+ * Get the config logs of an inlong stream by calling GET on "/stream/config/log/list".
+ *
+ * @param inlongGroupId inlong group id, appended to the URL as the inlongGroupId query parameter
+ * @param inlongStreamId inlong stream id, appended to the URL as the inlongStreamId query parameter
+ * @return the list held by the page info parsed from the response data
+ * @throws RuntimeException wrapping any exception raised while executing the call or parsing the response
+ */
+ public List<InlongStreamConfigLogListResponse> getStreamLogs(String inlongGroupId, String inlongStreamId) {
+ final String path = HTTP_PATH + "/stream/config/log/list";
+ String url = formatUrl(path);
+ // formatUrl already ends with "?username=..&password=..", so further parameters are joined with '&'.
+ // NOTE(review): the ids are concatenated as-is, without URL-encoding.
+ url = url + "&inlongGroupId=" + inlongGroupId + "&inlongStreamId=" + inlongStreamId;
+ Request request = new Request.Builder()
+ .get()
+ .url(url)
+ .build();
+
+ Call call = httpClient.newCall(request);
+ try {
+ Response response = call.execute();
+ // Java assert statement: only evaluated when the JVM runs with assertions enabled (-ea).
+ assert response.body() != null;
+ String body = response.body().string();
+ // First check the HTTP status, then the error message carried in the parsed response body.
+ AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed: %s", body));
+ org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+ AssertUtil.isTrue(responseBody.getErrMsg() == null,
+ String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+ PageInfo<InlongStreamConfigLogListResponse> pageInfo = InlongParser.parseStreamLogList(responseBody);
+ return pageInfo.getList();
+ } catch (Exception e) {
+ // Rethrown unchecked with the original exception kept as the cause.
+ throw new RuntimeException(String.format("Get inlong stream log failed: %s", e.getMessage()), e);
+ }
+ }
+
private String formatUrl(String path) {
return String.format("http://%s:%s/%s?username=%s&password=%s", host, port, path, uname, passwd);
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 28c57c8..e1ec229 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -49,6 +49,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListRespons
import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -244,4 +245,12 @@ public class InlongParser {
}.getType());
}
+ /**
+ * Parse the data of a response into a page of stream config logs.
+ * The data object is serialized to JSON and then deserialized as
+ * {@code PageInfo<InlongStreamConfigLogListResponse>}.
+ *
+ * @param response response whose data is read through getData()
+ * @return the deserialized page info
+ */
+ public static PageInfo<InlongStreamConfigLogListResponse> parseStreamLogList(Response response) {
+ Object data = response.getData();
+ String pageInfoJson = GsonUtil.toJson(data);
+ return GsonUtil.fromJson(pageInfoJson,
+ new TypeToken<PageInfo<InlongStreamConfigLogListResponse>>() {
+ }.getType());
+ }
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
index 66b83f0..93118d5 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
@@ -20,7 +20,9 @@ package org.apache.inlong.manager.dao.mapper;
import java.util.List;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
+import org.springframework.stereotype.Repository;
+@Repository
public interface StreamConfigLogEntityMapper {
int insertOrUpdateAll(List<StreamConfigLogEntity> records);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
index 468fba2..3b819f8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
@@ -58,6 +58,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
/**
* batch insert entities
+ *
* @param entryList entryList
* @return boolean true/false
*/
@@ -65,6 +66,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
/**
* put Data
+ *
* @param data data
* @return boolean true/false
*/
@@ -80,6 +82,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
isClose = true;
}
+ @Override
public void afterPropertiesSet() {
dataQueue = new LinkedBlockingQueue(queueSize);
pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
@@ -91,7 +94,8 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
}
class Task implements Runnable {
- public void run() {
+ @Override
+ public void run() {
while (!isClose) {
try {
List<T> entryList = new ArrayList<>();