You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/22 08:46:58 UTC

[incubator-doris-manager] branch master updated: [feature] instance state report (#53)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new 254cef5  [feature] instance state report (#53)
254cef5 is described below

commit 254cef55861545ff1d1ad853540a607ad7ed1348
Author: LiRui <11...@qq.com>
AuthorDate: Fri Apr 22 16:46:54 2022 +0800

    [feature] instance state report (#53)
    
    The agent now reports the state of each instance it manages to the manager server with every heartbeat.
---
 .../exceptions/InstanceNotInstallException.java    |  32 ++++
 .../exceptions/InstanceNotRunningException.java    |  32 ++++
 .../agent/exceptions/InstanceServiceException.java |  29 ++++
 .../manager/agent/service/HeartBeatService.java    | 170 ++++++++++++++++++---
 .../service/heartbeat/DorisInstanceOperator.java   | 118 ++++++++++++--
 .../service/heartbeat/InstanceEventHandler.java    |  12 +-
 .../apache/doris/manager/agent/util/Request.java   |  43 +++---
 .../src/main/resources/application.properties      |   3 +
 manager/dm-common/pom.xml                          |   6 +
 .../manager/common/heartbeat/HeartBeatContext.java |  32 ++++
 .../manager/common/heartbeat/HeartBeatResult.java  |  32 ++++
 .../manager/common/heartbeat/InstanceInfo.java     |  40 +++++
 .../common/heartbeat/InstanceStateResult.java      |  53 +++++++
 .../doris/manager/common/util/ConfigDefault.java   |   1 +
 .../common/util/ServerAndAgentConstant.java        |   2 +
 .../control/manager/DorisClusterModuleManager.java |   8 +-
 .../control/manager/ResourceClusterManager.java    |   4 +-
 .../manager/ResourceNodeAndAgentManager.java       |  32 +++-
 .../control/ResourceClusterNodeController.java     |  31 ++--
 .../control/ResourceClusterNodeService.java        | 165 ++++++++++++++++++++
 20 files changed, 775 insertions(+), 70 deletions(-)

diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java
new file mode 100644
index 0000000..cc3eee8
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceNotInstallException extends Exception {
+
+    private final String moduleName;
+    private final String installDir;
+
+    public InstanceNotInstallException(String moduleName, String installDir) {
+        super(String.format("instance %s is not installed at %s", moduleName, installDir));
+
+        this.moduleName = moduleName;
+        this.installDir = installDir;
+    }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java
new file mode 100644
index 0000000..7561fe6
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceNotRunningException extends Exception {
+
+    private final String moduleName;
+    private final String installDir;
+
+    public InstanceNotRunningException(String moduleName, String installDir) {
+        super(String.format("instance %s is not running at %s", moduleName, installDir));
+
+        this.moduleName = moduleName;
+        this.installDir = installDir;
+    }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java
new file mode 100644
index 0000000..35e5cc8
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceServiceException extends Exception {
+
+    private final String moduleName;
+
+    public InstanceServiceException(String moduleName) {
+        super(String.format("instance %s service exception", moduleName));
+        this.moduleName = moduleName;
+    }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
index f590ea1..e869a8e 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
@@ -18,17 +18,29 @@
 package org.apache.doris.manager.agent.service;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException;
+import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException;
+import org.apache.doris.manager.agent.exceptions.InstanceServiceException;
+import org.apache.doris.manager.agent.service.heartbeat.DorisInstanceOperator;
 import org.apache.doris.manager.agent.service.heartbeat.HeartbeatEventHandler;
 import org.apache.doris.manager.agent.util.Request;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatEventResultType;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
+import org.apache.doris.manager.common.heartbeat.InstanceInfo;
+import org.apache.doris.manager.common.heartbeat.InstanceStateResult;
+import org.apache.doris.stack.control.ModelControlState;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.core.env.Environment;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 
 @Service
 @Slf4j
@@ -36,9 +48,19 @@ public class HeartBeatService {
     @Autowired
     private HeartbeatEventHandler heartbeatEventHandler;
 
+    @Autowired
+    private DorisInstanceOperator instanceOpera;
+
     @Autowired
     private Environment environment;
 
+    private ConcurrentHashMap<Long, HeartBeatEventInfo> events = new ConcurrentHashMap<>();  // event id
+    private ConcurrentHashMap<Long, InstanceInfo>  instanceInfos = new ConcurrentHashMap<>(); // instance id
+
+    // cache event result in case of http post result failure
+    // some event is not reentrant
+    private ConcurrentHashMap<Long, HeartBeatEventResult> cacheResults = new ConcurrentHashMap<>();
+
     // When the agent starts, it needs to complete the registration before it can handle other heartbeats
     private String agentNodeId = "";
 
@@ -46,38 +68,148 @@ public class HeartBeatService {
 
     private String heartBeatUrl = "";
 
-    // TODO:Reserved for subsequent active reporting of instance status
-//    private Set<InstanceInfo> instances = new HashSet<>();
-
-    // TODO: To be improved
-    // TODO: Currently, the heartbeat implemented here is only responsible for obtaining the events to be executed
-    //  from the server, and does not report the instance status controlled by the current agent
-    // TODO:Execute once when the agent process starts?
-    // Send heartbeat every 5 seconds, get heartbeat event list
-//    @PostConstruct
-    @Scheduled(cron = "0/5 * * * * ?")
-    public void heartBeat() {
+    @Scheduled(cron = "0/${agent.heartbeat.interval:5} * * * * ?")
+    public void handleHeartBeatContextLoop() {
         if (agentNodeId.isEmpty() || serverEndpoint.isEmpty()) {
             agentNodeId = environment.getProperty("agent.node.id");
             serverEndpoint = environment.getProperty("manager.server.endpoint");
-            heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/heartbeat";
+            heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/context";
         }
-        // TODO :Process according to the returned heartbeat event results
-        // TODO:If there is an event and it is processed, the result is sent
+
         log.info("agent node is " + agentNodeId);
         log.info("heartBeatUrl is " + heartBeatUrl);
 
-        List<HeartBeatEventInfo> eventInfos = Request.getHeartBeatEventInfo(heartBeatUrl);
+        HeartBeatContext ctx = Request.getHeartBeatContext(heartBeatUrl);
+
+        // duplicate task
+        HeartBeatContext newCtx = addAndFilterContextTask(ctx);
+
+        Thread contextTask = new Thread(() -> {
+            HeartBeatResult res = executeContextTask(newCtx);
+
+            cacheResults.clear();
+            try {
+                String dealRes = Request.sendHeartBeatContextResult(heartBeatUrl, res);
+                log.info("server return context deal result: {}", dealRes);
+            } catch (IOException e) {
+                log.warn("send heartbeat context result error: {}", e.getMessage());
+                res.getEventResults().forEach((eventRes) -> {
+                    if (eventRes.getResultType() != HeartBeatEventResultType.FAIL) {
+                        log.info("cache event {} result, event type {}, stage {}", eventRes.getEventId(),
+                                eventRes.getEventType(), eventRes.getEventStage());
+                        cacheResults.put(eventRes.getEventId(), eventRes);
+                    }
+                });
+            }
+
+            // clear completed tasks
+            newCtx.getEvents().forEach((e) -> {
+                log.info("remove finished [event {}] task", e.getEventId());
+                events.remove(e.getEventId());
+            });
+            newCtx.getInstanceInfos().forEach((ins) -> {
+                instanceInfos.remove(ins.getInstanceId());
+            });
+        });
+
+        contextTask.start();
+    }
+
+    private HeartBeatResult executeContextTask(HeartBeatContext ctx) {
+        List<HeartBeatEventResult> eventResults = new ArrayList<>();
+        List<InstanceStateResult> insStateResults = new ArrayList<>();
+
+        // events whose results were cached after a failed report are answered from cacheResults below
+
+        for (HeartBeatEventInfo eventInfo : ctx.getEvents()) {
+            log.info("handle event {}: resource:{} type:{} stage:{}", eventInfo.getEventId(),
+                    eventInfo.getResourceType(), eventInfo.getEventStage(), eventInfo.getEventStage());
+
+            // get result from cache if it has been executed (some events are not reentrant)
+            long eventId = eventInfo.getEventId();
+            if (cacheResults.containsKey(eventId)) {
+                HeartBeatEventResult cr = cacheResults.get(eventId);
+                log.info("result is in cache, event {}, stage {}, result type {}", cr.getEventId(),
+                        cr.getEventStage(), cr.getResultType());
+                if (cr.getResultType() == HeartBeatEventResultType.PROCESSING
+                        && eventInfo.getEventStage() < cr.getEventStage()) {
+                    log.info("return result from result cache");
+                    eventResults.add(cr);
+                    continue;
+                } else if (cr.getResultType() == HeartBeatEventResultType.SUCCESS
+                        && eventInfo.getEventStage() == cr.getEventStage()) {
+                    log.info("return result form result cache");
+                    eventResults.add(cr);
+                    continue;
+                }
+            }
 
-        List<HeartBeatEventResult> results = new ArrayList<>();
-        for (HeartBeatEventInfo eventInfo : eventInfos) {
             HeartBeatEventResult result = heartbeatEventHandler.handHeartBeatEvent(eventInfo);
             if (result != null) {
-                results.add(result);
+                eventResults.add(result);
+            }
+        }
+
+        for (InstanceInfo instanceInfo : ctx.getInstanceInfos()) {
+            log.info("check module {} instance {} state", instanceInfo.getModuleName(), instanceInfo.getInstanceId());
+            InstanceStateResult stateResult = new InstanceStateResult(instanceInfo);
+            try {
+                instanceOpera.checkInstanceProcessState(instanceInfo.getModuleName(), instanceInfo.getInstallDir(),
+                        instanceInfo.getHttpPort());
+
+                stateResult.setState(ModelControlState.RUNNING);
+            } catch (InstanceNotInstallException e) {
+                log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage());
+                // the instance has probably not been installed yet
+                stateResult.setState(ModelControlState.INIT);
+                stateResult.setErrMsg(e.getMessage());
+            } catch (InstanceNotRunningException | InstanceServiceException e) {
+                log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage());
+                stateResult.setState(ModelControlState.STOPPED);
+                stateResult.setErrMsg(e.getMessage());
             }
+
+            insStateResults.add(stateResult);
         }
 
-        Request.sendHeartBeatEventResult(heartBeatUrl, results);
+        HeartBeatResult res = new HeartBeatResult();
+        res.setEventResults(eventResults);
+        res.setStateResults(insStateResults);
+
+        return res;
     }
 
+    private HeartBeatContext addAndFilterContextTask(HeartBeatContext ctx) {
+        HeartBeatContext filterCtx = new HeartBeatContext();
+        List<HeartBeatEventInfo> newEvents = new ArrayList<>();
+        List<InstanceInfo> newInsInfos = new ArrayList<>();
+
+        if (ctx.getEvents() != null) {
+            for (HeartBeatEventInfo eventInfo : ctx.getEvents()) {
+                if (events.containsKey(eventInfo.getEventId())) {
+                    log.warn("heartbeat event {} is running", eventInfo.getEventId());
+                    continue;
+                }
+                log.info("add event {}", eventInfo.getEventId());
+                events.put(eventInfo.getEventId(), eventInfo);
+                newEvents.add(eventInfo);
+            }
+        }
+
+        if (ctx.getInstanceInfos() != null) {
+            for (InstanceInfo ins : ctx.getInstanceInfos()) {
+                if (instanceInfos.containsKey(ins.getInstanceId())) {
+                    log.warn("module {} instance {} check task is running", ins.getModuleName(), ins.getInstanceId());
+                    continue;
+                }
+                log.info("add {} instance {} check task", ins.getModuleName(), ins.getInstanceId());
+                instanceInfos.put(ins.getInstanceId(), ins);
+                newInsInfos.add(ins);
+            }
+        }
+
+        filterCtx.setEvents(newEvents);
+        filterCtx.setInstanceInfos(newInsInfos);
+        return filterCtx;
+    }
 }
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
index 621049e..ecfb40c 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
@@ -17,10 +17,15 @@
 
 package org.apache.doris.manager.agent.service.heartbeat;
 
+import com.alibaba.fastjson.JSONObject;
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException;
+import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException;
+import org.apache.doris.manager.agent.exceptions.InstanceServiceException;
+import org.apache.doris.manager.agent.util.Request;
 import org.apache.doris.manager.agent.util.ShellUtil;
 import org.apache.doris.manager.common.util.ConfigDefault;
 import org.apache.doris.manager.common.util.ServerAndAgentConstant;
@@ -30,8 +35,10 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import java.nio.file.Paths;
+import java.util.HashMap;
 import java.util.Map;
 
 @Slf4j
@@ -143,7 +150,7 @@ public class DorisInstanceOperator {
 
                 executePkgShellScriptWithBash(startScript, installInfo, moudleName, Maps.newHashMap());
             } else {
-                log.info("{} instance is running", moudleName);
+                log.info("instance {} is running", moudleName);
             }
             log.info("start {} instance success", moudleName);
             return true;
@@ -192,21 +199,108 @@ public class DorisInstanceOperator {
     }
 
     // Check whether the instance has been installed and started
-    public boolean checkInstanceDeploy(String moudleName, String installInfo) {
+    public void checkInstanceProcessState(String moduleName, String installDir, int httpPort)
+            throws InstanceNotInstallException, InstanceNotRunningException, InstanceServiceException {
+
+        if (moduleName.equals(ServerAndAgentConstant.BROKER_NAME)) {
+            moduleName = getBrokerInstallationPath(installDir);
+        }
+
+        // install dir check
+        log.info("to check module {} instance process state in {}", moduleName, installDir);
+        File moduleRoot = Paths.get(installDir, moduleName).toFile();
+        if (!moduleRoot.exists()) {
+            log.error("instance {} not installed, can not find root path: {}", moduleName, moduleRoot);
+            throw new InstanceNotInstallException(moduleName, installDir);
+        }
+
+        // process state check
+        // dont consider multiple be is deployed at one a machine node
         try {
-            if (moudleName.equals(ServerAndAgentConstant.BROKER_NAME)) {
-                moudleName = getBrokerInstallationPath(installInfo);
-            }
-            int bePid = processIsRunning(moudleName, installInfo);
-            if (bePid < 0) {
-                return false;
-            } else {
-                return true;
+            int insPid = processIsRunning(moduleName, installDir);
+            if (insPid < 0) {
+                log.error("instance {} is not running", moduleName);
+                throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath());
             }
         } catch (Exception e) {
-            log.error("Check " + moudleName + " instance running error {}.", e);
-            return false;
+            log.error("check instance {} process state error: {}", moduleName, e.getMessage());
+            throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath());
+        }
+
+        if (httpPort <= 0) {
+            log.warn("invalid http port {}, skip http service check", httpPort);
+            return;
         }
+
+        // fe/be http service check
+        String statusURL;
+        if (moduleName.equals(ServerAndAgentConstant.FE_NAME)) {
+            statusURL = "http://localhost:" + httpPort + "/api/bootstrap";
+        } else if (moduleName.equals(ServerAndAgentConstant.BE_NAME)) {
+            statusURL = "http://localhost:" + httpPort + "/api/health";
+        } else {
+            // can not check other module by http
+            log.error("unknown module name {}", moduleName);
+            return;
+        }
+
+        /*
+         * Before Palo 3.10, the FE api/bootstrap API return like this:
+         * {
+         *    "replayedJournalId": 0,
+         *    "queryPort": 0,
+         *    "rpcPort": 0,
+         *    "status": "OK",
+         *    "msg": "Success"
+         * }
+         *
+         * From 3.10, the FE api/bootstrap API return like this:
+         * {
+         *  "msg": "success",
+         *  "code": 0,
+         *  "data": {"replayedJournalId": 0, "queryPort": 0, "rpcPort": 0},
+         *  "count": 0
+         * }
+         *
+         * FE api/health return like this
+         * {
+         * "msg": "success",
+         * "code": 0,
+         * "data": {"online_backend_num": 2, "total_backend_num": 2},
+         * "count": 0
+         * }
+         *
+         * BE api/health return lik this
+         * {"status": "OK","msg": "To Be Added"}
+         *
+         */
+
+        String stateRes;
+        try {
+            stateRes = Request.sendGetRequest(statusURL, new HashMap<>());
+        } catch (URISyntaxException e) {
+            log.error("{} syntax error {}", statusURL, e.getMessage());
+            return;
+        } catch (IOException e) {
+            throw new InstanceServiceException(moduleName);
+        }
+        log.info("http health return: {}", stateRes);
+        // or status == ok
+        JSONObject stateJson =  JSONObject.parseObject(stateRes);
+        String status = stateJson.getString("status");
+        if (status != null && status.equals("OK")) {
+            log.info("module {} instance service is normal, return status OK", moduleName);
+            return;
+        }
+
+        // or code == 0
+        Integer code = stateJson.getInteger("code");
+        if (code != null && code == 0) {
+            log.info("modele {} instance service is normal return code 0", moduleName);
+            return;
+        }
+
+        throw new InstanceServiceException(moduleName);
     }
 
     /*
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
index a7829c4..e86bc15 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
@@ -59,7 +59,17 @@ public class InstanceEventHandler {
         InstanceDeployCheckEventConfigInfo configInfo = JSON.parseObject(jsonConfigStr,
                 InstanceDeployCheckEventConfigInfo.class);
 
-        boolean isDeploy = instanceOperator.checkInstanceDeploy(configInfo.getModuleName(), configInfo.getInstallInfo());
+        boolean isDeploy = true;
+        try {
+            // here we do not check http service is ready or not
+            // because it is ready after a few seconds of the process starts
+            // which may cause `Doris Cluster Creation Request` failed and blocked
+            instanceOperator.checkInstanceProcessState(configInfo.getModuleName(),
+                    configInfo.getInstallInfo(), 0);
+        } catch (Exception e) {
+            log.error("check instance {} deploy error: {}", configInfo.getModuleName(), e.getMessage());
+            isDeploy = false;
+        }
 
         HeartBeatEventResult result = new HeartBeatEventResult(eventInfo);
         // There is only one step
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
index c5d6ccc..9f6c95a 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
@@ -19,8 +19,8 @@ package org.apache.doris.manager.agent.util;
 
 import com.alibaba.fastjson.JSON;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -35,29 +35,34 @@ import org.apache.http.util.EntityUtils;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 @Slf4j
 public class Request {
+    public static HeartBeatContext getHeartBeatContext(String requestUrl) {
+        String ctx = null;
 
-    public static List<HeartBeatEventInfo> getHeartBeatEventInfo(String requestUrl) {
-        String getHeartBeatEventResults = sendGetRequest(requestUrl, new HashMap<>());
-        log.info("getHeartBeatEventResults:" + getHeartBeatEventResults);
-//        ManagerServerResponse response = JSON.parseObject(getHeartBeatEventResults, ManagerServerResponse.class);
-        if (getHeartBeatEventResults == null) {
-            return new ArrayList<>();
+        try {
+            ctx = sendGetRequest(requestUrl, new HashMap<>());
+            log.info("getHeartBeatContextResults:" + ctx);
+        } catch (Exception e) {
+            log.error("get heartbeat context error {}", e.getMessage());
+        }
+
+        if (ctx == null) {
+            log.warn("no context return");
+            return new HeartBeatContext();
         }
-        return JSON.parseArray(getHeartBeatEventResults, HeartBeatEventInfo.class);
+        return JSON.parseObject(ctx, HeartBeatContext.class);
     }
 
-    public static void sendHeartBeatEventResult(String requestUrl, List<HeartBeatEventResult> results) {
-       sendPostRequest(requestUrl, JSON.toJSONString(results));
+    public static String sendHeartBeatContextResult(String requestUrl, HeartBeatResult res) throws IOException {
+        log.info("send heart beat context result {} to {}", JSON.toJSONString(res), requestUrl);
+        return sendPostRequest(requestUrl, JSON.toJSONString(res));
     }
 
-    public static String sendPostRequest(String requestUrl, String bodyJson) {
+    public static String sendPostRequest(String requestUrl, String bodyJson) throws IOException {
         HttpPost httpPost = new HttpPost(requestUrl);
         httpPost.setConfig(timeout());
 
@@ -68,7 +73,7 @@ public class Request {
             return request(httpPost);
         } catch (IOException e) {
             log.error("request url error:{},param:{}", requestUrl, bodyJson, e);
-            throw new RuntimeException(e);
+            throw e;
         }
     }
 
@@ -87,7 +92,8 @@ public class Request {
         }
     }
 
-    public static String sendGetRequest(String requestUrl, Map<String, Object> params) {
+    public static String sendGetRequest(String requestUrl, Map<String, Object> params)
+            throws URISyntaxException, IOException {
         URI url = null;
         try {
             URIBuilder uriBuilder = null;
@@ -97,7 +103,8 @@ public class Request {
             }
             url = uriBuilder.build();
         } catch (URISyntaxException e) {
-            e.printStackTrace();
+            log.error("{} syntax error {}", requestUrl, e.getMessage());
+            throw e;
         }
 
         HttpGet httpGet = new HttpGet(url);
@@ -107,7 +114,7 @@ public class Request {
             return request(httpGet);
         } catch (IOException e) {
             log.error("request url error:{},param:{}", requestUrl, params, e);
-            throw new RuntimeException(e);
+            throw e;
         }
     }
 
diff --git a/manager/dm-agent/src/main/resources/application.properties b/manager/dm-agent/src/main/resources/application.properties
index 4a73966..0327076 100644
--- a/manager/dm-agent/src/main/resources/application.properties
+++ b/manager/dm-agent/src/main/resources/application.properties
@@ -21,3 +21,6 @@ server.port=8001
 manager.server.endpoint=
 # manager agent unique identification
 agent.node.id=
+
+# heart beat interval(s)
+agent.heartbeat.interval=5
diff --git a/manager/dm-common/pom.xml b/manager/dm-common/pom.xml
index 44cf13e..06c7265 100644
--- a/manager/dm-common/pom.xml
+++ b/manager/dm-common/pom.xml
@@ -27,6 +27,12 @@ under the License.
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>dm-common</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>resource-common</artifactId>
+        </dependency>
+    </dependencies>
 
 
 </project>
\ No newline at end of file
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java
new file mode 100644
index 0000000..3556f34
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Payload the server returns to an agent when it polls for its heartbeat context.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartBeatContext {
+    // uncompleted heartbeat events the agent still has to handle
+    private List<HeartBeatEventInfo> events;
+
+    // cluster instances deployed on the agent's node (presumably checked and reported back by the agent)
+    private List<InstanceInfo> instanceInfos;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java
new file mode 100644
index 0000000..d70a12a
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+/**
+ * Payload an agent posts back to the server after handling its heartbeat context.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartBeatResult {
+    // results of the heartbeat events the agent processed; may be null
+    private List<HeartBeatEventResult> eventResults;
+
+    // observed state of each instance on the agent's node; may be null
+    private List<InstanceStateResult> stateResults;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java
new file mode 100644
index 0000000..96b16d0
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Describes one cluster instance deployed on an agent's node; sent to the agent inside
+ * {@link HeartBeatContext}.
+ *
+ * <p>NOTE: field order defines the {@code @AllArgsConstructor} parameter order, which callers
+ * use positionally; do not reorder fields.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InstanceInfo {
+    // id of the resource node the agent runs on
+    private long agentNodeId;
+
+    private long moduleId;
+
+    private long instanceId;
+
+    // module name, e.g. fe or be
+    private String moduleName;
+
+    // NOTE(review): the server fills this from the instance's install info; confirm it is a directory path
+    private String installDir;
+
+    // HTTP port of the instance; only available when moduleName = fe/be, the server sends 0 when unresolved
+    private int httpPort;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java
new file mode 100644
index 0000000..31a4e47
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.doris.stack.control.ModelControlState;
+
+/**
+ * State of a single instance as observed by an agent, reported back to the server inside
+ * {@link HeartBeatResult}.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InstanceStateResult {
+    private long agentNodeId;
+
+    private long moduleId;
+
+    private long instanceId;
+
+    private String moduleName;
+
+    private String installDir;
+
+    // observed instance state; UNKNOWN until the agent determines it
+    private ModelControlState state;
+
+    // error detail reported by the agent
+    // NOTE(review): likely only meaningful when state is NOT RUNNING; confirm against the agent's state check
+    private String errMsg;
+
+    /**
+     * Creates a result identifying the same instance as {@code info}, with the state
+     * initialised to {@link ModelControlState#UNKNOWN} and no error message.
+     *
+     * @param info the instance the server asked the agent to check
+     */
+    public InstanceStateResult(InstanceInfo info) {
+        this.agentNodeId = info.getAgentNodeId();
+        this.moduleId = info.getModuleId();
+        this.instanceId = info.getInstanceId();
+        this.moduleName = info.getModuleName();
+        this.installDir = info.getInstallDir();
+
+        this.state = ModelControlState.UNKNOWN;
+    }
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
index 619bc0c..db6e7fa 100644
--- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
@@ -37,6 +37,7 @@ public class ConfigDefault {
     public static final String FE_EDIT_LOG_PORT = "edit_log_port";
 
     public static final String BE_HEARTBEAT_PORT_CONFIG_NAME = "heartbeat_service_port";
+    public static final String BE_WEBSERVER_PORT_NAME = "webserver_port";
 
     public static final String BROKER_PORT_CONFIG_NAME = "broker_ipc_port";
 
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
index dbc813f..eacfd4e 100644
--- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
@@ -73,6 +73,8 @@ public class ServerAndAgentConstant {
     public static final String FE_EDIT_SERVICE = "fe_edit";
 
     public static final String BE_HEARTBEAT_SERVICE = "be_heartbeat";
+    public static final String BE_HTTP_SERVICE = "be_http";
+
     public static final String BROKER_PRC_SERVICE = "broker_rpc";
 
     public static final Map<String, String> BAIDU_BROKER_CONFIG_DEDAULT;
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
index 492e19d..5ac39a5 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
@@ -136,7 +136,13 @@ public class DorisClusterModuleManager {
             // for be service, heartbeat
             for (DeployConfigItem configItem : deployConfig.getConfigs()) {
                 if (configItem.getKey().equals(ConfigDefault.BE_HEARTBEAT_PORT_CONFIG_NAME)) {
-                    serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE, Integer.valueOf(configItem.getValue()));
+                    serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE,
+                            Integer.valueOf(configItem.getValue()));
+                }
+
+                if (configItem.getKey().equals(ConfigDefault.BE_WEBSERVER_PORT_NAME)) {
+                    serviceNamePorts.put(ServerAndAgentConstant.BE_HTTP_SERVICE,
+                            Integer.valueOf(configItem.getValue()));
                 }
             }
             serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index d7e13c7..d55f34c 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -228,7 +228,7 @@ public class ResourceClusterManager {
 
         List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>();
         for (ResourceNodeEntity nodeEntity : nodeEntities) {
-            if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) {
+            if (!nodeAndAgentManager.isAgentInstalled(nodeEntity)) {
                 log.warn("the agent has not been installed on {} node {}", nodeEntity.getId(), nodeEntity.getHost());
             } else {
                 agentInstalledNodes.add(nodeEntity);
@@ -269,7 +269,7 @@ public class ResourceClusterManager {
         }
 
         // async delete agent
-        for (ResourceNodeEntity nodeEntity : nodeEntities) {
+        for (ResourceNodeEntity nodeEntity : agentInstalledNodes) {
             AgentUnInstallEventConfigInfo uninstallConfig = new AgentUnInstallEventConfigInfo(
                     accessInfo.getSshUser(), accessInfo.getSshPort(), accessInfo.getSshKey(),
                     nodeEntity.getHost(), nodeEntity.getAgentInstallDir(),
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 45a5253..41aced1 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -81,7 +81,7 @@ public class ResourceNodeAndAgentManager {
             throws Exception {
 
         // check agent has been installed or not
-        if (!checkAgentOperation(node)) {
+        if (!isAgentInstalled(node)) {
             log.warn("node[{}]:{} does not install agent, no need to uninstall", node.getId(), node.getHost());
             return;
         }
@@ -182,6 +182,34 @@ public class ResourceNodeAndAgentManager {
         }
     }
 
+    /**
+     * Checks whether the agent on {@code node} is installed, judging by the node's current
+     * heartbeat event.
+     *
+     * @param node resource node to inspect
+     * @return {@code true} if the current event is not an AGENT_INSTALL event, or is an
+     *         AGENT_INSTALL event that completed successfully or reached the AGENT_START stage;
+     *         {@code false} if the node has no current event, the event record is missing, or
+     *         installation has not progressed far enough
+     */
+    public boolean isAgentInstalled(ResourceNodeEntity node) {
+        long eventId = node.getCurrentEventId();
+
+        if (eventId < 1L) {
+            log.warn("The node no have agent");
+            return false;
+        }
+
+        HeartBeatEventEntity eventEntity = heartBeatEventRepository.findById(eventId).orElse(null);
+        if (eventEntity == null) {
+            // the node references an event row that does not exist; cannot confirm the agent
+            log.error("node {} current heartbeat event {} does not exist", node.getId(), eventId);
+            return false;
+        }
+
+        // handling other event, agent has been installed
+        if (!HeartBeatEventType.AGENT_INSTALL.name().equals(eventEntity.getType())) {
+            return true;
+        }
+
+        // AGENT_INSTALL event that finished successfully
+        if (eventEntity.isCompleted() && HeartBeatEventResultType.SUCCESS.name().equals(eventEntity.getStatus())) {
+            return true;
+        }
+
+        // AGENT_INSTALL event still in progress but already past the agent start stage
+        if (eventEntity.getStage() >= AgentInstallEventStage.AGENT_START.getStage()) {
+            return true;
+        }
+
+        log.warn("Agent has not been installed successfully");
+        return false;
+    }
+
     public boolean isAvailableAgentPort(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo)
             throws Exception {
         // agent port check, eg: Spring Boot Param server.port=8008
@@ -369,7 +397,7 @@ public class ResourceNodeAndAgentManager {
     }
 
     private void uninstallEventProcess(ResourceNodeEntity node, AgentUnInstallEventConfigInfo configInfo,
-                                       HeartBeatEventEntity agentUninstallAgentEntity) {
+                                     HeartBeatEventEntity agentUninstallAgentEntity) {
         if (!agentUninstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_STOP.name())) {
             log.warn("agent no need to stop on {} node {}", node.getId(), node.getHost());
             return;
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
index 9392d99..1f07142 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
@@ -20,8 +20,8 @@ package org.apache.doris.stack.controller.control;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
 import org.apache.doris.stack.entity.CoreUserEntity;
 import org.apache.doris.stack.rest.ResponseEntityBuilder;
 import org.apache.doris.stack.service.control.ResourceClusterNodeService;
@@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import java.util.List;
 
 @Api(tags = "Resource Cluster Node Agent API")
 @RestController
@@ -51,20 +50,22 @@ public class ResourceClusterNodeController {
     @Autowired
     private ResourceClusterNodeService nodeService;
 
-    @ApiOperation(value = "get node agent heartbeat")
-    @GetMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE)
-    public List<HeartBeatEventInfo> getHeartbeat(HttpServletRequest request,
-                                                 HttpServletResponse response,
-                                                 @PathVariable(value = "agentNodeId") long agentNodeId) {
-        return nodeService.getHeartbeat(agentNodeId);
+    /**
+     * Returns the heartbeat context for an agent: its uncompleted heartbeat events and the
+     * cluster instances deployed on its node.
+     */
+    @ApiOperation(value = "get heart beat context(node agent heartbeat and instance info)")
+    @GetMapping(value = "{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE)
+    public HeartBeatContext getHeartbeatContext(HttpServletRequest request,
+                                                HttpServletResponse response,
+                                                @PathVariable(value = "agentNodeId") long agentNodeId) {
+        return nodeService.getHeartBeatContext(agentNodeId);
     }
 
-    @ApiOperation(value = "deal node agent heartbeat event result")
-    @PostMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE)
-    public Object postHeartbeat(HttpServletRequest request,
-                                HttpServletResponse response,
-                                @RequestBody List<HeartBeatEventResult> results) throws Exception {
-        nodeService.dealHeartbeatResult(results);
+    /**
+     * Accepts the event results and instance states an agent reports for its heartbeat context.
+     */
+    @ApiOperation(value = "deal heart beat context")
+    @PostMapping(value = "{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE)
+    public Object postHeartbeatContext(HttpServletRequest request,
+                                       HttpServletResponse response,
+                                       @PathVariable(value = "agentNodeId") long agentNodeId,
+                                       @RequestBody HeartBeatResult ctx) {
+        log.info("agent {} post heartbeat context", agentNodeId);
+        nodeService.dealHeartbeatContext(ctx);
         return "SUCCESS";
     }
 
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
index f151e74..b10f192 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
@@ -19,27 +19,41 @@ package org.apache.doris.stack.service.control;
 
 import com.alibaba.fastjson.JSON;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventResourceType;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
 import org.apache.doris.manager.common.heartbeat.HeartBeatEventType;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
+import org.apache.doris.manager.common.heartbeat.InstanceInfo;
+import org.apache.doris.manager.common.heartbeat.InstanceStateResult;
 import org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import org.apache.doris.manager.common.util.ServerAndAgentConstant;
+import org.apache.doris.stack.control.ModelControlState;
 import org.apache.doris.stack.control.manager.ResourceNodeAndAgentManager;
 import org.apache.doris.stack.dao.ClusterInstanceRepository;
+import org.apache.doris.stack.dao.ClusterModuleRepository;
+import org.apache.doris.stack.dao.ClusterModuleServiceRepository;
 import org.apache.doris.stack.dao.HeartBeatEventRepository;
 import org.apache.doris.stack.dao.ResourceClusterRepository;
 import org.apache.doris.stack.dao.ResourceNodeRepository;
 import org.apache.doris.stack.entity.ClusterInstanceEntity;
+import org.apache.doris.stack.entity.ClusterModuleEntity;
+import org.apache.doris.stack.entity.ClusterModuleServiceEntity;
 import org.apache.doris.stack.entity.HeartBeatEventEntity;
 import org.apache.doris.stack.entity.ResourceClusterEntity;
 import org.apache.doris.stack.entity.ResourceNodeEntity;
 import org.apache.doris.stack.model.request.control.PMResourceClusterAccessInfo;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 @Service
 @Slf4j
@@ -57,9 +71,15 @@ public class ResourceClusterNodeService {
     @Autowired
     private HeartBeatEventRepository heartBeatEventRepository;
 
+    @Autowired
+    private ClusterModuleRepository clusterModuleRepository;
+
     @Autowired
     private ResourceNodeAndAgentManager nodeAndAgentManager;
 
+    @Autowired
+    private ClusterModuleServiceRepository serviceRepository;
+
     // Send uncompleted heartbeat events that need to be handled by agent
     public List<HeartBeatEventInfo> getHeartbeat(long agentNodeId) {
         log.info("Get agent {} uncompleted heartbeat events", agentNodeId);
@@ -84,6 +104,60 @@ public class ResourceClusterNodeService {
         return eventInfos;
     }
 
+    public List<InstanceInfo> getInstanceInfo(long agentNodeId) {
+        log.info("get node {} instance info", agentNodeId);
+        List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(agentNodeId);
+
+        List<InstanceInfo> instanceInfos = new ArrayList<>();
+
+        for (ClusterInstanceEntity ins : instanceEntities) {
+            Optional<ClusterModuleEntity> moduleEntityOpt = clusterModuleRepository.findById(ins.getModuleId());
+            if (!moduleEntityOpt.isPresent()) {
+                log.error("this instance module {} is not find, ignore it", ins.getModuleId());
+                continue;
+            }
+
+            ClusterModuleEntity moduleEntity = moduleEntityOpt.get();
+
+            log.info("to get module {} instance {} info", moduleEntity.getModuleName(), ins.getId());
+
+            int httpPort = 0;
+            String httpServerName = "";
+            if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.FE_NAME)) {
+                httpServerName = ServerAndAgentConstant.FE_HTTP_SERVICE;
+            } else if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.BE_NAME)) {
+                httpServerName = ServerAndAgentConstant.BE_HTTP_SERVICE;
+            }
+
+            List<ClusterModuleServiceEntity> httpServices = serviceRepository.getByClusterIdAndName(
+                    moduleEntity.getClusterId(), httpServerName);
+
+            for (ClusterModuleServiceEntity service : httpServices) {
+                List<String> addrList = JSON.parseArray(service.getAddressInfo(), String.class);
+                if (addrList.contains(ins.getAddress())) {
+                    httpPort = service.getPort();
+                }
+            }
+
+            log.info("module {} instance {} http port is {}", moduleEntity.getModuleName(), ins.getId(), httpPort);
+
+            InstanceInfo instanceInfo = new InstanceInfo(ins.getNodeId(), ins.getModuleId(), ins.getId(),
+                    moduleEntity.getModuleName(), ins.getInstallInfo(), httpPort);
+            log.info("get instance {} info: {}", ins.getId(), instanceInfo);
+            instanceInfos.add(instanceInfo);
+        }
+
+        return instanceInfos;
+    }
+
+    /**
+     * Assembles the context returned to an agent on heartbeat: its pending events are gathered
+     * first, then the instances deployed on its node.
+     *
+     * @param agentNodeId id of the node the agent runs on
+     * @return a freshly built context
+     */
+    public HeartBeatContext getHeartBeatContext(long agentNodeId) {
+        log.info("start to get heartbeat context");
+        List<HeartBeatEventInfo> pendingEvents = getHeartbeat(agentNodeId);
+        List<InstanceInfo> nodeInstances = getInstanceInfo(agentNodeId);
+        return new HeartBeatContext(pendingEvents, nodeInstances);
+    }
+
     // Handle the result of the heartbeat event of the agent
     public void dealHeartbeatResult(List<HeartBeatEventResult> eventResults) {
         for (HeartBeatEventResult eventResult : eventResults) {
@@ -101,6 +175,38 @@ public class ResourceClusterNodeService {
         }
     }
 
+    /**
+     * Applies everything an agent reported in one heartbeat: event results first, then instance
+     * states. A missing list is logged as a warning and that part is skipped.
+     *
+     * @param ctx the agent's report
+     */
+    public void dealHeartbeatContext(HeartBeatResult ctx) {
+        log.info("deal heart beat context {}", ctx);
+
+        List<HeartBeatEventResult> eventResults = ctx.getEventResults();
+        if (eventResults != null) {
+            dealHeartbeatResult(eventResults);
+        } else {
+            log.warn("no events to deal");
+        }
+
+        List<InstanceStateResult> stateResults = ctx.getStateResults();
+        if (stateResults != null) {
+            stateResults.forEach(this::dealInstanceState);
+        } else {
+            log.warn("no instance state result to deal");
+        }
+    }
+
+    public void dealInstanceState(InstanceStateResult stateResult) {
+        log.info("update module {} instance {}  state is {}",  stateResult.getModuleName(), stateResult.getInstanceId(),
+                stateResult.getState());
+
+        Optional<ClusterInstanceEntity> instanceEntityOpt = instanceRepository.findById(stateResult.getInstanceId());
+        if (!instanceEntityOpt.isPresent()) {
+            log.error("instance {} does not exists", stateResult.getInstanceId());
+        }
+
+        ClusterInstanceEntity instanceEntity = instanceEntityOpt.get();
+        instanceEntity.setCurrentState(stateResult.getState().getValue());
+        instanceRepository.save(instanceEntity);
+    }
+
     public void operateAgent(long nodeId, String operateType) throws Exception {
 
         HeartBeatEventType eventType = HeartBeatEventType.valueOf(operateType);
@@ -149,4 +255,63 @@ public class ResourceClusterNodeService {
             return;
         }
     }
+
+    /**
+     * Sets the current state of every cluster instance on {@code node}.
+     *
+     * <p>NOTE: {@code @Transactional} only applies when this method is invoked through the
+     * Spring proxy; calls from inside this class (e.g. the scheduled node state check) bypass it.
+     *
+     * @param node  resource node whose instances are updated
+     * @param state new state value, as produced by {@code ModelControlState#getValue()}
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public void updateInstancesState(ResourceNodeEntity node, int state) {
+        log.info("update instances of node {} {} to state {}", node.getId(), node.getHost(), state);
+        List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(node.getId());
+        if (instanceEntities.isEmpty()) {
+            log.warn("node {} does not have any instances", node.getId());
+            return;
+        }
+
+        for (ClusterInstanceEntity ins : instanceEntities) {
+            log.info("update instance {} of module {} to {}", ins.getId(), ins.getModuleId(), state);
+            ins.setCurrentState(state);
+        }
+        // persist all modified rows in one repository call
+        instanceRepository.saveAll(instanceEntities);
+    }
+
+    /**
+     * Periodically derives each node's state from its last heartbeat time.
+     *
+     * <p>A node that has never sent a heartbeat becomes INIT. A node whose last heartbeat is
+     * older than 60 seconds becomes UNKNOWN, together with all its instances. Any other node
+     * becomes RUNNING. Rows are only written when the state actually changes, and a failure on
+     * one node does not stop the others from being checked.
+     */
+    @Scheduled(cron = "0/60 * * * * ?")
+    public void agentNodeStateCheck() {
+        log.info("start to check agent nodes state");
+        List<ResourceNodeEntity> nodes = nodeRepository.findAll();
+        if (nodes.isEmpty()) {
+            log.info("no any agent nodes");
+            return;
+        }
+
+        // local instance: SimpleDateFormat is not thread-safe, so it is never shared
+        SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        for (ResourceNodeEntity node : nodes) {
+            try {
+                checkNodeState(node, timeFormat);
+            } catch (Exception e) {
+                // keep checking the remaining nodes even if one of them fails
+                log.error("check state of node {} error", node.getId(), e);
+            }
+        }
+    }
+
+    // Derives one node's state from its last heartbeat and persists it only if it changed.
+    private void checkNodeState(ResourceNodeEntity node, SimpleDateFormat timeFormat) {
+        // heartbeats older than this mark the node (and its instances) as UNKNOWN
+        final long heartbeatTimeoutMs = 60 * 1000L;
+
+        Timestamp lastTime = node.getLastHeartBeatTimestamp();
+        if (lastTime == null) {
+            log.warn("not receive heartbeat yet form node {} {}", node.getId(), node.getHost());
+            if (node.getCurrentState() != ModelControlState.INIT.getValue()) {
+                node.setCurrentState(ModelControlState.INIT.getValue());
+                nodeRepository.save(node);
+            }
+            return;
+        }
+
+        log.info("node {} {} last heartbeat time {}", node.getId(), node.getHost(), timeFormat.format(lastTime));
+        if (System.currentTimeMillis() - lastTime.getTime() > heartbeatTimeoutMs) {
+            log.warn("node {} heartbeat timeout", node.getId());
+            if (node.getCurrentState() != ModelControlState.UNKNOWN.getValue()) {
+                log.warn("update node {} form {} to {}", node.getId(), node.getCurrentState(),
+                        ModelControlState.UNKNOWN.getValue());
+                node.setCurrentState(ModelControlState.UNKNOWN.getValue());
+                log.info("update all instance of node {} to UNKNOWN", node.getId());
+                updateInstancesState(node, ModelControlState.UNKNOWN.getValue());
+                nodeRepository.save(node);
+            }
+        } else {
+            log.info("node {} heartbeat state normal", node.getId());
+            if (node.getCurrentState() != ModelControlState.RUNNING.getValue()) {
+                log.warn("update node {} form {} to {}", node.getId(), node.getCurrentState(),
+                        ModelControlState.RUNNING.getValue());
+                node.setCurrentState(ModelControlState.RUNNING.getValue());
+                nodeRepository.save(node);
+            }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org