You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2014/11/12 01:05:05 UTC

[2/2] incubator-drill git commit: DRILL-1592: Detect drillbit failure and cancel the affected running queries.

DRILL-1592: Detect drillbit failure and cancel the affected running queries.

Revise code based on review comments.

Revise code based on comments: only track the newly dead drillbits.

Remove some debug messages.

Minor code change based on review comments.

Clean up.

Add comment. Clear listeners when a drillbit is unregistered from the cluster coordinator (CC).

Minor code changes based on review comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c305c794
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c305c794
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c305c794

Branch: refs/heads/master
Commit: c305c794a21d8d657897cbbf85c66fc0d1adc024
Parents: 73f9827
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Tue Oct 14 15:26:23 2014 -0700
Committer: Jinfeng Ni <jn...@maprtech.com>
Committed: Tue Nov 11 14:30:53 2014 -0800

----------------------------------------------------------------------
 .../drill/exec/coord/ClusterCoordinator.java    | 38 ++++++++++++
 .../exec/coord/zk/ZKClusterCoordinator.java     | 35 ++++++++++-
 .../apache/drill/exec/ops/FragmentContext.java  |  2 +
 .../org/apache/drill/exec/ops/QueryContext.java |  4 ++
 .../work/foreman/DrillbitStatusListener.java    | 44 ++++++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java |  5 +-
 .../drill/exec/work/foreman/QueryManager.java   | 61 +++++++++++++++++---
 .../exec/work/fragment/FragmentExecutor.java    | 28 ++++++++-
 8 files changed, 204 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index 508a5b2..be2f3b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -19,8 +19,11 @@ package org.apache.drill.exec.coord;
 
 import java.io.Closeable;
 import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
  * Pluggable interface built to manage cluster coordination. Allows Drillbit or DrillClient to register its capabilities
@@ -29,6 +32,9 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 public abstract class ClusterCoordinator implements Closeable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class);
 
+  protected ConcurrentHashMap<DrillbitStatusListener, DrillbitStatusListener> listeners = new ConcurrentHashMap<>(
+      16, 0.75f, 16);
+
   /**
    * Start the cluster coordinator.  Millis to wait is
    * @param millisToWait The maximum time to wait before throwing an exception if the cluster coordination service has not successfully started.  Use 0 to wait indefinitely.
@@ -53,5 +59,37 @@ public abstract class ClusterCoordinator implements Closeable {
 
   public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases);
 
+  /**
+   * Actions to take when there are a set of new de-active drillbits.
+   * @param unregisteredBits
+   */
+  public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredBits) {
+    for (DrillbitStatusListener listener : listeners.keySet()) {
+      listener.drillbitUnregistered(unregisteredBits);
+    }
+  }
+
+  public void drillbitRegistered(Set<DrillbitEndpoint> registeredBits) {
+    for (DrillbitStatusListener listener : listeners.keySet()) {
+      listener.drillbitRegistered(registeredBits);
+    }
+  }
+
+  /**
+   * Register a DrillbitStatusListener.
+   * Note : the listeners are not guaranteed to be called in the order in which they call this method, since all the listeners are in a ConcurrentHashMap.
+   * @param listener
+   */
+  public void addDrillbitStatusListener(DrillbitStatusListener listener) {
+    listeners.putIfAbsent(listener, listener);
+  }
+
+  /**
+   * Unregister a DrillbitStatusListener.
+   * @param listener
+   */
+  public void removeDrillbitStatusListener(DrillbitStatusListener listener) {
+    listeners.remove(listener);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index 7f538d2..dab6318 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -23,6 +23,9 @@ import static com.google.common.collect.Collections2.transform;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
@@ -47,6 +50,7 @@ import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Function;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
  * Manages cluster coordination utilizing zookeeper. *
@@ -63,6 +67,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
 
   private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
 
+
+
   public ZKClusterCoordinator(DrillConfig config) throws IOException{
     this(config, null);
   }
@@ -172,6 +178,9 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       throw new UnsupportedOperationException("Unknown handle type: " + handle.getClass().getName());
     }
 
+    // when Drillbit is unregistered, clean all the listeners registered in CC.
+    this.listeners.clear();
+
     ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
     try {
       ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
@@ -197,15 +206,39 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases);
   }
 
