You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/28 12:18:50 UTC
[incubator-doris] branch master updated: Awareness of Backend down when loading data (#2076)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3ba03 Awareness of Backend down when loading data (#2076)
5e3ba03 is described below
commit 5e3ba03b5215cfddabfed531d3988e79a6c17f40
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Mon Oct 28 20:18:45 2019 +0800
Awareness of Backend down when loading data (#2076)
---
.../java/org/apache/doris/analysis/InsertStmt.java | 7 +-
.../org/apache/doris/load/loadv2/LoadTask.java | 6 +-
.../load/routineload/RoutineLoadTaskScheduler.java | 4 +-
.../java/org/apache/doris/planner/Planner.java | 23 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 329 ++++++++++++---------
.../org/apache/doris/rpc/BackendServiceProxy.java | 16 +-
.../java/org/apache/doris/rpc/RpcException.java | 18 +-
.../main/java/org/apache/doris/system/Backend.java | 9 +-
8 files changed, 223 insertions(+), 189 deletions(-)
diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 13fddc6..d554449 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -238,9 +238,8 @@ public class InsertStmt extends DdlStmt {
return uuid;
}
- // Only valid when this statement is streaming
- public OlapTableSink getOlapTableSink() {
- return (OlapTableSink) dataSink;
+ public DataSink getDataSink() {
+ return dataSink;
}
public Database getDbObj() {
@@ -706,7 +705,7 @@ public class InsertStmt extends DdlStmt {
}
}
- public DataSink createDataSink() throws AnalysisException {
+ private DataSink createDataSink() throws AnalysisException {
if (dataSink != null) {
return dataSink;
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index 012e570..e9b286f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -50,10 +50,14 @@ public abstract class LoadTask extends MasterTask {
// callback on pending task finished
callback.onTaskFinished(attachment);
isFinished = true;
+ } catch (UserException e) {
+ failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
+ .add("error_msg", "Failed to execute load task").build());
} catch (Exception e) {
failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
- .add("error_msg", "Failed to execute load task").build(), e);
+ .add("error_msg", "Unexpected failed to execute load task").build(), e);
} finally {
if (!isFinished) {
// callback on pending task failed
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index c41b8d7..763d619 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -138,8 +138,8 @@ public class RoutineLoadTaskScheduler extends Daemon {
// todo(ml): if cluster has been deleted, the job will be cancelled.
needScheduleTasksQueue.put(routineLoadTaskInfo);
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
- .add("error_msg", "put task to the rear of queue with error " + e.getMessage())
- .build(), e);
+ .add("error_msg", "put task to the rear of queue with error " + e.getMessage())
+ .build());
return;
}
diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java
index 3b34fc8..15db8c5 100644
--- a/fe/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/src/main/java/org/apache/doris/planner/Planner.java
@@ -132,21 +132,21 @@ public class Planner {
* Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in
* a list such that element i of that list can only consume output of the following fragments j > i.
*/
- public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQueryOptions queryOptions)
+ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
throws NotImplementedException, UserException, AnalysisException {
QueryStmt queryStmt;
- if (statment instanceof InsertStmt) {
- queryStmt = ((InsertStmt) statment).getQueryStmt();
+ if (statement instanceof InsertStmt) {
+ queryStmt = ((InsertStmt) statement).getQueryStmt();
} else {
- queryStmt = (QueryStmt) statment;
+ queryStmt = (QueryStmt) statement;
}
- plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statment);
+ plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement);
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
- if (statment instanceof InsertStmt) {
- InsertStmt insertStmt = (InsertStmt) statment;
+ if (statement instanceof InsertStmt) {
+ InsertStmt insertStmt = (InsertStmt) statement;
insertStmt.prepareExpressions();
}
@@ -176,13 +176,12 @@ public class Planner {
QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment);
queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
- if (statment instanceof InsertStmt) {
- InsertStmt insertStmt = (InsertStmt) statment;
-
+ if (statement instanceof InsertStmt) {
+ InsertStmt insertStmt = (InsertStmt) statement;
rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
- rootFragment.setSink(insertStmt.createDataSink());
+ rootFragment.setSink(insertStmt.getDataSink());
insertStmt.finalize();
- ArrayList<Expr> exprs = ((InsertStmt) statment).getResultExprs();
+ ArrayList<Expr> exprs = ((InsertStmt) statement).getResultExprs();
List<Expr> resExprs = Expr.substituteList(
exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
rootFragment.setOutputExprs(resExprs);
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 46237ab..19042a3 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -48,6 +48,7 @@ import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.PExecPlanFragmentResult;
import org.apache.doris.proto.PPlanFragmentCancelReason;
+import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
@@ -98,7 +99,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -154,10 +154,10 @@ public class Coordinator {
private List<PlanFragment> fragments;
// backend execute state
private List<BackendExecState> backendExecStates = Lists.newArrayList();
+ // backend which state need to be checked when joining this coordinator.
+ // It is supposed to be the subset of backendExecStates.
+ private List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
private ResultReceiver receiver;
- // fragment instance id to backend state
- private ConcurrentMap<TUniqueId, BackendExecState> backendExecStateMap =
- Maps.newConcurrentMap();
private List<ScanNode> scanNodes;
// number of instances of this query, equals to
// number of backends executing plan fragments on behalf of this query;
@@ -190,7 +190,9 @@ public class Coordinator {
// parallel execute
private final TUniqueId nextInstanceId;
- // Used for query
+ private boolean isQueryCoordinator;
+
+ // Used for query/insert
public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
this.isBlockQuery = planner.isBlockQuery();
this.queryId = context.queryId();
@@ -284,12 +286,12 @@ public class Coordinator {
lock.lock();
try {
this.backendExecStates.clear();
- this.backendExecStateMap.clear();
this.queryStatus.setStatus(new Status());
if (this.exportFiles == null) {
this.exportFiles = Lists.newArrayList();
}
this.exportFiles.clear();
+ this.needCheckBackendExecStates.clear();
} finally {
lock.unlock();
}
@@ -299,9 +301,8 @@ public class Coordinator {
return commitInfos;
}
- // Initiate
+ // Initialize
private void prepare() {
-
for (PlanFragment fragment : fragments) {
fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
}
@@ -327,7 +328,7 @@ public class Coordinator {
queryProfile.addChild(fragmentProfile.get(i));
}
- this.idToBackend = Catalog.getCurrentSystemInfo().getBackendsInCluster(clusterName);
+ this.idToBackend = Catalog.getCurrentSystemInfo().getIdToBackend();
if (LOG.isDebugEnabled()) {
LOG.debug("idToBackend size={}", idToBackend.size());
for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
@@ -401,6 +402,7 @@ public class Coordinator {
queryOptions.query_timeout * 1000);
} else {
// This is a load process.
+ Preconditions.checkState(queryOptions.getQuery_type() == TQueryType.LOAD);
this.queryOptions.setIs_report_success(true);
deltaUrls = Lists.newArrayList();
loadCounters = Maps.newHashMap();
@@ -438,16 +440,26 @@ public class Coordinator {
tParam.query_options.setMem_limit(newmemory);
}
}
+
+ boolean needCheckBackendState = false;
+ if (queryOptions.getQuery_type() == TQueryType.LOAD && profileFragmentId == 0) {
+ // this is a load process, and it is the first fragment.
+ // we should add all BackendExecState of this fragment to needCheckBackendExecStates,
+ // so that we can check these backends' state when joining this Coordinator
+ needCheckBackendState = true;
+ }
int instanceId = 0;
for (TExecPlanFragmentParams tParam : tParams) {
// TODO: pool of pre-formatted BackendExecStates?
- BackendExecState execState =
- new BackendExecState(fragment.getFragmentId(), instanceId++,
+ BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++,
profileFragmentId, tParam, this.addressToBackendID);
backendExecStates.add(execState);
- backendExecStateMap.put(tParam.params.getFragment_instance_id(), execState);
-
+ if (needCheckBackendState) {
+ needCheckBackendExecStates.add(execState);
+ LOG.debug("add need check backend {} for fragment, {} job: {}", execState.backend.getId(),
+ fragment.getFragmentId().asInt(), jobId);
+ }
futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
backendId++;
@@ -476,7 +488,7 @@ public class Coordinator {
if (code != TStatusCode.OK) {
if (errMsg == null) {
- errMsg = "exec rpc error. backend id: " + pair.first.backendId;
+ errMsg = "exec rpc error. backend id: " + pair.first.backend.getId();
}
queryStatus.setStatus(errMsg);
LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}",
@@ -484,13 +496,13 @@ public class Coordinator {
pair.first.address.hostname, pair.first.address.port);
cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
- case TIMEOUT:
- throw new UserException("query timeout. backend id: " + pair.first.backendId);
- case THRIFT_RPC_ERROR:
- SimpleScheduler.updateBlacklistBackends(pair.first.backendId);
- throw new RpcException("rpc failed. backend id: " + pair.first.backendId);
- default:
- throw new UserException(errMsg);
+ case TIMEOUT:
+ throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
+ case THRIFT_RPC_ERROR:
+ SimpleScheduler.updateBlacklistBackends(pair.first.backend.getId());
+ throw new RpcException(pair.first.backend.getHost(), "rpc failed");
+ default:
+ throw new UserException(errMsg);
}
}
}
@@ -631,7 +643,7 @@ public class Coordinator {
copyStatus.rewriteErrorMsg();
}
if (copyStatus.isRpcError()) {
- throw new RpcException(copyStatus.getErrorMsg());
+ throw new RpcException("unknown", copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
LOG.warn("query failed: {}", errMsg);
@@ -695,41 +707,7 @@ public class Coordinator {
private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
for (BackendExecState backendExecState : backendExecStates) {
- TNetworkAddress address = backendExecState.getBackendAddress();
- LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}",
- backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled,
- address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()),
- cancelReason.name());
-
- backendExecState.lock();
- try {
- if (!backendExecState.initiated) {
- continue;
- }
- // don't cancel if it is already finished
- if (backendExecState.done) {
- continue;
- }
- if (backendExecState.hasCanceled) {
- continue;
- }
- TNetworkAddress brpcAddress = toBrpcHost(address);
-
- try {
- BackendServiceProxy.getInstance().cancelPlanFragmentAsync(
- brpcAddress, backendExecState.getFragmentInstanceId(), cancelReason);
- } catch (RpcException e) {
- LOG.warn("cancel plan fragment get a exception, address={}:{}",
- brpcAddress.getHostname(), brpcAddress.getPort());
- SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brpcAddress));
- }
-
- backendExecState.hasCanceled = true;
- } catch (Exception e) {
- LOG.warn("catch a exception", e);
- } finally {
- backendExecState.unlock();
- }
+ backendExecState.cancelFragmentInstance(cancelReason);
}
}
@@ -1136,27 +1114,15 @@ public class Coordinator {
return;
}
- boolean done = false;
BackendExecState execState = backendExecStates.get(params.backend_num);
- execState.lock();
- try {
- if (execState.done) {
- // duplicate packet
- return;
- }
- if (params.isSetProfile()) {
- execState.profile.update(params.profile);
- }
- done = params.done;
- execState.done = params.done;
- } finally {
- execState.unlock();
+ if (!execState.updateProfile(params)) {
+ return;
}
// print fragment instance profile
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
- execState.profile().prettyPrint(builder, "");
+ execState.printProfile(builder);
LOG.debug("profile for query_id={} instance_id={}\n{}",
DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragment_instance_id()),
@@ -1172,7 +1138,7 @@ public class Coordinator {
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id()));
updateStatus(status, params.getFragment_instance_id());
}
- if (done) {
+ if (execState.done) {
if (params.isSetDelta_urls()) {
updateDeltas(params.getDelta_urls());
}
@@ -1218,15 +1184,59 @@ public class Coordinator {
}
}
- public boolean join(int seconds) {
- try {
- return profileDoneSignal.await(seconds, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- // Do nothing
+ /*
+ * Waiting the coordinator finish executing.
+ * return false if waiting timeout.
+ * return true otherwise.
+ * NOTICE: return true does not mean that coordinator executed success,
+ * the caller should check queryStatus for result.
+ *
+ * We divide the entire waiting process into multiple rounds,
+ * with a maximum of 30 seconds per round. And after each round of waiting,
+ * check the status of the BE. If the BE status is abnormal, the wait is ended
+ * and the result is returned. Otherwise, continue to the next round of waiting.
+ * This method mainly avoids the problem that the Coordinator waits for a long time
+ * after some BE can no long return the result due to some exception, such as BE is down.
+ */
+ public boolean join(int timeoutS) {
+ final long fixedMaxWaitTime = 30;
+
+ long leftTimeoutS = timeoutS;
+ while (leftTimeoutS > 0) {
+ long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
+ boolean awaitRes = false;
+ try {
+ awaitRes = profileDoneSignal.await(waitTime, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Do nothing
+ }
+ if (awaitRes) {
+ return true;
+ }
+
+ if (!checkBackendState()) {
+ return true;
+ }
+
+ leftTimeoutS -= waitTime;
}
return false;
}
+ /*
+ * Check the state of backends in needCheckBackendExecStates.
+ * return true if all of them are OK. Otherwise, return false.
+ */
+ private boolean checkBackendState() {
+ for (BackendExecState backendExecState : needCheckBackendExecStates) {
+ if (!backendExecState.isBackendStateHealthy()) {
+ queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend " + backendExecState.backend.getId() + " is down");
+ return false;
+ }
+ }
+ return true;
+ }
+
public boolean isDone() {
return profileDoneSignal.getCount() == 0;
}
@@ -1249,45 +1259,17 @@ public class Coordinator {
// TODO(zhaochun): add profile information and others
public class BackendExecState {
TExecPlanFragmentParams rpcParams;
- private PlanFragmentId fragmentId;
- private int instanceId;
- private boolean initiated;
- private boolean done;
- private boolean hasCanceled;
- private Lock lock = new ReentrantLock();
- private int profileFragmentId;
+ PlanFragmentId fragmentId;
+ int instanceId;
+ boolean initiated;
+ boolean done;
+ boolean hasCanceled;
+ int profileFragmentId;
RuntimeProfile profile;
TNetworkAddress address;
- Long backendId;
-
- public int profileFragmentId() {
- return profileFragmentId;
- }
-
- public boolean initiated() {
- return initiated;
- }
-
- public RuntimeProfile profile() {
- return profile;
- }
-
- public void lock() {
- lock.lock();
- }
-
- public void unlock() {
- lock.unlock();
- }
-
- public int getInstanceId() {
- return instanceId;
- }
-
- public PlanFragmentId getFragmentId() {
- return fragmentId;
- }
-
+ Backend backend;
+ long lastMissingHeartbeatTime = -1;
+
public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId,
TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID) {
this.profileFragmentId = profileFragmentId;
@@ -1297,37 +1279,111 @@ public class Coordinator {
this.initiated = false;
this.done = false;
this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host;
- this.backendId = addressToBackendID.get(address);
+ this.backend = idToBackend.get(addressToBackendID.get(address));
String name = "Instance " + DebugUtil.printId(fragmentExecParamsMap.get(fragmentId)
.instanceExecParams.get(instanceId).instanceId) + " (host=" + address + ")";
this.profile = new RuntimeProfile(name);
this.hasCanceled = false;
+ this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
}
- public TNetworkAddress getBackendAddress() {
- return address;
+ // update profile.
+ // return true if profile is updated. Otherwise, return false.
+ public synchronized boolean updateProfile(TReportExecStatusParams params) {
+ if (this.done) {
+ // duplicate packet
+ return false;
+ }
+ if (params.isSetProfile()) {
+ profile.update(params.profile);
+ }
+ this.done = params.done;
+ return true;
}
- public TUniqueId getFragmentInstanceId() {
- return this.rpcParams.params.getFragment_instance_id();
+ public synchronized void printProfile(StringBuilder builder) {
+ this.profile.prettyPrint(builder, "");
+ }
+
+ // cancel the fragment instance.
+ // return true if cancel success. Otherwise, return false
+ public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason cancelReason) {
+ LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}, fragment instance id={}, reason: {}",
+ this.initiated, this.done, this.hasCanceled, backend.getId(),
+ DebugUtil.printId(fragmentInstanceId()), cancelReason.name());
+ try {
+ if (!this.initiated) {
+ return false;
+ }
+ // don't cancel if it is already finished
+ if (this.done) {
+ return false;
+ }
+ if (this.hasCanceled) {
+ return false;
+ }
+ TNetworkAddress brpcAddress = toBrpcHost(address);
+
+ try {
+ BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+ fragmentInstanceId(), cancelReason);
+ } catch (RpcException e) {
+ LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
+ brpcAddress.getPort());
+ SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brpcAddress));
+ }
+
+ this.hasCanceled = true;
+ } catch (Exception e) {
+ LOG.warn("catch a exception", e);
+ return false;
+ }
+ return true;
+ }
+
+ public synchronized boolean computeTimeInProfile(int maxFragmentId) {
+ if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) {
+ LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
+ return false;
+ }
+ profile.computeTimeInProfile();
+ return true;
+ }
+
+ public boolean isBackendStateHealthy() {
+ if (backend.getLastMissingHeartbeatTime() > lastMissingHeartbeatTime) {
+ LOG.warn("backend {} is down while joining the coordinator. job id: {}", backend.getId(), jobId);
+ return false;
+ }
+ return true;
}
public Future<PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
TNetworkAddress brpcAddress = null;
try {
- brpcAddress = toBrpcHost(address);
+ brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
} catch (Exception e) {
throw new TException(e.getMessage());
}
- initiated = true;
+ this.initiated = true;
try {
return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
} catch (RpcException e) {
- SimpleScheduler.updateBlacklistBackends(backendId);
+ SimpleScheduler.updateBlacklistBackends(backend.getId());
throw e;
}
}
+
+ public FragmentInstanceInfo buildFragmentInstanceInfo() {
+ return new QueryStatisticsItem.FragmentInstanceInfo.Builder()
+ .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address)
+ .build();
+ }
+
+ private TUniqueId fragmentInstanceId() {
+ return this.rpcParams.params.getFragment_instance_id();
+ }
}
// execution parameters for a single fragment,
@@ -1483,17 +1539,11 @@ public class Coordinator {
final List<QueryStatisticsItem.FragmentInstanceInfo> result =
Lists.newArrayList();
for (int index = 0; index < fragments.size(); index++) {
- for (Map.Entry<TUniqueId, BackendExecState> entry: backendExecStateMap.entrySet()) {
- final BackendExecState backendExecState = entry.getValue();
- if (fragments.get(index).getFragmentId() != backendExecState.getFragmentId()) {
+ for (BackendExecState backendExecState: backendExecStates) {
+ if (fragments.get(index).getFragmentId() != backendExecState.fragmentId) {
continue;
}
- final QueryStatisticsItem.FragmentInstanceInfo info
- = new QueryStatisticsItem.FragmentInstanceInfo.Builder()
- .instanceId(entry.getValue().getFragmentInstanceId())
- .fragmentId(String.valueOf(index))
- .address(backendExecState.getBackendAddress())
- .build();
+ final QueryStatisticsItem.FragmentInstanceInfo info = backendExecState.buildFragmentInstanceInfo();
result.add(info);
}
}
@@ -1501,20 +1551,11 @@ public class Coordinator {
}
private void attachInstanceProfileToFragmentProfile() {
- for (int i = 0; i < backendExecStates.size(); ++i) {
- if (backendExecStates.get(i) == null) {
- continue;
- }
- BackendExecState backendExecState = backendExecStates.get(i);
- backendExecState.profile().computeTimeInProfile();
-
- int profileFragmentId = backendExecState.profileFragmentId();
- if (profileFragmentId < 0 || profileFragmentId > fragmentProfile.size()) {
- LOG.error("profileFragmentId " + profileFragmentId
- + " should be in [0," + fragmentProfile.size() + ")");
+ for (BackendExecState backendExecState : backendExecStates) {
+ if (!backendExecState.computeTimeInProfile(fragmentProfile.size())) {
return;
}
- fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile());
+ fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile);
}
}
}
diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 18dc80e..2a45c6d 100644
--- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -20,9 +20,9 @@ package org.apache.doris.rpc;
import org.apache.doris.common.Config;
import org.apache.doris.proto.PCancelPlanFragmentRequest;
import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
import org.apache.doris.proto.PExecPlanFragmentResult;
import org.apache.doris.proto.PFetchDataResult;
+import org.apache.doris.proto.PPlanFragmentCancelReason;
import org.apache.doris.proto.PProxyRequest;
import org.apache.doris.proto.PProxyResult;
import org.apache.doris.proto.PTriggerProfileReportResult;
@@ -102,12 +102,12 @@ public class BackendServiceProxy {
} catch (NoSuchElementException noSuchElementException) {
LOG.warn("Execute plan fragment retry failed, address={}:{}",
address.getHostname(), address.getPort(), noSuchElementException);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
} catch (Throwable e) {
LOG.warn("Execute plan fragment catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
}
@@ -135,12 +135,12 @@ public class BackendServiceProxy {
} catch (NoSuchElementException noSuchElementException) {
LOG.warn("Cancel plan fragment retry failed, address={}:{}",
address.getHostname(), address.getPort(), noSuchElementException);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
} catch (Throwable e) {
LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
}
@@ -152,7 +152,7 @@ public class BackendServiceProxy {
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
}
@@ -164,7 +164,7 @@ public class BackendServiceProxy {
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
}
@@ -175,7 +175,7 @@ public class BackendServiceProxy {
return service.getInfo(request);
} catch (Throwable e) {
LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
- throw new RpcException(e.getMessage());
+ throw new RpcException(address.hostname, e.getMessage());
}
}
}
diff --git a/fe/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/src/main/java/org/apache/doris/rpc/RpcException.java
index 5166dde..8648b1c 100644
--- a/fe/src/main/java/org/apache/doris/rpc/RpcException.java
+++ b/fe/src/main/java/org/apache/doris/rpc/RpcException.java
@@ -18,22 +18,8 @@
package org.apache.doris.rpc;
public class RpcException extends Exception {
- public RpcException() {
- }
-
- public RpcException(String message) {
- super(message);
- }
-
- public RpcException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public RpcException(Throwable cause) {
- super(cause);
- }
- public RpcException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
+ public RpcException(String host, String message) {
+ super(message + ", host: " + host);
}
}
diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java
index 961a5cb..e86530c 100644
--- a/fe/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/src/main/java/org/apache/doris/system/Backend.java
@@ -88,6 +88,8 @@ public class Backend implements Writable {
// after init it, this variable is set to true.
private boolean initPathInfo = false;
+ long lastMissingHeartbeatTime = -1;
+
public Backend() {
this.host = "";
this.lastUpdateMs = new AtomicLong();
@@ -235,6 +237,10 @@ public class Backend implements Writable {
this.lastStartTime.set(currentTime);
}
+ public long getLastMissingHeartbeatTime() {
+ return lastMissingHeartbeatTime;
+ }
+
public boolean isAlive() {
return this.isAlive.get();
}
@@ -426,8 +432,6 @@ public class Backend implements Writable {
// log disk changing
Catalog.getInstance().getEditLog().logBackendStateChange(this);
}
-
-
}
public static Backend read(DataInput in) throws IOException {
@@ -607,6 +611,7 @@ public class Backend implements Writable {
}
heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
+ lastMissingHeartbeatTime = System.currentTimeMillis();
}
return isChanged;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org