You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2022/04/22 08:46:58 UTC
[incubator-doris-manager] branch master updated: [feature] instance state report (#53)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-manager.git
The following commit(s) were added to refs/heads/master by this push:
new 254cef5 [feature] instance state report (#53)
254cef5 is described below
commit 254cef55861545ff1d1ad853540a607ad7ed1348
Author: LiRui <11...@qq.com>
AuthorDate: Fri Apr 22 16:46:54 2022 +0800
[feature] instance state report (#53)
instance state report
---
.../exceptions/InstanceNotInstallException.java | 32 ++++
.../exceptions/InstanceNotRunningException.java | 32 ++++
.../agent/exceptions/InstanceServiceException.java | 29 ++++
.../manager/agent/service/HeartBeatService.java | 170 ++++++++++++++++++---
.../service/heartbeat/DorisInstanceOperator.java | 118 ++++++++++++--
.../service/heartbeat/InstanceEventHandler.java | 12 +-
.../apache/doris/manager/agent/util/Request.java | 43 +++---
.../src/main/resources/application.properties | 3 +
manager/dm-common/pom.xml | 6 +
.../manager/common/heartbeat/HeartBeatContext.java | 32 ++++
.../manager/common/heartbeat/HeartBeatResult.java | 32 ++++
.../manager/common/heartbeat/InstanceInfo.java | 40 +++++
.../common/heartbeat/InstanceStateResult.java | 53 +++++++
.../doris/manager/common/util/ConfigDefault.java | 1 +
.../common/util/ServerAndAgentConstant.java | 2 +
.../control/manager/DorisClusterModuleManager.java | 8 +-
.../control/manager/ResourceClusterManager.java | 4 +-
.../manager/ResourceNodeAndAgentManager.java | 32 +++-
.../control/ResourceClusterNodeController.java | 31 ++--
.../control/ResourceClusterNodeService.java | 165 ++++++++++++++++++++
20 files changed, 775 insertions(+), 70 deletions(-)
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java
new file mode 100644
index 0000000..cc3eee8
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotInstallException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceNotInstallException extends Exception {
+ // Checked exception thrown when an instance's module directory cannot be found under its install dir.
+ private final String moduleName; // module name (for a broker: its resolved installation directory name)
+ private final String installDir; // directory that was expected to contain the module directory
+ // NOTE(review): neither field is read anywhere in this class; expose getters or remove them.
+ public InstanceNotInstallException(String moduleName, String installDir) {
+ super(String.format("instance %s is not installed at %s", moduleName, installDir));
+
+ this.moduleName = moduleName;
+ this.installDir = installDir;
+ }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java
new file mode 100644
index 0000000..7561fe6
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceNotRunningException.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceNotRunningException extends Exception {
+ // Checked exception thrown when no process is found for an installed instance (or the process lookup failed).
+ private final String moduleName; // module name (for a broker: its resolved installation directory name)
+ private final String installDir; // path shown in the message; DorisInstanceOperator passes the module root here
+ // NOTE(review): neither field is read anywhere in this class; expose getters or remove them.
+ public InstanceNotRunningException(String moduleName, String installDir) {
+ super(String.format("instance %s is not running at %s", moduleName, installDir));
+
+ this.moduleName = moduleName;
+ this.installDir = installDir;
+ }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java
new file mode 100644
index 0000000..35e5cc8
--- /dev/null
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/exceptions/InstanceServiceException.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.agent.exceptions;
+
+public class InstanceServiceException extends Exception {
+ // Checked exception thrown when an instance's HTTP health endpoint fails or does not report a healthy state.
+ private final String moduleName; // module whose HTTP service check failed
+ // NOTE(review): moduleName is never read in this class; expose a getter or remove it.
+ public InstanceServiceException(String moduleName) {
+ super(String.format("instance %s service exception", moduleName));
+ this.moduleName = moduleName;
+ }
+}
+
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
index f590ea1..e869a8e 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/HeartBeatService.java
@@ -18,17 +18,29 @@
package org.apache.doris.manager.agent.service;
import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException;
+import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException;
+import org.apache.doris.manager.agent.exceptions.InstanceServiceException;
+import org.apache.doris.manager.agent.service.heartbeat.DorisInstanceOperator;
import org.apache.doris.manager.agent.service.heartbeat.HeartbeatEventHandler;
import org.apache.doris.manager.agent.util.Request;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatEventResultType;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
+import org.apache.doris.manager.common.heartbeat.InstanceInfo;
+import org.apache.doris.manager.common.heartbeat.InstanceStateResult;
+import org.apache.doris.stack.control.ModelControlState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
@Service
@Slf4j
@@ -36,9 +48,19 @@ public class HeartBeatService {
@Autowired
private HeartbeatEventHandler heartbeatEventHandler;
+ @Autowired
+ private DorisInstanceOperator instanceOpera;
+
@Autowired
private Environment environment;
+ // in-flight events keyed by event id; read and written by the per-tick worker threads
+ private final ConcurrentHashMap<Long, HeartBeatEventInfo> events = new ConcurrentHashMap<>();
+ // in-flight instance state checks keyed by instance id
+ private final ConcurrentHashMap<Long, InstanceInfo> instanceInfos = new ConcurrentHashMap<>();
+
+ // event results whose POST to the server failed, keyed by event id; replayed on a later
+ // heartbeat so that non-reentrant events are not executed a second time
+ private final ConcurrentHashMap<Long, HeartBeatEventResult> cacheResults = new ConcurrentHashMap<>();
+
// When the agent starts, it needs to complete the registration before it can handle other heartbeats
private String agentNodeId = "";
@@ -46,38 +68,148 @@ public class HeartBeatService {
private String heartBeatUrl = "";
- // TODO:Reserved for subsequent active reporting of instance status
-// private Set<InstanceInfo> instances = new HashSet<>();
-
- // TODO: To be improved
- // TODO: Currently, the heartbeat implemented here is only responsible for obtaining the events to be executed
- // from the server, and does not report the instance status controlled by the current agent
- // TODO:Execute once when the agent process starts?
- // Send heartbeat every 5 seconds, get heartbeat event list
-// @PostConstruct
- @Scheduled(cron = "0/5 * * * * ?")
- public void heartBeat() {
+ // Polls the manager server for a HeartBeatContext (events to execute and instances to check)
+ // every agent.heartbeat.interval seconds and processes it on a new worker thread.
+ @Scheduled(cron = "0/${agent.heartbeat.interval:5} * * * * ?")
+ public void handleHeartBeatContextLoop() {
if (agentNodeId.isEmpty() || serverEndpoint.isEmpty()) {
agentNodeId = environment.getProperty("agent.node.id");
serverEndpoint = environment.getProperty("manager.server.endpoint");
- heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/heartbeat";
+ heartBeatUrl = "http://" + serverEndpoint + "/api/control/node/" + agentNodeId + "/agent/context";
 }
- // TODO :Process according to the returned heartbeat event results
- // TODO:If there is an event and it is processed, the result is sent
+
log.info("agent node is " + agentNodeId);
log.info("heartBeatUrl is " + heartBeatUrl);
- List<HeartBeatEventInfo> eventInfos = Request.getHeartBeatEventInfo(heartBeatUrl);
+ HeartBeatContext ctx = Request.getHeartBeatContext(heartBeatUrl);
+
+ // drop tasks that an earlier tick is still processing
+ HeartBeatContext newCtx = addAndFilterContextTask(ctx);
+
+ // A new worker thread is started on every tick, so several batches can be in flight at once;
+ // the worker must therefore only touch the map entries that belong to its own batch.
+ Thread contextTask = new Thread(() -> {
+ try {
+ HeartBeatResult res = executeContextTask(newCtx);
+
+ // this batch's cached results were consumed by executeContextTask; remove only those
+ // (a global clear() would also discard results cached by other in-flight batches)
+ newCtx.getEvents().forEach((e) -> cacheResults.remove(e.getEventId()));
+ try {
+ String dealRes = Request.sendHeartBeatContextResult(heartBeatUrl, res);
+ log.info("server return context deal result: {}", dealRes);
+ } catch (IOException e) {
+ log.warn("send heartbeat context result error: {}", e.getMessage());
+ // keep non-failed results so the next heartbeat can replay them instead of re-running
+ res.getEventResults().forEach((eventRes) -> {
+ if (eventRes.getResultType() != HeartBeatEventResultType.FAIL) {
+ log.info("cache event {} result, event type {}, stage {}", eventRes.getEventId(),
+ eventRes.getEventType(), eventRes.getEventStage());
+ cacheResults.put(eventRes.getEventId(), eventRes);
+ }
+ });
+ }
+ } catch (RuntimeException e) {
+ log.error("heartbeat context task error", e);
+ } finally {
+ // always release the batch; otherwise an exception above would leave these ids registered
+ // and addAndFilterContextTask would reject them as "running" forever
+ newCtx.getEvents().forEach((e) -> {
+ log.info("remove finished [event {}] task", e.getEventId());
+ events.remove(e.getEventId());
+ });
+ newCtx.getInstanceInfos().forEach((ins) -> instanceInfos.remove(ins.getInstanceId()));
+ }
+ });
+
+ contextTask.start();
+ }
+
+ // Runs one heartbeat batch: executes (or replays from cacheResults) every event and checks the
+ // process / service state of every instance, returning both result lists.
+ private HeartBeatResult executeContextTask(HeartBeatContext ctx) {
+ List<HeartBeatEventResult> eventResults = new ArrayList<>();
+ List<InstanceStateResult> insStateResults = new ArrayList<>();
+
+ for (HeartBeatEventInfo eventInfo : ctx.getEvents()) {
+ log.info("handle event {}: resource:{} type:{} stage:{}", eventInfo.getEventId(),
+ eventInfo.getResourceType(), eventInfo.getEventType(), eventInfo.getEventStage());
+
+ // get result from cache if it has already been executed (its earlier POST to the server failed)
+ long eventId = eventInfo.getEventId();
+ HeartBeatEventResult cr = cacheResults.get(eventId);
+ if (cr != null) {
+ log.info("result is in cache, event {}, stage {}, result type {}", cr.getEventId(),
+ cr.getEventStage(), cr.getResultType());
+ if (cr.getResultType() == HeartBeatEventResultType.PROCESSING
+ && eventInfo.getEventStage() < cr.getEventStage()) {
+ log.info("return result from result cache");
+ eventResults.add(cr);
+ continue;
+ } else if (cr.getResultType() == HeartBeatEventResultType.SUCCESS
+ && eventInfo.getEventStage() == cr.getEventStage()) {
+ // replay only while the server still asks for the stage that already succeeded
+ log.info("return result from result cache");
+ eventResults.add(cr);
+ continue;
+ }
+ }
- List<HeartBeatEventResult> results = new ArrayList<>();
- for (HeartBeatEventInfo eventInfo : eventInfos) {
HeartBeatEventResult result = heartbeatEventHandler.handHeartBeatEvent(eventInfo);
if (result != null) {
- results.add(result);
+ eventResults.add(result);
+ }
+ }
+
+ for (InstanceInfo instanceInfo : ctx.getInstanceInfos()) {
+ log.info("check module {} instance {} state", instanceInfo.getModuleName(), instanceInfo.getInstanceId());
+ InstanceStateResult stateResult = new InstanceStateResult(instanceInfo);
+ try {
+ instanceOpera.checkInstanceProcessState(instanceInfo.getModuleName(), instanceInfo.getInstallDir(),
+ instanceInfo.getHttpPort());
+
+ stateResult.setState(ModelControlState.RUNNING);
+ } catch (InstanceNotInstallException e) {
+ log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage());
+ // maybe the instance has not been installed yet
+ stateResult.setState(ModelControlState.INIT);
+ stateResult.setErrMsg(e.getMessage());
+ } catch (InstanceNotRunningException | InstanceServiceException e) {
+ log.error("{} instance check exception {}", instanceInfo.getModuleName(), e.getMessage());
+ stateResult.setState(ModelControlState.STOPPED);
+ stateResult.setErrMsg(e.getMessage());
+ } catch (RuntimeException e) {
+ // unexpected failure: report this instance as UNKNOWN instead of aborting the whole batch
+ log.error("{} instance check unexpected error", instanceInfo.getModuleName(), e);
+ stateResult.setErrMsg(e.getMessage());
 }
+
+ insStateResults.add(stateResult);
 }
- Request.sendHeartBeatEventResult(heartBeatUrl, results);
+ HeartBeatResult res = new HeartBeatResult();
+ res.setEventResults(eventResults);
+ res.setStateResults(insStateResults);
+
+ return res;
 }
+ // Registers the events / instance checks of ctx that are not already in flight and returns a
+ // context holding only those new tasks. Never returns null, and both returned lists are non-null.
+ private HeartBeatContext addAndFilterContextTask(HeartBeatContext ctx) {
+ HeartBeatContext filterCtx = new HeartBeatContext();
+ List<HeartBeatEventInfo> newEvents = new ArrayList<>();
+ List<InstanceInfo> newInsInfos = new ArrayList<>();
+
+ // ctx comes from JSON.parseObject, which can yield null (e.g. for a "null" response body)
+ if (ctx != null && ctx.getEvents() != null) {
+ for (HeartBeatEventInfo eventInfo : ctx.getEvents()) {
+ // putIfAbsent checks and registers in one atomic step
+ if (events.putIfAbsent(eventInfo.getEventId(), eventInfo) != null) {
+ log.warn("heartbeat event {} is running", eventInfo.getEventId());
+ continue;
+ }
+ log.info("add event {}", eventInfo.getEventId());
+ newEvents.add(eventInfo);
+ }
+ }
+
+ if (ctx != null && ctx.getInstanceInfos() != null) {
+ for (InstanceInfo ins : ctx.getInstanceInfos()) {
+ if (instanceInfos.putIfAbsent(ins.getInstanceId(), ins) != null) {
+ log.warn("module {} instance {} check task is running", ins.getModuleName(), ins.getInstanceId());
+ continue;
+ }
+ log.info("add {} instance {} check task", ins.getModuleName(), ins.getInstanceId());
+ newInsInfos.add(ins);
+ }
+ }
+
+ filterCtx.setEvents(newEvents);
+ filterCtx.setInstanceInfos(newInsInfos);
+ return filterCtx;
+ }
}
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
index 621049e..ecfb40c 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/DorisInstanceOperator.java
@@ -17,10 +17,15 @@
package org.apache.doris.manager.agent.service.heartbeat;
+import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.agent.exceptions.InstanceNotInstallException;
+import org.apache.doris.manager.agent.exceptions.InstanceNotRunningException;
+import org.apache.doris.manager.agent.exceptions.InstanceServiceException;
+import org.apache.doris.manager.agent.util.Request;
import org.apache.doris.manager.agent.util.ShellUtil;
import org.apache.doris.manager.common.util.ConfigDefault;
import org.apache.doris.manager.common.util.ServerAndAgentConstant;
@@ -30,8 +35,10 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
+import java.util.HashMap;
import java.util.Map;
@Slf4j
@@ -143,7 +150,7 @@ public class DorisInstanceOperator {
executePkgShellScriptWithBash(startScript, installInfo, moudleName, Maps.newHashMap());
} else {
- log.info("{} instance is running", moudleName);
+ log.info("instance {} is running", moudleName);
}
log.info("start {} instance success", moudleName);
return true;
@@ -192,21 +199,108 @@ public class DorisInstanceOperator {
}
// Check whether the instance has been installed and started
- public boolean checkInstanceDeploy(String moudleName, String installInfo) {
+ // Checks that the instance of moduleName is installed under installDir, that its process is running
+ // and, for fe/be with httpPort > 0, that its HTTP health API reports a healthy state.
+ // Throws InstanceNotInstallException / InstanceNotRunningException / InstanceServiceException respectively.
+ public void checkInstanceProcessState(String moduleName, String installDir, int httpPort)
+ throws InstanceNotInstallException, InstanceNotRunningException, InstanceServiceException {
+
+ if (moduleName.equals(ServerAndAgentConstant.BROKER_NAME)) {
+ moduleName = getBrokerInstallationPath(installDir);
+ }
+
+ // install dir check
+ log.info("to check module {} instance process state in {}", moduleName, installDir);
+ File moduleRoot = Paths.get(installDir, moduleName).toFile();
+ if (!moduleRoot.exists()) {
+ log.error("instance {} not installed, can not find root path: {}", moduleName, moduleRoot);
+ throw new InstanceNotInstallException(moduleName, installDir);
+ }
+
+ // process state check
+ // multiple instances of the same module (e.g. two be) deployed on one node are not considered
 try {
- if (moudleName.equals(ServerAndAgentConstant.BROKER_NAME)) {
- moudleName = getBrokerInstallationPath(installInfo);
- }
- int bePid = processIsRunning(moudleName, installInfo);
- if (bePid < 0) {
- return false;
- } else {
- return true;
+ int insPid = processIsRunning(moduleName, installDir);
+ if (insPid < 0) {
+ log.error("instance {} is not running", moduleName);
+ throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath());
 }
+ } catch (InstanceNotRunningException e) {
+ // already logged above; keep the generic handler below from logging a misleading second error
+ throw e;
 } catch (Exception e) {
- log.error("Check " + moudleName + " instance running error {}.", e);
- return false;
+ log.error("check instance {} process state error: {}", moduleName, e.getMessage());
+ throw new InstanceNotRunningException(moduleName, moduleRoot.getAbsolutePath());
+ }
+
+ if (httpPort <= 0) {
+ log.warn("invalid http port {}, skip http service check", httpPort);
+ return;
 }
+
+ // fe/be http service check
+ String statusURL;
+ if (moduleName.equals(ServerAndAgentConstant.FE_NAME)) {
+ statusURL = "http://localhost:" + httpPort + "/api/bootstrap";
+ } else if (moduleName.equals(ServerAndAgentConstant.BE_NAME)) {
+ statusURL = "http://localhost:" + httpPort + "/api/health";
+ } else {
+ // can not check other module by http
+ log.error("unknown module name {}", moduleName);
+ return;
+ }
+
+ /*
+ * Before Palo 3.10, the FE api/bootstrap API returns:
+ * {
+ * "replayedJournalId": 0,
+ * "queryPort": 0,
+ * "rpcPort": 0,
+ * "status": "OK",
+ * "msg": "Success"
+ * }
+ *
+ * From 3.10 on, the FE api/bootstrap API returns:
+ * {
+ * "msg": "success",
+ * "code": 0,
+ * "data": {"replayedJournalId": 0, "queryPort": 0, "rpcPort": 0},
+ * "count": 0
+ * }
+ *
+ * FE api/health returns:
+ * {
+ * "msg": "success",
+ * "code": 0,
+ * "data": {"online_backend_num": 2, "total_backend_num": 2},
+ * "count": 0
+ * }
+ *
+ * BE api/health returns:
+ * {"status": "OK","msg": "To Be Added"}
+ *
+ */
+
+ String stateRes;
+ try {
+ stateRes = Request.sendGetRequest(statusURL, new HashMap<>());
+ } catch (URISyntaxException e) {
+ log.error("{} syntax error {}", statusURL, e.getMessage());
+ return;
+ } catch (IOException e) {
+ log.error("request {} error: {}", statusURL, e.getMessage());
+ throw new InstanceServiceException(moduleName);
+ }
+ log.info("http health return: {}", stateRes);
+
+ // an empty or non-JSON body (e.g. an HTML error page) means the service is not healthy
+ JSONObject stateJson;
+ try {
+ stateJson = JSONObject.parseObject(stateRes);
+ } catch (RuntimeException e) {
+ log.error("module {} health response is not valid json: {}", moduleName, e.getMessage());
+ throw new InstanceServiceException(moduleName);
+ }
+ if (stateJson == null) {
+ log.error("module {} health response is empty", moduleName);
+ throw new InstanceServiceException(moduleName);
+ }
+
+ // healthy if status == OK (old FE bootstrap, BE health) ...
+ String status = stateJson.getString("status");
+ if (status != null && status.equals("OK")) {
+ log.info("module {} instance service is normal, return status OK", moduleName);
+ return;
+ }
+
+ // ... or code == 0 (new FE bootstrap, FE health)
+ Integer code = stateJson.getInteger("code");
+ if (code != null && code == 0) {
+ log.info("module {} instance service is normal return code 0", moduleName);
+ return;
+ }
+
+ throw new InstanceServiceException(moduleName);
 }
/*
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
index a7829c4..e86bc15 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/service/heartbeat/InstanceEventHandler.java
@@ -59,7 +59,17 @@ public class InstanceEventHandler {
InstanceDeployCheckEventConfigInfo configInfo = JSON.parseObject(jsonConfigStr,
InstanceDeployCheckEventConfigInfo.class);
- boolean isDeploy = instanceOperator.checkInstanceDeploy(configInfo.getModuleName(), configInfo.getInstallInfo());
+ boolean isDeploy = true;
+ try {
+ // here we do not check http service is ready or not
+ // because it is ready after a few seconds of the process starts
+ // which may cause `Doris Cluster Creation Request` failed and blocked
+ instanceOperator.checkInstanceProcessState(configInfo.getModuleName(),
+ configInfo.getInstallInfo(), 0);
+ } catch (Exception e) {
+ log.error("check instance {} deploy error: {}", configInfo.getModuleName(), e.getMessage());
+ isDeploy = false;
+ }
HeartBeatEventResult result = new HeartBeatEventResult(eventInfo);
// There is only one step
diff --git a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
index c5d6ccc..9f6c95a 100644
--- a/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
+++ b/manager/dm-agent/src/main/java/org/apache/doris/manager/agent/util/Request.java
@@ -19,8 +19,8 @@ package org.apache.doris.manager.agent.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -35,29 +35,34 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
@Slf4j
public class Request {
+ // Fetches the heartbeat context from the server. Never returns null: request failures and empty
+ // or unparsable responses yield an empty HeartBeatContext (whose lists are left null).
+ public static HeartBeatContext getHeartBeatContext(String requestUrl) {
+ String ctx = null;
- public static List<HeartBeatEventInfo> getHeartBeatEventInfo(String requestUrl) {
- String getHeartBeatEventResults = sendGetRequest(requestUrl, new HashMap<>());
- log.info("getHeartBeatEventResults:" + getHeartBeatEventResults);
-// ManagerServerResponse response = JSON.parseObject(getHeartBeatEventResults, ManagerServerResponse.class);
- if (getHeartBeatEventResults == null) {
- return new ArrayList<>();
+ try {
+ ctx = sendGetRequest(requestUrl, new HashMap<>());
+ log.info("getHeartBeatContextResults:{}", ctx);
+ } catch (Exception e) {
+ log.error("get heartbeat context error {}", e.getMessage());
+ }
+
+ if (ctx == null || ctx.trim().isEmpty()) {
+ log.warn("no context return");
+ return new HeartBeatContext();
 }
- return JSON.parseArray(getHeartBeatEventResults, HeartBeatEventInfo.class);
+ try {
+ // parseObject can return null (e.g. for a "null" body); never hand null to the caller
+ HeartBeatContext parsed = JSON.parseObject(ctx, HeartBeatContext.class);
+ return parsed != null ? parsed : new HeartBeatContext();
+ } catch (RuntimeException e) {
+ log.error("parse heartbeat context error {}", e.getMessage());
+ return new HeartBeatContext();
+ }
 }
- public static void sendHeartBeatEventResult(String requestUrl, List<HeartBeatEventResult> results) {
- sendPostRequest(requestUrl, JSON.toJSONString(results));
+ // POSTs res as JSON to requestUrl and returns whatever sendPostRequest returns
+ // (presumably the response body); IOException is propagated so the caller can cache results.
+ public static String sendHeartBeatContextResult(String requestUrl, HeartBeatResult res) throws IOException {
+ // serialize once, so the logged payload is exactly the one that is sent
+ String body = JSON.toJSONString(res);
+ log.info("send heart beat context result {} to {}", body, requestUrl);
+ return sendPostRequest(requestUrl, body);
+ }
- public static String sendPostRequest(String requestUrl, String bodyJson) {
+ public static String sendPostRequest(String requestUrl, String bodyJson) throws IOException {
HttpPost httpPost = new HttpPost(requestUrl);
httpPost.setConfig(timeout());
@@ -68,7 +73,7 @@ public class Request {
return request(httpPost);
} catch (IOException e) {
log.error("request url error:{},param:{}", requestUrl, bodyJson, e);
- throw new RuntimeException(e);
+ throw e;
}
}
@@ -87,7 +92,8 @@ public class Request {
}
}
- public static String sendGetRequest(String requestUrl, Map<String, Object> params) {
+ public static String sendGetRequest(String requestUrl, Map<String, Object> params)
+ throws URISyntaxException, IOException {
URI url = null;
try {
URIBuilder uriBuilder = null;
@@ -97,7 +103,8 @@ public class Request {
}
url = uriBuilder.build();
} catch (URISyntaxException e) {
- e.printStackTrace();
+ log.error("{} syntax error {}", requestUrl, e.getMessage());
+ throw e;
}
HttpGet httpGet = new HttpGet(url);
@@ -107,7 +114,7 @@ public class Request {
return request(httpGet);
} catch (IOException e) {
log.error("request url error:{},param:{}", requestUrl, params, e);
- throw new RuntimeException(e);
+ throw e;
}
}
diff --git a/manager/dm-agent/src/main/resources/application.properties b/manager/dm-agent/src/main/resources/application.properties
index 4a73966..0327076 100644
--- a/manager/dm-agent/src/main/resources/application.properties
+++ b/manager/dm-agent/src/main/resources/application.properties
@@ -21,3 +21,6 @@ server.port=8001
manager.server.endpoint=
# manager agent unique identification
agent.node.id=
+
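+# agent heartbeat interval in seconds; used as the step of the cron seconds field ("0/N"), so pick a divisor of 60 (e.g. 5, 10, 30) to get evenly spaced heartbeats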
+agent.heartbeat.interval=5
diff --git a/manager/dm-common/pom.xml b/manager/dm-common/pom.xml
index 44cf13e..06c7265 100644
--- a/manager/dm-common/pom.xml
+++ b/manager/dm-common/pom.xml
@@ -27,6 +27,12 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<artifactId>dm-common</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>resource-common</artifactId>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java
new file mode 100644
index 0000000..3556f34
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatContext.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+// Payload the server returns to the agent heartbeat request (GET .../agent/context).
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartBeatContext {
+ // events the agent should execute; may be null when none are sent
+ private List<HeartBeatEventInfo> events;
+ // instances whose process / service state the agent should check; may be null
+ private List<InstanceInfo> instanceInfos;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java
new file mode 100644
index 0000000..d70a12a
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/HeartBeatResult.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.List;
+
+// Payload the agent POSTs back to .../agent/context after processing a HeartBeatContext.
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartBeatResult {
+ // results of the executed (or cache-replayed) events
+ private List<HeartBeatEventResult> eventResults;
+ // one entry per checked instance
+ private List<InstanceStateResult> stateResults;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java
new file mode 100644
index 0000000..96b16d0
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceInfo.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InstanceInfo { // one instance whose state the agent is asked to check
+ private long agentNodeId;
+
+ private long moduleId;
+
+ private long instanceId; // the agent de-duplicates in-flight state checks by this id
+
+ private String moduleName; // compared against the fe / be / broker names in ServerAndAgentConstant
+
+ private String installDir; // parent directory that should contain the module's directory
+
+ // HTTP port for the health check, used only for fe/be; a value <= 0 skips the HTTP check
+ private int httpPort;
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java
new file mode 100644
index 0000000..31a4e47
--- /dev/null
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/heartbeat/InstanceStateResult.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.manager.common.heartbeat;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.doris.stack.control.ModelControlState;
+
+/**
+ * Per-instance state reported back by an agent in its heartbeat result.
+ *
+ * <p>The identifying fields mirror {@link InstanceInfo}; the server persists
+ * {@code state.getValue()} as the instance's current state.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class InstanceStateResult {
+    private long agentNodeId;
+
+    private long moduleId;
+
+    private long instanceId;
+
+    private String moduleName;
+
+    private String installDir;
+
+    // Reported state of the instance; the InstanceInfo constructor initialises it to UNKNOWN.
+    private ModelControlState state;
+
+    // Error detail accompanying the reported state.
+    // NOTE(review): the original note said this "is valid only when state is RUNNING"; an error
+    // message presumably matters when the state is NOT RUNNING -- confirm against the agent code.
+    private String errMsg;
+
+    /**
+     * Creates a result carrying the identity of {@code info}, with state
+     * {@link ModelControlState#UNKNOWN} and no error message.
+     *
+     * @param info instance description received from the server
+     */
+    public InstanceStateResult(InstanceInfo info) {
+        this.agentNodeId = info.getAgentNodeId();
+        this.moduleId = info.getModuleId();
+        this.instanceId = info.getInstanceId();
+        this.moduleName = info.getModuleName();
+        this.installDir = info.getInstallDir();
+
+        this.state = ModelControlState.UNKNOWN;
+    }
+}
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
index 619bc0c..db6e7fa 100644
--- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ConfigDefault.java
@@ -37,6 +37,7 @@ public class ConfigDefault {
public static final String FE_EDIT_LOG_PORT = "edit_log_port";
public static final String BE_HEARTBEAT_PORT_CONFIG_NAME = "heartbeat_service_port";
+ public static final String BE_WEBSERVER_PORT_NAME = "webserver_port";
public static final String BROKER_PORT_CONFIG_NAME = "broker_ipc_port";
diff --git a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
index dbc813f..eacfd4e 100644
--- a/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
+++ b/manager/dm-common/src/main/java/org/apache/doris/manager/common/util/ServerAndAgentConstant.java
@@ -73,6 +73,8 @@ public class ServerAndAgentConstant {
public static final String FE_EDIT_SERVICE = "fe_edit";
public static final String BE_HEARTBEAT_SERVICE = "be_heartbeat";
+ public static final String BE_HTTP_SERVICE = "be_http";
+
public static final String BROKER_PRC_SERVICE = "broker_rpc";
public static final Map<String, String> BAIDU_BROKER_CONFIG_DEDAULT;
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
index 492e19d..5ac39a5 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/DorisClusterModuleManager.java
@@ -136,7 +136,13 @@ public class DorisClusterModuleManager {
// for be service, heartbeat
for (DeployConfigItem configItem : deployConfig.getConfigs()) {
if (configItem.getKey().equals(ConfigDefault.BE_HEARTBEAT_PORT_CONFIG_NAME)) {
- serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE, Integer.valueOf(configItem.getValue()));
+ serviceNamePorts.put(ServerAndAgentConstant.BE_HEARTBEAT_SERVICE,
+ Integer.valueOf(configItem.getValue()));
+ }
+
+ if (configItem.getKey().equals(ConfigDefault.BE_WEBSERVER_PORT_NAME)) {
+ serviceNamePorts.put(ServerAndAgentConstant.BE_HTTP_SERVICE,
+ Integer.valueOf(configItem.getValue()));
}
}
serviceCreateOperation(moduleEntity, serviceNamePorts, accessInfo);
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
index d7e13c7..d55f34c 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceClusterManager.java
@@ -228,7 +228,7 @@ public class ResourceClusterManager {
List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>();
for (ResourceNodeEntity nodeEntity : nodeEntities) {
- if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) {
+ if (!nodeAndAgentManager.isAgentInstalled(nodeEntity)) {
log.warn("the agent has not been installed on {} node {}", nodeEntity.getId(), nodeEntity.getHost());
} else {
agentInstalledNodes.add(nodeEntity);
@@ -269,7 +269,7 @@ public class ResourceClusterManager {
}
// async delete agent
- for (ResourceNodeEntity nodeEntity : nodeEntities) {
+ for (ResourceNodeEntity nodeEntity : agentInstalledNodes) {
AgentUnInstallEventConfigInfo uninstallConfig = new AgentUnInstallEventConfigInfo(
accessInfo.getSshUser(), accessInfo.getSshPort(), accessInfo.getSshKey(),
nodeEntity.getHost(), nodeEntity.getAgentInstallDir(),
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
index 45a5253..41aced1 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/control/manager/ResourceNodeAndAgentManager.java
@@ -81,7 +81,7 @@ public class ResourceNodeAndAgentManager {
throws Exception {
// check agent has been installed or not
- if (!checkAgentOperation(node)) {
+ if (!isAgentInstalled(node)) {
log.warn("node[{}]:{} does not install agent, no need to uninstall", node.getId(), node.getHost());
return;
}
@@ -182,6 +182,34 @@ public class ResourceNodeAndAgentManager {
}
}
+    /**
+     * Tells whether the agent on {@code node} is installed, judged from the node's current
+     * heartbeat event.
+     *
+     * <p>Returns true in either of two cases:
+     * <ul>
+     *   <li>the current event is of a type other than AGENT_INSTALL;</li>
+     *   <li>the AGENT_INSTALL event completed with status SUCCESS, or has reached at least the
+     *       AGENT_START stage.</li>
+     * </ul>
+     * Returns false when the node has no current event (id &lt; 1) or when the referenced event
+     * row cannot be found.
+     *
+     * @param node resource node to inspect
+     * @return whether the agent is considered installed
+     */
+    public boolean isAgentInstalled(ResourceNodeEntity node) {
+        long eventId = node.getCurrentEventId();
+
+        if (eventId < 1L) {
+            log.warn("The node no have agent");
+            return false;
+        }
+
+        // A dangling event id must not surface as NoSuchElementException to the callers.
+        HeartBeatEventEntity eventEntity = heartBeatEventRepository.findById(eventId).orElse(null);
+        if (eventEntity == null) {
+            log.warn("current event {} of node {} {} not found, treat agent as not installed",
+                    eventId, node.getId(), node.getHost());
+            return false;
+        }
+
+        // handling other event, agent has been installed
+        if (!eventEntity.getType().equals(HeartBeatEventType.AGENT_INSTALL.name())) {
+            return true;
+        }
+
+        // AGENT_INSTALL event: installed once it completed successfully (null-safe on status) ...
+        if (eventEntity.isCompleted()
+                && HeartBeatEventResultType.SUCCESS.name().equals(eventEntity.getStatus())) {
+            return true;
+        }
+
+        // ... or once it has progressed to, or past, the AGENT_START stage
+        if (eventEntity.getStage() >= AgentInstallEventStage.AGENT_START.getStage()) {
+            return true;
+        }
+
+        log.warn("Agent has not been installed successfully");
+        return false;
+    }
+
public boolean isAvailableAgentPort(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo)
throws Exception {
// agent port check, eg: Spring Boot Param server.port=8008
@@ -369,7 +397,7 @@ public class ResourceNodeAndAgentManager {
}
private void uninstallEventProcess(ResourceNodeEntity node, AgentUnInstallEventConfigInfo configInfo,
- HeartBeatEventEntity agentUninstallAgentEntity) {
+ HeartBeatEventEntity agentUninstallAgentEntity) {
if (!agentUninstallAgentEntity.getType().equals(HeartBeatEventType.AGENT_STOP.name())) {
log.warn("agent no need to stop on {} node {}", node.getId(), node.getHost());
return;
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
index 9392d99..1f07142 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/controller/control/ResourceClusterNodeController.java
@@ -20,8 +20,8 @@ package org.apache.doris.stack.controller.control;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
-import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
import org.apache.doris.stack.entity.CoreUserEntity;
import org.apache.doris.stack.rest.ResponseEntityBuilder;
import org.apache.doris.stack.service.control.ResourceClusterNodeService;
@@ -38,7 +38,6 @@ import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import java.util.List;
@Api(tags = "Resource Cluster Node Agent API")
@RestController
@@ -51,20 +50,22 @@ public class ResourceClusterNodeController {
@Autowired
private ResourceClusterNodeService nodeService;
- @ApiOperation(value = "get node agent heartbeat")
- @GetMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE)
- public List<HeartBeatEventInfo> getHeartbeat(HttpServletRequest request,
- HttpServletResponse response,
- @PathVariable(value = "agentNodeId") long agentNodeId) {
- return nodeService.getHeartbeat(agentNodeId);
+    /**
+     * Returns the heartbeat context for the agent on {@code agentNodeId}: its uncompleted
+     * heartbeat events together with the descriptions of the instances deployed on that node.
+     */
+    @ApiOperation(value = "get heart beat context(node agent heartbeat and instance info)")
+    @GetMapping(value = "{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE)
+    public HeartBeatContext getHeartbeatContext(HttpServletRequest httpRequest,
+                                                HttpServletResponse httpResponse,
+                                                @PathVariable(value = "agentNodeId") long agentNodeId) {
+        HeartBeatContext context = nodeService.getHeartBeatContext(agentNodeId);
+        return context;
}
- @ApiOperation(value = "deal node agent heartbeat event result")
- @PostMapping(value = "{agentNodeId}/agent/heartbeat", produces = MediaType.APPLICATION_JSON_VALUE)
- public Object postHeartbeat(HttpServletRequest request,
- HttpServletResponse response,
- @RequestBody List<HeartBeatEventResult> results) throws Exception {
- nodeService.dealHeartbeatResult(results);
+    /**
+     * Accepts the heartbeat result posted by the agent on {@code agentNodeId} and hands it to
+     * the node service, which processes the event results and the instance state reports.
+     *
+     * <p>NOTE(review): the path id is only logged; it is not checked against the ids carried
+     * inside {@code ctx}.
+     *
+     * @return the literal "SUCCESS" once the result has been processed
+     */
+    @ApiOperation(value = "deal heart beat context")
+    @PostMapping(value = "{agentNodeId}/agent/context", produces = MediaType.APPLICATION_JSON_VALUE)
+    public Object postHeartbeatContext(HttpServletRequest request,
+                                       HttpServletResponse response,
+                                       @PathVariable(value = "agentNodeId") long agentNodeId,
+                                       @RequestBody HeartBeatResult ctx) {
+        log.info("agent {} post heartbeat context", agentNodeId);
+        nodeService.dealHeartbeatContext(ctx);
return "SUCCESS";
}
diff --git a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
index f151e74..b10f192 100644
--- a/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
+++ b/manager/dm-server/src/main/java/org/apache/doris/stack/service/control/ResourceClusterNodeService.java
@@ -19,27 +19,41 @@ package org.apache.doris.stack.service.control;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
+import org.apache.doris.manager.common.heartbeat.HeartBeatContext;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventInfo;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventResourceType;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventResult;
import org.apache.doris.manager.common.heartbeat.HeartBeatEventType;
+import org.apache.doris.manager.common.heartbeat.HeartBeatResult;
+import org.apache.doris.manager.common.heartbeat.InstanceInfo;
+import org.apache.doris.manager.common.heartbeat.InstanceStateResult;
import org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
+import org.apache.doris.manager.common.util.ServerAndAgentConstant;
+import org.apache.doris.stack.control.ModelControlState;
import org.apache.doris.stack.control.manager.ResourceNodeAndAgentManager;
import org.apache.doris.stack.dao.ClusterInstanceRepository;
+import org.apache.doris.stack.dao.ClusterModuleRepository;
+import org.apache.doris.stack.dao.ClusterModuleServiceRepository;
import org.apache.doris.stack.dao.HeartBeatEventRepository;
import org.apache.doris.stack.dao.ResourceClusterRepository;
import org.apache.doris.stack.dao.ResourceNodeRepository;
import org.apache.doris.stack.entity.ClusterInstanceEntity;
+import org.apache.doris.stack.entity.ClusterModuleEntity;
+import org.apache.doris.stack.entity.ClusterModuleServiceEntity;
import org.apache.doris.stack.entity.HeartBeatEventEntity;
import org.apache.doris.stack.entity.ResourceClusterEntity;
import org.apache.doris.stack.entity.ResourceNodeEntity;
import org.apache.doris.stack.model.request.control.PMResourceClusterAccessInfo;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
@Service
@Slf4j
@@ -57,9 +71,15 @@ public class ResourceClusterNodeService {
@Autowired
private HeartBeatEventRepository heartBeatEventRepository;
+ @Autowired
+ private ClusterModuleRepository clusterModuleRepository;
+
@Autowired
private ResourceNodeAndAgentManager nodeAndAgentManager;
+ @Autowired
+ private ClusterModuleServiceRepository serviceRepository;
+
// Send uncompleted heartbeat events that need to be handled by agent
public List<HeartBeatEventInfo> getHeartbeat(long agentNodeId) {
log.info("Get agent {} uncompleted heartbeat events", agentNodeId);
@@ -84,6 +104,60 @@ public class ResourceClusterNodeService {
return eventInfos;
}
+ public List<InstanceInfo> getInstanceInfo(long agentNodeId) {
+ log.info("get node {} instance info", agentNodeId);
+ List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(agentNodeId);
+
+ List<InstanceInfo> instanceInfos = new ArrayList<>();
+
+ for (ClusterInstanceEntity ins : instanceEntities) {
+ Optional<ClusterModuleEntity> moduleEntityOpt = clusterModuleRepository.findById(ins.getModuleId());
+ if (!moduleEntityOpt.isPresent()) {
+ log.error("this instance module {} is not find, ignore it", ins.getModuleId());
+ continue;
+ }
+
+ ClusterModuleEntity moduleEntity = moduleEntityOpt.get();
+
+ log.info("to get module {} instance {} info", moduleEntity.getModuleName(), ins.getId());
+
+ int httpPort = 0;
+ String httpServerName = "";
+ if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.FE_NAME)) {
+ httpServerName = ServerAndAgentConstant.FE_HTTP_SERVICE;
+ } else if (moduleEntity.getModuleName().equals(ServerAndAgentConstant.BE_NAME)) {
+ httpServerName = ServerAndAgentConstant.BE_HTTP_SERVICE;
+ }
+
+ List<ClusterModuleServiceEntity> httpServices = serviceRepository.getByClusterIdAndName(
+ moduleEntity.getClusterId(), httpServerName);
+
+ for (ClusterModuleServiceEntity service : httpServices) {
+ List<String> addrList = JSON.parseArray(service.getAddressInfo(), String.class);
+ if (addrList.contains(ins.getAddress())) {
+ httpPort = service.getPort();
+ }
+ }
+
+ log.info("module {} instance {} http port is {}", moduleEntity.getModuleName(), ins.getId(), httpPort);
+
+ InstanceInfo instanceInfo = new InstanceInfo(ins.getNodeId(), ins.getModuleId(), ins.getId(),
+ moduleEntity.getModuleName(), ins.getInstallInfo(), httpPort);
+ log.info("get instance {} info: {}", ins.getId(), instanceInfo);
+ instanceInfos.add(instanceInfo);
+ }
+
+ return instanceInfos;
+ }
+
+    /**
+     * Assembles the heartbeat context returned to an agent: its uncompleted heartbeat events
+     * and the descriptions of the instances deployed on its node.
+     *
+     * @param agentNodeId id of the resource node hosting the agent
+     * @return a new context populated with the events and instance infos
+     */
+    public HeartBeatContext getHeartBeatContext(long agentNodeId) {
+        log.info("start to get heartbeat context");
+        List<HeartBeatEventInfo> pendingEvents = getHeartbeat(agentNodeId);
+        List<InstanceInfo> deployedInstances = getInstanceInfo(agentNodeId);
+
+        HeartBeatContext heartBeatContext = new HeartBeatContext();
+        heartBeatContext.setEvents(pendingEvents);
+        heartBeatContext.setInstanceInfos(deployedInstances);
+        return heartBeatContext;
+    }
+
// Handle the result of the heartbeat event of the agent
public void dealHeartbeatResult(List<HeartBeatEventResult> eventResults) {
for (HeartBeatEventResult eventResult : eventResults) {
@@ -101,6 +175,38 @@ public class ResourceClusterNodeService {
}
}
+    /**
+     * Processes a heartbeat result posted by an agent: first its event results, then each
+     * instance state report. A missing (null) part is logged at WARN and skipped.
+     *
+     * @param ctx heartbeat result received from the agent
+     */
+    public void dealHeartbeatContext(HeartBeatResult ctx) {
+        log.info("deal heart beat context {}", ctx);
+
+        if (ctx.getEventResults() != null) {
+            dealHeartbeatResult(ctx.getEventResults());
+        } else {
+            log.warn("no events to deal");
+        }
+
+        if (ctx.getStateResults() != null) {
+            for (InstanceStateResult reported : ctx.getStateResults()) {
+                dealInstanceState(reported);
+            }
+        } else {
+            log.warn("no instance state result to deal");
+        }
+    }
+
+ public void dealInstanceState(InstanceStateResult stateResult) {
+ log.info("update module {} instance {} state is {}", stateResult.getModuleName(), stateResult.getInstanceId(),
+ stateResult.getState());
+
+ Optional<ClusterInstanceEntity> instanceEntityOpt = instanceRepository.findById(stateResult.getInstanceId());
+ if (!instanceEntityOpt.isPresent()) {
+ log.error("instance {} does not exists", stateResult.getInstanceId());
+ }
+
+ ClusterInstanceEntity instanceEntity = instanceEntityOpt.get();
+ instanceEntity.setCurrentState(stateResult.getState().getValue());
+ instanceRepository.save(instanceEntity);
+ }
+
public void operateAgent(long nodeId, String operateType) throws Exception {
HeartBeatEventType eventType = HeartBeatEventType.valueOf(operateType);
@@ -149,4 +255,63 @@ public class ResourceClusterNodeService {
return;
}
}
+
+    /**
+     * Sets {@code state} as the current state of every instance deployed on {@code node}.
+     *
+     * <p>NOTE(review): this method is also invoked from {@code agentNodeStateCheck()} in this
+     * same class. With Spring's default proxy-based transactions, such self-invocation bypasses
+     * the proxy, so {@code @Transactional} presumably does not apply on that path -- confirm.
+     *
+     * @param node  resource node whose instances are updated
+     * @param state new state value (a {@code ModelControlState} value at the visible call site)
+     */
+    @Transactional(rollbackFor = Exception.class)
+    public void updateInstancesState(ResourceNodeEntity node, int state) {
+        log.info("update instances of node {} {} to state {}", node.getId(), node.getHost(), state);
+        List<ClusterInstanceEntity> instanceEntities = instanceRepository.getByNodeId(node.getId());
+        if (instanceEntities.isEmpty()) {
+            log.warn("node {} does not have any instances", node.getId());
+            return;
+        }
+
+        for (ClusterInstanceEntity ins : instanceEntities) {
+            log.info("update instance {} of module {} to {}", ins.getId(), ins.getModuleId(), state);
+            ins.setCurrentState(state);
+            instanceRepository.save(ins);
+        }
+    }
+
+ @Scheduled(cron = "0/60 * * * * ?")
+ public void agentNodeStateCheck() {
+ log.info("start to check agent nodes state");
+ List<ResourceNodeEntity> nodes = nodeRepository.findAll();
+ if (nodes.isEmpty()) {
+ log.info("no any agent nodes");
+ return;
+ }
+
+ for (ResourceNodeEntity node : nodes) {
+ Timestamp lastTime = node.getLastHeartBeatTimestamp();
+ if (lastTime == null) {
+ log.warn("not receive heartbeat yet form node {} {}", node.getId(), node.getHost());
+ if (node.getCurrentState() != ModelControlState.INIT.getValue()) {
+ node.setCurrentState(ModelControlState.INIT.getValue());
+ nodeRepository.save(node);
+ }
+ continue;
+ }
+
+ log.info("node {} {} last heartbeat time {}", node.getId(), node.getHost(),
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastTime));
+ if (System.currentTimeMillis() - lastTime.getTime() > 60 * 1000) {
+ log.warn("node {} heartbeat timeout", node.getId());
+ log.warn("update node {} form {} to {}", node.getId(), node.getCurrentState(),
+ ModelControlState.UNKNOWN.getValue());
+ if (node.getCurrentState() != ModelControlState.UNKNOWN.getValue()) {
+ node.setCurrentState(ModelControlState.UNKNOWN.getValue());
+ log.info("update all instance of node {} to UNKNOWN", node.getId());
+ updateInstancesState(node, ModelControlState.UNKNOWN.getValue());
+ }
+ } else {
+ log.info("node {} heartbeat state normal", node.getId());
+ log.warn("update node {} form {} to {}", node.getId(), node.getCurrentState(),
+ ModelControlState.RUNNING.getValue());
+ if (node.getCurrentState() != ModelControlState.RUNNING.getValue()) {
+ node.setCurrentState(ModelControlState.RUNNING.getValue());
+ }
+ }
+ nodeRepository.save(node);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org