Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/28 12:18:50 UTC

[incubator-doris] branch master updated: Awareness of Backend down when loading data (#2076)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3ba03  Awareness of Backend down when loading data (#2076)
5e3ba03 is described below

commit 5e3ba03b5215cfddabfed531d3988e79a6c17f40
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Mon Oct 28 20:18:45 2019 +0800

    Awareness of Backend down when loading data (#2076)
---
 .../java/org/apache/doris/analysis/InsertStmt.java |   7 +-
 .../org/apache/doris/load/loadv2/LoadTask.java     |   6 +-
 .../load/routineload/RoutineLoadTaskScheduler.java |   4 +-
 .../java/org/apache/doris/planner/Planner.java     |  23 +-
 .../main/java/org/apache/doris/qe/Coordinator.java | 329 ++++++++++++---------
 .../org/apache/doris/rpc/BackendServiceProxy.java  |  16 +-
 .../java/org/apache/doris/rpc/RpcException.java    |  18 +-
 .../main/java/org/apache/doris/system/Backend.java |   9 +-
 8 files changed, 223 insertions(+), 189 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 13fddc6..d554449 100644
--- a/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -238,9 +238,8 @@ public class InsertStmt extends DdlStmt {
         return uuid;
     }
 
-    // Only valid when this statement is streaming
-    public OlapTableSink getOlapTableSink() {
-        return (OlapTableSink) dataSink;
+    public DataSink getDataSink() {
+        return dataSink;
     }
 
     public Database getDbObj() {
@@ -706,7 +705,7 @@ public class InsertStmt extends DdlStmt {
         }
     }
 
-    public DataSink createDataSink() throws AnalysisException {
+    private DataSink createDataSink() throws AnalysisException {
         if (dataSink != null) {
             return dataSink;
         }
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index 012e570..e9b286f 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -50,10 +50,14 @@ public abstract class LoadTask extends MasterTask {
             // callback on pending task finished
             callback.onTaskFinished(attachment);
             isFinished = true;
+        } catch (UserException e) {
+            failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
+            LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
+                    .add("error_msg", "Failed to execute load task").build());
         } catch (Exception e) {
             failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
-                             .add("error_msg", "Failed to execute load task").build(), e);
+                    .add("error_msg", "Unexpected failed to execute load task").build(), e);
         } finally {
             if (!isFinished) {
                 // callback on pending task failed
diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index c41b8d7..763d619 100644
--- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -138,8 +138,8 @@ public class RoutineLoadTaskScheduler extends Daemon {
             // todo(ml): if cluster has been deleted, the job will be cancelled.
             needScheduleTasksQueue.put(routineLoadTaskInfo);
             LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId())
-                             .add("error_msg", "put task to the rear of queue with error " + e.getMessage())
-                             .build(), e);
+                    .add("error_msg", "put task to the rear of queue with error " + e.getMessage())
+                    .build());
             return;
         }
 
diff --git a/fe/src/main/java/org/apache/doris/planner/Planner.java b/fe/src/main/java/org/apache/doris/planner/Planner.java
index 3b34fc8..15db8c5 100644
--- a/fe/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/src/main/java/org/apache/doris/planner/Planner.java
@@ -132,21 +132,21 @@ public class Planner {
      * Create plan fragments for an analyzed statement, given a set of execution options. The fragments are returned in
      * a list such that element i of that list can only consume output of the following fragments j > i.
      */
-    public void createPlanFragments(StatementBase statment, Analyzer analyzer, TQueryOptions queryOptions)
+    public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQueryOptions queryOptions)
             throws NotImplementedException, UserException, AnalysisException {
         QueryStmt queryStmt;
-        if (statment instanceof InsertStmt) {
-            queryStmt = ((InsertStmt) statment).getQueryStmt();
+        if (statement instanceof InsertStmt) {
+            queryStmt = ((InsertStmt) statement).getQueryStmt();
         } else {
-            queryStmt = (QueryStmt) statment;
+            queryStmt = (QueryStmt) statement;
         }
 
-        plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statment);
+        plannerContext = new PlannerContext(analyzer, queryStmt, queryOptions, statement);
         singleNodePlanner = new SingleNodePlanner(plannerContext);
         PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
 
-        if (statment instanceof InsertStmt) {
-            InsertStmt insertStmt = (InsertStmt) statment;
+        if (statement instanceof InsertStmt) {
+            InsertStmt insertStmt = (InsertStmt) statement;
             insertStmt.prepareExpressions();
         }
 
@@ -176,13 +176,12 @@ public class Planner {
         QueryStatisticsTransferOptimizer queryStatisticTransferOptimizer = new QueryStatisticsTransferOptimizer(rootFragment);
         queryStatisticTransferOptimizer.optimizeQueryStatisticsTransfer();
 
-        if (statment instanceof InsertStmt) {
-            InsertStmt insertStmt = (InsertStmt) statment;
-
+        if (statement instanceof InsertStmt) {
+            InsertStmt insertStmt = (InsertStmt) statement;
             rootFragment = distributedPlanner.createInsertFragment(rootFragment, insertStmt, fragments);
-            rootFragment.setSink(insertStmt.createDataSink());
+            rootFragment.setSink(insertStmt.getDataSink());
             insertStmt.finalize();
-            ArrayList<Expr> exprs = ((InsertStmt) statment).getResultExprs();
+            ArrayList<Expr> exprs = ((InsertStmt) statement).getResultExprs();
             List<Expr> resExprs = Expr.substituteList(
                     exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
             rootFragment.setOutputExprs(resExprs);
diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
index 46237ab..19042a3 100644
--- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -48,6 +48,7 @@ import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.UnionNode;
 import org.apache.doris.proto.PExecPlanFragmentResult;
 import org.apache.doris.proto.PPlanFragmentCancelReason;
+import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
 import org.apache.doris.rpc.BackendServiceProxy;
 import org.apache.doris.rpc.RpcException;
 import org.apache.doris.service.FrontendOptions;
@@ -98,7 +99,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -154,10 +154,10 @@ public class Coordinator {
     private List<PlanFragment> fragments;
     // backend execute state
     private List<BackendExecState> backendExecStates = Lists.newArrayList();
+    // backends whose state needs to be checked when joining this coordinator.
+    // It is supposed to be a subset of backendExecStates.
+    private List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
     private ResultReceiver receiver;
-    // fragment instance id to backend state
-    private ConcurrentMap<TUniqueId, BackendExecState> backendExecStateMap =
-            Maps.newConcurrentMap();
     private List<ScanNode> scanNodes;
     // number of instances of this query, equals to
     // number of backends executing plan fragments on behalf of this query;
@@ -190,7 +190,9 @@ public class Coordinator {
     // parallel execute
     private final TUniqueId nextInstanceId;
 
-    // Used for query
+    private boolean isQueryCoordinator;
+
+    // Used for query/insert
     public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
         this.isBlockQuery = planner.isBlockQuery();
         this.queryId = context.queryId();
@@ -284,12 +286,12 @@ public class Coordinator {
         lock.lock();
         try {
             this.backendExecStates.clear();
-            this.backendExecStateMap.clear();
             this.queryStatus.setStatus(new Status());
             if (this.exportFiles == null) {
                 this.exportFiles = Lists.newArrayList();
             }
             this.exportFiles.clear();
+            this.needCheckBackendExecStates.clear();
         } finally {
             lock.unlock();
         }
@@ -299,9 +301,8 @@ public class Coordinator {
         return commitInfos;
     }
 
-    // Initiate
+    // Initialize
     private void prepare() {
-        
         for (PlanFragment fragment : fragments) {
             fragmentExecParamsMap.put(fragment.getFragmentId(), new FragmentExecParams(fragment));
         }
@@ -327,7 +328,7 @@ public class Coordinator {
             queryProfile.addChild(fragmentProfile.get(i));
         }
 
-        this.idToBackend = Catalog.getCurrentSystemInfo().getBackendsInCluster(clusterName);
+        this.idToBackend = Catalog.getCurrentSystemInfo().getIdToBackend();
         if (LOG.isDebugEnabled()) {
             LOG.debug("idToBackend size={}", idToBackend.size());
             for (Map.Entry<Long, Backend> entry : idToBackend.entrySet()) {
@@ -401,6 +402,7 @@ public class Coordinator {
                     queryOptions.query_timeout * 1000);
         } else {
             // This is a load process.
+            Preconditions.checkState(queryOptions.getQuery_type() == TQueryType.LOAD);
             this.queryOptions.setIs_report_success(true);
             deltaUrls = Lists.newArrayList();
             loadCounters = Maps.newHashMap();
@@ -438,16 +440,26 @@ public class Coordinator {
                         tParam.query_options.setMem_limit(newmemory);
                     }
                 }
+                
+                boolean needCheckBackendState = false;
+                if (queryOptions.getQuery_type() == TQueryType.LOAD && profileFragmentId == 0) {
+                    // this is a load process, and it is the first fragment.
+                    // we should add all BackendExecStates of this fragment to needCheckBackendExecStates,
+                    // so that we can check these backends' states when joining this Coordinator.
+                    needCheckBackendState = true;
+                }
 
                 int instanceId = 0;
                 for (TExecPlanFragmentParams tParam : tParams) {
                     // TODO: pool of pre-formatted BackendExecStates?
-                    BackendExecState execState =
-                            new BackendExecState(fragment.getFragmentId(), instanceId++,
+                    BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++,
                                     profileFragmentId, tParam, this.addressToBackendID);
                     backendExecStates.add(execState);
-                    backendExecStateMap.put(tParam.params.getFragment_instance_id(), execState);
-
+                    if (needCheckBackendState) {
+                        needCheckBackendExecStates.add(execState);
+                        LOG.debug("add need check backend {} for fragment, {} job: {}", execState.backend.getId(),
+                                fragment.getFragmentId().asInt(), jobId);
+                    }
                     futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
 
                     backendId++;
@@ -476,7 +488,7 @@ public class Coordinator {
 
                     if (code != TStatusCode.OK) {
                         if (errMsg == null) {
-                            errMsg = "exec rpc error. backend id: " + pair.first.backendId;
+                            errMsg = "exec rpc error. backend id: " + pair.first.backend.getId();
                         }
                         queryStatus.setStatus(errMsg);
                         LOG.warn("exec plan fragment failed, errmsg={}, fragmentId={}, backend={}:{}",
@@ -484,13 +496,13 @@ public class Coordinator {
                                  pair.first.address.hostname, pair.first.address.port);
                         cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
                         switch (code) {
-                            case TIMEOUT:
-                                throw new UserException("query timeout. backend id: " + pair.first.backendId);
-                            case THRIFT_RPC_ERROR:
-                                SimpleScheduler.updateBlacklistBackends(pair.first.backendId);
-                                throw new RpcException("rpc failed. backend id: " + pair.first.backendId);
-                            default:
-                                throw new UserException(errMsg);
+                        case TIMEOUT:
+                            throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
+                        case THRIFT_RPC_ERROR:
+                            SimpleScheduler.updateBlacklistBackends(pair.first.backend.getId());
+                            throw new RpcException(pair.first.backend.getHost(), "rpc failed");
+                        default:
+                            throw new UserException(errMsg);
                         }
                     }
                 }
@@ -631,7 +643,7 @@ public class Coordinator {
                 copyStatus.rewriteErrorMsg();
             }
             if (copyStatus.isRpcError()) {
-                throw new RpcException(copyStatus.getErrorMsg());
+                throw new RpcException("unknown", copyStatus.getErrorMsg());
             } else {
                 String errMsg = copyStatus.getErrorMsg();
                 LOG.warn("query failed: {}", errMsg);
@@ -695,41 +707,7 @@ public class Coordinator {
 
     private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) {
         for (BackendExecState backendExecState : backendExecStates) {
-            TNetworkAddress address = backendExecState.getBackendAddress();
-            LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} ip={} port={} fragment instance id={}, reason: {}",
-                    backendExecState.initiated, backendExecState.done, backendExecState.hasCanceled,
-                    address.hostname, address.port, DebugUtil.printId(backendExecState.getFragmentInstanceId()),
-                    cancelReason.name());
-
-            backendExecState.lock();
-            try {
-                if (!backendExecState.initiated) {
-                    continue;
-                }
-                // don't cancel if it is already finished
-                if (backendExecState.done) {
-                    continue;
-                }
-                if (backendExecState.hasCanceled) {
-                    continue;
-                }
-                TNetworkAddress brpcAddress = toBrpcHost(address);
-
-                try {
-                    BackendServiceProxy.getInstance().cancelPlanFragmentAsync(
-                            brpcAddress, backendExecState.getFragmentInstanceId(), cancelReason);
-                } catch (RpcException e) {
-                    LOG.warn("cancel plan fragment get a exception, address={}:{}",
-                            brpcAddress.getHostname(), brpcAddress.getPort());
-                    SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brpcAddress));
-                }
-
-                backendExecState.hasCanceled = true;
-            } catch (Exception e) {
-                LOG.warn("catch a exception", e);
-            } finally {
-                backendExecState.unlock();
-            }
+            backendExecState.cancelFragmentInstance(cancelReason);
         }
     }
 
@@ -1136,27 +1114,15 @@ public class Coordinator {
             return;
         }
 
-        boolean done = false;
         BackendExecState execState = backendExecStates.get(params.backend_num);
-        execState.lock();
-        try {
-            if (execState.done) {
-                // duplicate packet
-                return;
-            }
-            if (params.isSetProfile()) {
-                execState.profile.update(params.profile);
-            }
-            done = params.done;
-            execState.done = params.done;
-        } finally {
-            execState.unlock();
+        if (!execState.updateProfile(params)) {
+            return;
         }
 
         // print fragment instance profile
         if (LOG.isDebugEnabled()) {
             StringBuilder builder = new StringBuilder();
-            execState.profile().prettyPrint(builder, "");
+            execState.printProfile(builder);
             LOG.debug("profile for query_id={} instance_id={}\n{}",
                     DebugUtil.printId(queryId),
                     DebugUtil.printId(params.getFragment_instance_id()),
@@ -1172,7 +1138,7 @@ public class Coordinator {
                     DebugUtil.printId(queryId), DebugUtil.printId(params.getFragment_instance_id()));
             updateStatus(status, params.getFragment_instance_id());
         }
-        if (done) {
+        if (execState.done) {
             if (params.isSetDelta_urls()) {
                 updateDeltas(params.getDelta_urls());
             }
@@ -1218,15 +1184,59 @@ public class Coordinator {
         }
     }
 
-    public boolean join(int seconds) {
-        try {
-            return profileDoneSignal.await(seconds, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-            // Do nothing
+    /*
+     * Wait for the coordinator to finish executing.
+     * Return false if the wait times out.
+     * Return true otherwise.
+     * NOTICE: returning true does not mean that the coordinator executed successfully;
+     * the caller should check queryStatus for the result.
+     *
+     * We divide the entire waiting process into multiple rounds,
+     * with a maximum of 30 seconds per round. After each round of waiting,
+     * we check the status of the BEs. If a BE's status is abnormal, the wait ends
+     * and the result is returned. Otherwise, we continue to the next round of waiting.
+     * This mainly avoids the problem of the Coordinator waiting for a long time
+     * after some BE can no longer return a result due to an exception, such as the BE being down.
+     */
+    public boolean join(int timeoutS) {
+        final long fixedMaxWaitTime = 30;
+
+        long leftTimeoutS = timeoutS;
+        while (leftTimeoutS > 0) {
+            long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
+            boolean awaitRes = false;
+            try {
+                awaitRes = profileDoneSignal.await(waitTime, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                // Do nothing
+            }
+            if (awaitRes) {
+                return true;
+            }
+
+            if (!checkBackendState()) {
+                return true;
+            }
+
+            leftTimeoutS -= waitTime;
         }
         return false;
     }
 
+    /*
+     * Check the state of the backends in needCheckBackendExecStates.
+     * Return true if all of them are OK. Otherwise, return false.
+     */
+    private boolean checkBackendState() {
+        for (BackendExecState backendExecState : needCheckBackendExecStates) {
+            if (!backendExecState.isBackendStateHealthy()) {
+                queryStatus = new Status(TStatusCode.INTERNAL_ERROR, "backend " + backendExecState.backend.getId() + " is down");
+                return false;
+            }
+        }
+        return true;
+    }
+
     public boolean isDone() {
         return profileDoneSignal.getCount() == 0;
     }
@@ -1249,45 +1259,17 @@ public class Coordinator {
     // TODO(zhaochun): add profile information and others
     public class BackendExecState {
         TExecPlanFragmentParams rpcParams;
-        private PlanFragmentId fragmentId;
-        private int            instanceId;
-        private boolean initiated;
-        private boolean done;
-        private boolean hasCanceled;
-        private Lock lock = new ReentrantLock();
-        private int profileFragmentId;
+        PlanFragmentId fragmentId;
+        int instanceId;
+        boolean initiated;
+        boolean done;
+        boolean hasCanceled;
+        int profileFragmentId;
         RuntimeProfile profile;
         TNetworkAddress address;
-        Long backendId;
-
-        public int profileFragmentId() {
-            return profileFragmentId;
-        }
-
-        public boolean initiated() {
-            return initiated;
-        }
-
-        public RuntimeProfile profile() {
-            return profile;
-        }
-
-        public void lock() {
-            lock.lock();
-        }
-
-        public void unlock() {
-            lock.unlock();
-        }
-
-        public int getInstanceId() {
-            return instanceId;
-        }
-
-        public PlanFragmentId getFragmentId() {
-            return fragmentId;
-        }
-
+        Backend backend;
+        long lastMissingHeartbeatTime = -1;
+        
         public BackendExecState(PlanFragmentId fragmentId, int instanceId, int profileFragmentId,
             TExecPlanFragmentParams rpcParams, Map<TNetworkAddress, Long> addressToBackendID) {
             this.profileFragmentId = profileFragmentId;
@@ -1297,37 +1279,111 @@ public class Coordinator {
             this.initiated = false;
             this.done = false;
             this.address = fragmentExecParamsMap.get(fragmentId).instanceExecParams.get(instanceId).host;
-            this.backendId = addressToBackendID.get(address);
+            this.backend = idToBackend.get(addressToBackendID.get(address));
 
             String name = "Instance " + DebugUtil.printId(fragmentExecParamsMap.get(fragmentId)
                     .instanceExecParams.get(instanceId).instanceId) + " (host=" + address + ")";
             this.profile = new RuntimeProfile(name);
             this.hasCanceled = false;
+            this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime();
         }
 
-        public TNetworkAddress getBackendAddress() {
-            return address;
+        // Update the profile.
+        // Return true if the profile is updated. Otherwise, return false.
+        public synchronized boolean updateProfile(TReportExecStatusParams params) {
+            if (this.done) {
+                // duplicate packet
+                return false;
+            }
+            if (params.isSetProfile()) {
+                profile.update(params.profile);
+            }
+            this.done = params.done;
+            return true;
         }
 
-        public TUniqueId getFragmentInstanceId() {
-            return this.rpcParams.params.getFragment_instance_id();
+        public synchronized void printProfile(StringBuilder builder) {
+            this.profile.prettyPrint(builder, "");
+        }
+
+        // Cancel the fragment instance.
+        // Return true if the cancel succeeds. Otherwise, return false.
+        public synchronized boolean cancelFragmentInstance(PPlanFragmentCancelReason cancelReason) {
+            LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {}, fragment instance id={}, reason: {}",
+                    this.initiated, this.done, this.hasCanceled, backend.getId(),
+                    DebugUtil.printId(fragmentInstanceId()), cancelReason.name());
+            try {
+                if (!this.initiated) {
+                    return false;
+                }
+                // don't cancel if it is already finished
+                if (this.done) {
+                    return false;
+                }
+                if (this.hasCanceled) {
+                    return false;
+                }
+                TNetworkAddress brpcAddress = toBrpcHost(address);
+
+                try {
+                    BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                            fragmentInstanceId(), cancelReason);
+                } catch (RpcException e) {
+                    LOG.warn("cancel plan fragment get a exception, address={}:{}", brpcAddress.getHostname(),
+                            brpcAddress.getPort());
+                    SimpleScheduler.updateBlacklistBackends(addressToBackendID.get(brpcAddress));
+                }
+
+                this.hasCanceled = true;
+            } catch (Exception e) {
+                LOG.warn("catch a exception", e);
+                return false;
+            }
+            return true;
+        }
+
+        public synchronized boolean computeTimeInProfile(int maxFragmentId) {
+            if (this.profileFragmentId < 0 || this.profileFragmentId > maxFragmentId) {
+                LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId);
+                return false;
+            }
+            profile.computeTimeInProfile();
+            return true;
+        }
+
+        public boolean isBackendStateHealthy() {
+            if (backend.getLastMissingHeartbeatTime() > lastMissingHeartbeatTime) {
+                LOG.warn("backend {} is down while joining the coordinator. job id: {}", backend.getId(), jobId);
+                return false;
+            }
+            return true;
         }
 
         public Future<PExecPlanFragmentResult> execRemoteFragmentAsync() throws TException, RpcException {
             TNetworkAddress brpcAddress = null;
             try {
-                brpcAddress = toBrpcHost(address);
+                brpcAddress = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
             } catch (Exception e) {
                 throw new TException(e.getMessage());
             }
-            initiated = true;
+            this.initiated = true;
             try {
                 return BackendServiceProxy.getInstance().execPlanFragmentAsync(brpcAddress, rpcParams);
             } catch (RpcException e) {
-                SimpleScheduler.updateBlacklistBackends(backendId);
+                SimpleScheduler.updateBlacklistBackends(backend.getId());
                 throw e;
             }
         }
+
+        public FragmentInstanceInfo buildFragmentInstanceInfo() {
+            return new QueryStatisticsItem.FragmentInstanceInfo.Builder()
+                    .instanceId(fragmentInstanceId()).fragmentId(String.valueOf(fragmentId)).address(this.address)
+                    .build();
+        }
+
+        private TUniqueId fragmentInstanceId() {
+            return this.rpcParams.params.getFragment_instance_id();
+        }
     }
 
     // execution parameters for a single fragment,
@@ -1483,17 +1539,11 @@ public class Coordinator {
         final List<QueryStatisticsItem.FragmentInstanceInfo> result =
                 Lists.newArrayList();
         for (int index = 0; index < fragments.size(); index++) {
-            for (Map.Entry<TUniqueId, BackendExecState> entry: backendExecStateMap.entrySet()) {
-                final BackendExecState backendExecState = entry.getValue();
-                if (fragments.get(index).getFragmentId() != backendExecState.getFragmentId()) {
+            for (BackendExecState backendExecState: backendExecStates) {
+                if (fragments.get(index).getFragmentId() != backendExecState.fragmentId) {
                     continue;
                 }
-                final QueryStatisticsItem.FragmentInstanceInfo info
-                        = new QueryStatisticsItem.FragmentInstanceInfo.Builder()
-                        .instanceId(entry.getValue().getFragmentInstanceId())
-                        .fragmentId(String.valueOf(index))
-                        .address(backendExecState.getBackendAddress())
-                        .build();
+                final QueryStatisticsItem.FragmentInstanceInfo info = backendExecState.buildFragmentInstanceInfo();
                 result.add(info);
             }
         }
@@ -1501,20 +1551,11 @@ public class Coordinator {
     }
 
     private void attachInstanceProfileToFragmentProfile() {
-        for (int i = 0; i < backendExecStates.size(); ++i) {
-            if (backendExecStates.get(i) == null) {
-                continue;
-            }
-            BackendExecState backendExecState = backendExecStates.get(i);
-            backendExecState.profile().computeTimeInProfile();
-
-            int profileFragmentId = backendExecState.profileFragmentId();
-            if (profileFragmentId < 0 || profileFragmentId > fragmentProfile.size()) {
-                LOG.error("profileFragmentId " + profileFragmentId
-                        + " should be in [0," + fragmentProfile.size() + ")");
+        for (BackendExecState backendExecState : backendExecStates) {
+            if (!backendExecState.computeTimeInProfile(fragmentProfile.size())) {
                 return;
             }
-            fragmentProfile.get(profileFragmentId).addChild(backendExecState.profile());
+            fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile);
         }
     }
 }
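
The join() comment in the Coordinator diff above describes the new wait strategy: wait on the profile-done signal in rounds of at most 30 seconds, and after each round check the health of the backends recorded in needCheckBackendExecStates, where "healthy" means the backend has not missed a heartbeat since the fragment was sent. The following self-contained Java sketch is only an illustration of that loop under those assumptions; the stub classes are hypothetical and are not the real Coordinator, BackendExecState, or Backend types.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Hypothetical stand-ins for the FE types touched by this commit (not the real Doris classes).
    class BackendStub {
        private final long id;
        private volatile long lastMissingHeartbeatTime = -1;

        BackendStub(long id) { this.id = id; }
        long getId() { return id; }
        long getLastMissingHeartbeatTime() { return lastMissingHeartbeatTime; }
        // In the real Backend, this timestamp is set when a heartbeat response reports failure.
        void markHeartbeatMissing() { lastMissingHeartbeatTime = System.currentTimeMillis(); }
    }

    class ExecStateStub {
        final BackendStub backend;
        private final long heartbeatSnapshot; // taken when the fragment was sent to the backend

        ExecStateStub(BackendStub backend) {
            this.backend = backend;
            this.heartbeatSnapshot = backend.getLastMissingHeartbeatTime();
        }

        // Unhealthy if the backend missed a heartbeat after the snapshot was taken.
        boolean isBackendStateHealthy() {
            return backend.getLastMissingHeartbeatTime() <= heartbeatSnapshot;
        }
    }

    public class JoinSketch {
        private static final long MAX_WAIT_PER_ROUND_S = 30;

        // Wait in rounds of at most 30 seconds; after each round, check backend health.
        // Returns true when the done signal fires or a backend is found down, false on timeout.
        static boolean join(CountDownLatch doneSignal, List<ExecStateStub> toCheck, int timeoutS)
                throws InterruptedException {
            long left = timeoutS;
            while (left > 0) {
                long wait = Math.min(left, MAX_WAIT_PER_ROUND_S);
                if (doneSignal.await(wait, TimeUnit.SECONDS)) {
                    return true; // all fragment instances reported done
                }
                for (ExecStateStub s : toCheck) {
                    if (!s.isBackendStateHealthy()) {
                        System.out.println("backend " + s.backend.getId() + " is down, stop waiting");
                        return true; // stop waiting; the caller still has to check the query status
                    }
                }
                left -= wait;
            }
            return false; // timed out without the signal firing or a backend going down
        }

        public static void main(String[] args) throws Exception {
            BackendStub be = new BackendStub(10001);
            ExecStateStub state = new ExecStateStub(be); // heartbeat snapshot taken here
            CountDownLatch signal = new CountDownLatch(1); // never counted down in this demo
            be.markHeartbeatMissing(); // simulate the backend going down after the load started
            // Small timeout so the demo finishes quickly; the down backend is detected after one round.
            System.out.println("join returned: " + join(signal, Arrays.asList(state), 2));
        }
    }

The design point mirrored here is that BackendExecState snapshots the backend's lastMissingHeartbeatTime at construction, so any missed heartbeat after the load starts is enough to end the wait early instead of blocking until the full query timeout.
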
diff --git a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 18dc80e..2a45c6d 100644
--- a/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -20,9 +20,9 @@ package org.apache.doris.rpc;
 import org.apache.doris.common.Config;
 import org.apache.doris.proto.PCancelPlanFragmentRequest;
 import org.apache.doris.proto.PCancelPlanFragmentResult;
-import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.proto.PExecPlanFragmentResult;
 import org.apache.doris.proto.PFetchDataResult;
+import org.apache.doris.proto.PPlanFragmentCancelReason;
 import org.apache.doris.proto.PProxyRequest;
 import org.apache.doris.proto.PProxyResult;
 import org.apache.doris.proto.PTriggerProfileReportResult;
@@ -102,12 +102,12 @@ public class BackendServiceProxy {
             } catch (NoSuchElementException noSuchElementException) {
                 LOG.warn("Execute plan fragment retry failed, address={}:{}",
                         address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(e.getMessage());               
+                throw new RpcException(address.hostname, e.getMessage());
             }
         } catch (Throwable e) {
             LOG.warn("Execute plan fragment catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
-            throw new RpcException(e.getMessage());
+            throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
@@ -135,12 +135,12 @@ public class BackendServiceProxy {
             } catch (NoSuchElementException noSuchElementException) {
                 LOG.warn("Cancel plan fragment retry failed, address={}:{}",
                         address.getHostname(), address.getPort(), noSuchElementException);
-                throw new RpcException(e.getMessage());            
+                throw new RpcException(address.hostname, e.getMessage());
             }
         } catch (Throwable e) {
             LOG.warn("Cancel plan fragment catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
-            throw new RpcException(e.getMessage());
+            throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
@@ -152,7 +152,7 @@ public class BackendServiceProxy {
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
-            throw new RpcException(e.getMessage());
+            throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
@@ -164,7 +164,7 @@ public class BackendServiceProxy {
         } catch (Throwable e) {
             LOG.warn("fetch data catch a exception, address={}:{}",
                     address.getHostname(), address.getPort(), e);
-            throw new RpcException(e.getMessage());
+            throw new RpcException(address.hostname, e.getMessage());
         }
     }
 
@@ -175,7 +175,7 @@ public class BackendServiceProxy {
             return service.getInfo(request);
         } catch (Throwable e) {
             LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
-            throw new RpcException(e.getMessage());
+            throw new RpcException(address.hostname, e.getMessage());
         }
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/rpc/RpcException.java b/fe/src/main/java/org/apache/doris/rpc/RpcException.java
index 5166dde..8648b1c 100644
--- a/fe/src/main/java/org/apache/doris/rpc/RpcException.java
+++ b/fe/src/main/java/org/apache/doris/rpc/RpcException.java
@@ -18,22 +18,8 @@
 package org.apache.doris.rpc;
 
 public class RpcException extends Exception {
-    public RpcException() {
-    }
-
-    public RpcException(String message) {
-        super(message);
-    }
-
-    public RpcException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RpcException(Throwable cause) {
-        super(cause);
-    }
 
-    public RpcException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-        super(message, cause, enableSuppression, writableStackTrace);
+    public RpcException(String host, String message) {
+        super(message + ", host: " + host);
     }
 }
diff --git a/fe/src/main/java/org/apache/doris/system/Backend.java b/fe/src/main/java/org/apache/doris/system/Backend.java
index 961a5cb..e86530c 100644
--- a/fe/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/src/main/java/org/apache/doris/system/Backend.java
@@ -88,6 +88,8 @@ public class Backend implements Writable {
     // after init it, this variable is set to true.
     private boolean initPathInfo = false;
 
+    long lastMissingHeartbeatTime = -1;
+
     public Backend() {
         this.host = "";
         this.lastUpdateMs = new AtomicLong();
@@ -235,6 +237,10 @@ public class Backend implements Writable {
         this.lastStartTime.set(currentTime);
     }
 
+    public long getLastMissingHeartbeatTime() {
+        return lastMissingHeartbeatTime;
+    }
+
     public boolean isAlive() {
         return this.isAlive.get();
     }
@@ -426,8 +432,6 @@ public class Backend implements Writable {
             // log disk changing
             Catalog.getInstance().getEditLog().logBackendStateChange(this);
         }
-        
-
     }
 
     public static Backend read(DataInput in) throws IOException {
@@ -607,6 +611,7 @@ public class Backend implements Writable {
             }
 
             heartbeatErrMsg = hbResponse.getMsg() == null ? "Unknown error" : hbResponse.getMsg();
+            lastMissingHeartbeatTime = System.currentTimeMillis();
         }
 
         return isChanged;

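A smaller change in this commit is that RpcException now takes the failing host along with the message, so RPC errors surfaced from BackendServiceProxy identify which backend failed. Below is a minimal, hypothetical sketch of that pattern; the failing call is simulated, and only the exception shape matches the diff.

    // Same shape as the new org.apache.doris.rpc.RpcException constructor in this commit.
    class RpcExceptionSketch extends Exception {
        RpcExceptionSketch(String host, String message) {
            super(message + ", host: " + host);
        }
    }

    public class RpcExceptionDemo {
        // Hypothetical stand-in for a proxy call such as execPlanFragmentAsync.
        static void execPlanFragment(String host) throws RpcExceptionSketch {
            try {
                throw new IllegalStateException("connection refused"); // simulated brpc failure
            } catch (Throwable e) {
                // Mirrors BackendServiceProxy: wrap the cause and keep the failing host.
                throw new RpcExceptionSketch(host, e.getMessage());
            }
        }

        public static void main(String[] args) {
            try {
                execPlanFragment("192.168.1.10");
            } catch (RpcExceptionSketch e) {
                // Prints "connection refused, host: 192.168.1.10", so the error message
                // identifies the failing backend.
                System.out.println(e.getMessage());
            }
        }
    }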