+
   private void updateEndpoints() {
     try {
-      endpoints = transform(discovery.queryForInstances(serviceName),
+      Collection<DrillbitEndpoint> newDrillbitSet =
+      transform(discovery.queryForInstances(serviceName),
         new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
           @Override
           public DrillbitEndpoint apply(ServiceInstance<DrillbitEndpoint> input) {
             return input.getPayload();
           }
         });
+
+      // set of newly dead bits : original bits - new set of active bits.
+      Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
+      unregisteredBits.removeAll(newDrillbitSet);
+
+      endpoints = newDrillbitSet;
+
+      if (logger.isDebugEnabled()) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("# of active drillbits : " + newDrillbitSet.size() + "");
+        builder.append("Active drillbits : ");
+        for (DrillbitEndpoint bit: newDrillbitSet) {
+          builder.append(bit.toString() + "\t");
+        }
+        logger.debug("Active drillbits set changed: {}", builder.toString());
+      }
+
+      // Notify the drillbit listener for newly unregistered bits. For now, we only care when drillbits are down / unregistered.
+      if (! (unregisteredBits.isEmpty()) ) {
+        drillbitUnregistered(unregisteredBits);
+      }
+
     } catch (Exception e) {
       logger.error("Failure while update Drillbit service location cache.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 9b78c1d..e7beb40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -171,6 +171,8 @@ public class FragmentContext implements Closeable {
     return this.rootFragmentTimeZone;
   }
 
+  public DrillbitEndpoint getForemanDrillbitEndPoint() {return fragment.getForeman();}
+
   /**
    * The FragmentHandle for this Fragment
    * @return FragmentHandle

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index ea48b05..c881432 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -23,6 +23,7 @@ import net.hydromatic.optiq.SchemaPlus;
 import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
@@ -139,4 +140,7 @@ public class QueryContext{
     return table;
   }
 
+  public ClusterCoordinator getClusterCoordinator() {
+    return drillbitContext.getClusterCoordinator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
new file mode 100644
index 0000000..ca52f0c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman;
+
+
+import org.apache.drill.exec.proto.CoordinationProtos;
+
+import java.util.Set;
+
+/**
+ * Interface to define the listener to take actions when the set of active drillbits is changed.
+ */
+public interface DrillbitStatusListener {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class);
+
+  /**
+   * The action to taken when a set of drillbits are unregistered from the cluster.
+   * @param  unregisteredDrillbits the set of newly unregistered drillbits.
+   */
+  public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits);
+
+  /**
+   * The action to taken when a set of new drillbits are registered to the cluster.
+   * @param  registeredDrillbits the set of newly registered drillbits. Note: the complete set of currently registered bits could be different.
+   */
+  public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0979f34..4d56173 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -158,12 +158,10 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     if (isFinished()) {
       return;
     }
+    state.updateState(QueryState.RUNNING, QueryState.CANCELED);
 
     // cancel remote fragments.
     fragmentManager.cancel();
-
-    QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.CANCELED).setIsLastChunk(true).setQueryId(queryId).build();
-    cleanupAndSendResult(result);
   }
 
   void cleanupAndSendResult(QueryResult result) {
@@ -361,6 +359,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
           queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession());
 
       this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager);
+      this.context.getClusterCoordinator().addDrillbitStatusListener(fragmentManager);
 
       int totalFragments = 1 + work.getFragments().size();;
       fragmentManager.getStatus().setTotalFragments(totalFragments);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index b200edc..0f007ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -23,18 +23,24 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.InitializeFragments;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
@@ -46,6 +52,7 @@ import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.store.sys.PStoreProvider;
 import org.apache.drill.exec.work.EndpointListener;
+import org.apache.drill.exec.work.ErrorHelper;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
@@ -59,7 +66,7 @@ import com.google.common.collect.Multimap;
 /**
  * Each Foreman holds its own fragment manager.  This manages the events associated with execution of a particular query across all fragments.
  */
