You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 09:59:02 UTC

[incubator-inlong] branch master updated: [INLONG-2971][Manager] Support stream log collecting in manager client (#2975)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c17bc54  [INLONG-2971][Manager] Support stream log collecting in manager client (#2975)
c17bc54 is described below

commit c17bc546b2c6d8c7c1110a4baa6d13d6c8ead699
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 7 17:58:56 2022 +0800

    [INLONG-2971][Manager] Support stream log collecting in manager client (#2975)
---
 .../manager/client/api/InlongGroupContext.java     | 13 ++++++---
 .../manager/client/api/impl/InlongGroupImpl.java   | 32 +++++++++++++++++++---
 .../client/api/inner/InnerInlongManagerClient.java | 32 ++++++++++++++++++++++
 .../manager/client/api/util/InlongParser.java      |  9 ++++++
 .../dao/mapper/StreamConfigLogEntityMapper.java    |  2 ++
 .../manager/service/core/impl/AbstractService.java |  6 +++-
 6 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 370e082..2d2865d 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -40,14 +40,19 @@ public class InlongGroupContext implements Serializable {
     private Map<String, InlongStream> inlongStreamMap;
 
     /**
-     * Extension configuration for Inlong group
+     * Extension configuration for Inlong group.
      */
     private Map<String, String> extensions;
 
     /**
-     * Error message for Inlong group, key: taskName,  value: message
+     * Error messages for Inlong group, key: taskName, value: exception messages of that task.
      */
-    private Map<String, String> errMsg;
+    private Map<String, List<String>> errMsgs;
+
+    /**
+     * Logs for each stream, key: streamName, value: componentName->logs (each log is a JSON string).
+     */
+    private Map<String, Map<String, List<String>>> streamLogs = Maps.newHashMap();
 
     private InlongGroupState state;
 
@@ -58,7 +63,7 @@ public class InlongGroupContext implements Serializable {
         this.groupName = groupInfo.getName();
         this.groupConf = streamGroupConf;
         this.inlongStreamMap = groupContext.getStreamMap();
-        this.errMsg = Maps.newHashMap();
+        this.errMsgs = Maps.newHashMap();
         this.state = InlongGroupState.parseByBizStatus(groupInfo.getStatus());
         this.extensions = Maps.newHashMap();
         List<InlongGroupExtInfo> extInfos = groupInfo.getExtList();
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index c0cbc73..f5d5db3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -17,10 +17,13 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
@@ -43,6 +46,7 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
@@ -198,11 +202,31 @@ public class InlongGroupImpl implements InlongGroup {
         InlongGroupContext inlongGroupContext = new InlongGroupContext(groupContext, groupConf);
         List<EventLogView> logViews = managerClient.getInlongGroupError(inlongGroupId);
         if (CollectionUtils.isNotEmpty(logViews)) {
-            Map<String, String> errMsgs = logViews.stream()
-                    .filter(x -> null != x.getEvent() && null != x.getException())
-                    .collect(Collectors.toMap(EventLogView::getEvent, EventLogView::getException));
-            inlongGroupContext.setErrMsg(errMsgs);
+            Map<String, List<String>> errMsgs = Maps.newHashMap();
+            logViews.stream()
+                    .filter(x -> null != x.getElementName() && null != x.getException())
+                    .forEach(eventLogView -> {
+                        String taskName = eventLogView.getElementName();
+                        errMsgs.computeIfAbsent(taskName, Lists::newArrayList).add(eventLogView.getException());
+                    });
+            inlongGroupContext.setErrMsgs(errMsgs);
         }
+        Map<String, InlongStream> streams = inlongGroupContext.getInlongStreamMap();
+        streams.keySet().stream().forEach(streamName -> {
+            String inlongStreamId = "b_" + streamName;
+            List<InlongStreamConfigLogListResponse> logList = managerClient.getStreamLogs(inlongGroupId,
+                    inlongStreamId);
+            if (CollectionUtils.isNotEmpty(logList)) {
+                Map<String, List<String>> streamLogs = Maps.newHashMap();
+                logList.stream().filter(x -> StringUtils.isNotEmpty(x.getComponentName()))
+                        .forEach(streamLog -> {
+                            String componentName = streamLog.getComponentName();
+                            String log = GsonUtil.toJson(streamLog);
+                            streamLogs.computeIfAbsent(componentName, Lists::newArrayList).add(log);
+                        });
+                inlongGroupContext.getStreamLogs().put(streamName, streamLogs);
+            }
+        });
         return inlongGroupContext;
     }
 
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index a934b59..5d4ce4a 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -48,6 +48,7 @@ import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceRequest;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -658,6 +659,37 @@ public class InnerInlongManagerClient {
         }
     }
 
+    /**
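+     * Get the config logs of an inlong stream.
+     *
+     * @param inlongGroupId inlong group id, sent together with inlongStreamId as query parameters
+     * @return the list of stream config logs taken from the parsed response page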
+     */
+    public List<InlongStreamConfigLogListResponse> getStreamLogs(String inlongGroupId, String inlongStreamId) {
+        final String path = HTTP_PATH + "/stream/config/log/list";
+        String url = formatUrl(path);
+        url = url + "&inlongGroupId=" + inlongGroupId + "&inlongStreamId=" + inlongStreamId;
+        Request request = new Request.Builder()
+                .get()
+                .url(url)
+                .build();
+
+        Call call = httpClient.newCall(request);
+        try {
+            Response response = call.execute();
+            assert response.body() != null;
+            String body = response.body().string();
+            AssertUtil.isTrue(response.isSuccessful(), String.format("Inlong request failed: %s", body));
+            org.apache.inlong.manager.common.beans.Response responseBody = InlongParser.parseResponse(body);
+            AssertUtil.isTrue(responseBody.getErrMsg() == null,
+                    String.format("Inlong request failed: %s", responseBody.getErrMsg()));
+            PageInfo<InlongStreamConfigLogListResponse> pageInfo = InlongParser.parseStreamLogList(responseBody);
+            return pageInfo.getList();
+        } catch (Exception e) {
+            throw new RuntimeException(String.format("Get inlong stream log failed: %s", e.getMessage()), e);
+        }
+    }
+
     private String formatUrl(String path) {
         return String.format("http://%s:%s/%s?username=%s&password=%s", host, port, path, uname, passwd);
     }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 28c57c8..e1ec229 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -49,6 +49,7 @@ import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceListRespons
 import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -244,4 +245,12 @@ public class InlongParser {
                 }.getType());
     }
 
+    public static PageInfo<InlongStreamConfigLogListResponse> parseStreamLogList(Response response) {
+        Object data = response.getData();
+        String pageInfoJson = GsonUtil.toJson(data);
+        return GsonUtil.fromJson(pageInfoJson,
+                new TypeToken<PageInfo<InlongStreamConfigLogListResponse>>() {
+                }.getType());
+    }
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
index 66b83f0..93118d5 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamConfigLogEntityMapper.java
@@ -20,7 +20,9 @@ package org.apache.inlong.manager.dao.mapper;
 import java.util.List;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogPageRequest;
 import org.apache.inlong.manager.dao.entity.StreamConfigLogEntity;
+import org.springframework.stereotype.Repository;
 
+@Repository
 public interface StreamConfigLogEntityMapper {
 
     int insertOrUpdateAll(List<StreamConfigLogEntity> records);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
index 468fba2..3b819f8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AbstractService.java
@@ -58,6 +58,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
 
     /**
      * batch insert entities
+     *
      * @param entryList entryList
      * @return boolean true/false
      */
@@ -65,6 +66,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
 
     /**
      * put Data
+     *
      * @param data data
      * @return boolean true/false
      */
@@ -80,6 +82,7 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
         isClose = true;
     }
 
+    @Override
     public void afterPropertiesSet() {
         dataQueue = new LinkedBlockingQueue(queueSize);
         pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
@@ -91,7 +94,8 @@ public abstract class AbstractService<T> implements AutoCloseable, InitializingB
     }
 
     class Task implements Runnable {
-        public  void run() {
+        @Override
+        public void run() {
             while (!isClose) {
                 try {
                     List<T> entryList = new ArrayList<>();