-public class QueryManager implements FragmentStatusListener{
+public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
 
   private final QueryStatus status;
@@ -68,6 +75,7 @@ public class QueryManager implements FragmentStatusListener{
   private AtomicInteger remainingFragmentCount;
   private AtomicInteger failedFragmentCount;
   private WorkEventBus workBus;
+  private ClusterCoordinator coord;
   private QueryId queryId;
   private FragmentExecutor rootRunner;
   private RunQuery query;
@@ -100,6 +108,7 @@ public class QueryManager implements FragmentStatusListener{
     remainingFragmentCount.set(nonRootFragments.size() + 1);
     assert queryId == rootFragment.getHandle().getQueryId();
     workBus = bee.getContext().getWorkBus();
+    coord = bee.getContext().getClusterCoordinator();
 
     // set up the root fragment first so we'll have incoming buffers available.
     {
@@ -163,6 +172,8 @@ public class QueryManager implements FragmentStatusListener{
     running = true;
     if (cancelled && !stopped) {
       stopQuery();
+      QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build();
+      foremanManagerListener.cleanupAndSendResult(result);
     }
   }
 
@@ -178,6 +189,29 @@ public class QueryManager implements FragmentStatusListener{
     controller.getTunnel(assignment).sendFragments(listener, initFrags);
   }
 
+  @Override
+  public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+  }
+
+  @Override
+  public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) {
+    List<FragmentData> fragments = status.getFragmentData();
+
+    for (FragmentData fragment : fragments) {
+      if (unregisteredDrillbits.contains(fragment.getEndpoint())) {
+        logger.warn("Drillbit {} for major{}:minor{} is not responding. Stop query {}",
+            fragment.getEndpoint(),
+            fragment.getHandle().getMajorFragmentId(),
+            fragment.getHandle().getMinorFragmentId(),
+            fragment.getHandle().getQueryId());
+
+        UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(fragment.getEndpoint(), "Failure while running fragment.",
+            new DrillRuntimeException(String.format("Drillbit %s not responding", fragment.getEndpoint())), logger);
+        failWithError(error);
+        break;
+      }
+    }
+  }
 
   @Override
   public void statusUpdate(FragmentStatus status) {
@@ -218,26 +252,29 @@ public class QueryManager implements FragmentStatusListener{
       this.status.setEndTime(System.currentTimeMillis());
       foremanManagerListener.cleanupAndSendResult(result);
       workBus.removeFragmentStatusListener(queryId);
+      coord.removeDrillbitStatusListener(this);
     }
     this.status.incrementFinishedFragments();
     updateFragmentStatus(status);
   }
 
   private void fail(FragmentStatus status){
-    logger.warn("Fragment faild : {}", status);
     updateFragmentStatus(status);
     int failed = this.failedFragmentCount.incrementAndGet();
     if (failed == 1) { // only first failed fragment need notify foreman (?)
-      stopQuery();
-      QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(status.getProfile().getError()).build();
-      foremanManagerListener.cleanupAndSendResult(result);
-      this.status.setEndTime(System.currentTimeMillis());
+      failWithError(status.getProfile().getError());
     }
   }
 
+  private void failWithError(UserBitShared.DrillPBError error) {
+    stopQuery();
+    QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(error).setIsLastChunk(true).build();
+    this.status.setEndTime(System.currentTimeMillis());
+    foremanManagerListener.cleanupAndSendResult(result);
+  }
+
 
   private void stopQuery(){
-    workBus.removeFragmentStatusListener(queryId);
     // Stop all queries with a currently active status.
     List<FragmentData> fragments = status.getFragmentData();
     Collections.sort(fragments, new Comparator<FragmentData>() {
@@ -262,6 +299,11 @@ public class QueryManager implements FragmentStatusListener{
         break;
       }
     }
+
+    workBus.removeFragmentStatusListener(queryId);
+    coord.removeDrillbitStatusListener(this);
+
+    stopped = true;
   }
 
   public void cancel(){
@@ -269,6 +311,8 @@ public class QueryManager implements FragmentStatusListener{
     if (running) {
       stopQuery();
       stopped = true;
+      QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build();
+      foremanManagerListener.cleanupAndSendResult(result);
     }
   }
 
@@ -316,7 +360,8 @@ public class QueryManager implements FragmentStatusListener{
     @Override
     public void failed(RpcException ex) {
       logger.debug("Failure while sending fragment.  Stopping query.", ex);
-      stopQuery();
+      UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(endpoint, "Failure while sending fragment.", ex, logger);
+      failWithError(error);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c305c794/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 37074d8..9f08e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -24,7 +25,9 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.FragmentState;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
@@ -32,6 +35,7 @@ import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
 import org.apache.drill.exec.work.StatusProvider;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
  * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
@@ -48,6 +52,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   private final StatusReporter listener;
   private Thread executionThread;
   private AtomicBoolean closed = new AtomicBoolean(false);
+  private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
 
   public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
     this.context = context;
@@ -66,6 +71,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     updateState(FragmentState.CANCELLED);
     logger.debug("Cancelled Fragment {}", context.getHandle());
     context.cancel();
+
     if (executionThread != null) {
       executionThread.interrupt();
     }
@@ -93,6 +99,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
       root = ImplCreator.getExec(context, rootOperator);
 
+      context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);
+
       logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
       if (!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)) {
         internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
@@ -120,8 +128,8 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       internalFail(e);
     } finally {
       bee.removeFragment(context.getHandle());
+      context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
 
-      logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
       Thread.currentThread().setName(originalThread);
     }
   }
@@ -183,4 +191,22 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     return context;
   }
 
+  private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
+
+    @Override
+    public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
+      // Do nothing.
+    }
+
+    @Override
+    public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
+      if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanDrillbitEndPoint())) {
+        logger.warn("Forman : {} seems not responding or not work properly. Cancel this fragment {}:{}",
+            FragmentExecutor.this.context.getForemanDrillbitEndPoint(),
+            FragmentExecutor.this.context.getHandle().getMajorFragmentId(),
+            FragmentExecutor.this.context.getHandle().getMinorFragmentId());
+        FragmentExecutor.this.cancel();
+      }
+    }
+  }
 }