You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/11/29 11:15:40 UTC

[2/5] drill git commit: DRILL-4286: Graceful shutdown of drillbit

DRILL-4286: Graceful shutdown of drillbit

closes #921


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5f044f2a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5f044f2a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5f044f2a

Branch: refs/heads/master
Commit: 5f044f2a6d0cd34a3d4107ece4c0637469f89b40
Parents: d3f8da2
Author: dvjyothsna <jy...@Skatkam-598.local>
Authored: Thu Aug 24 08:58:00 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Wed Nov 29 12:22:00 2017 +0200

----------------------------------------------------------------------
 distribution/src/resources/drillbit.sh          |  25 +-
 .../org/apache/drill/exec/ExecConstants.java    |  13 +
 .../apache/drill/exec/client/DrillClient.java   |  11 +-
 .../drill/exec/coord/ClusterCoordinator.java    |  24 +
 .../coord/local/LocalClusterCoordinator.java    |  56 ++-
 .../exec/coord/zk/ZKClusterCoordinator.java     |  83 +++-
 .../exec/coord/zk/ZKRegistrationHandle.java     |  14 +-
 .../org/apache/drill/exec/ops/QueryContext.java |   4 +
 .../org/apache/drill/exec/server/Drillbit.java  |  69 ++-
 .../drill/exec/server/DrillbitContext.java      |  31 +-
 .../drill/exec/server/DrillbitStateManager.java |  80 ++++
 .../drill/exec/server/rest/DrillRestServer.java |   4 +-
 .../drill/exec/server/rest/DrillRoot.java       | 479 ++++++++++++-------
 .../drill/exec/server/rest/WebServer.java       |  10 +-
 .../drill/exec/service/ServiceEngine.java       |   2 +
 .../drill/exec/store/sys/DrillbitIterator.java  |  18 +-
 .../org/apache/drill/exec/work/WorkManager.java |  43 +-
 .../apache/drill/exec/work/foreman/Foreman.java |  14 +-
 .../src/main/resources/drill-module.conf        |  10 +
 .../java-exec/src/main/resources/rest/index.ftl | 110 ++++-
 .../work/metadata/TestMetadataProvider.java     |   2 +-
 .../org/apache/drill/test/ClusterFixture.java   |  19 +
 .../apache/drill/test/TestGracefulShutdown.java | 250 ++++++++++
 .../drill/exec/proto/CoordinationProtos.java    | 214 ++++++++-
 .../exec/proto/SchemaCoordinationProtos.java    |   7 +
 .../exec/proto/beans/DrillbitEndpoint.java      |  54 +++
 protocol/src/main/protobuf/Coordination.proto   |   7 +
 27 files changed, 1416 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/distribution/src/resources/drillbit.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drillbit.sh b/distribution/src/resources/drillbit.sh
index de7f21a..49b92ed 100755
--- a/distribution/src/resources/drillbit.sh
+++ b/distribution/src/resources/drillbit.sh
@@ -92,15 +92,18 @@ waitForProcessEnd()
 {
   pidKilled=$1
   commandName=$2
+  kill_drillbit=$3
   processedAt=`date +%s`
   origcnt=${DRILL_STOP_TIMEOUT:-120}
   while kill -0 $pidKilled > /dev/null 2>&1;
    do
      echo -n "."
      sleep 1;
-     # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy
-     if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then
-       break;
+     if [ "$kill_drillbit" = true ] ; then
+        # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy
+        if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then
+          break;
+        fi
      fi
   done
   echo
@@ -155,6 +158,7 @@ start_bit ( )
 
 stop_bit ( )
 {
+  kill_drillbit=$1
   if [ -f $pid ]; then
     pidToKill=`cat $pid`
     # kill -0 == see if the PID exists
@@ -162,7 +166,7 @@ stop_bit ( )
       echo "Stopping $command"
       echo "`date` Terminating $command pid $pidToKill" >> "$DRILLBIT_LOG_PATH"
       kill $pidToKill > /dev/null 2>&1
-      waitForProcessEnd $pidToKill $command
+      waitForProcessEnd $pidToKill $command $kill_drillbit
       retval=0
     else
       retval=$?
@@ -199,7 +203,18 @@ case $startStopStatus in
   ;;
 
 (stop)
-  stop_bit
+  kill_drillbit=true
+  stop_bit $kill_drillbit
+  exit $?
+  ;;
+
+# Shutdown the drillbit gracefully without disrupting the in-flight queries.
+# In this case, if there are any long running queries the drillbit will take a
+# little longer to shutdown. Incase if the user wishes to shutdown immediately
+# they can issue stop instead of graceful_stop.
+(graceful_stop)
+  kill_drillbit=false
+  stop_bit $kill_drillbit
   exit $?
   ;;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 89b4b48..52aa52d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -602,4 +602,17 @@ public final class ExecConstants {
   public static String bootDefaultFor(String name) {
     return OPTION_DEFAULTS_ROOT + name;
 }
+  /**
+   * Boot-time config option provided to modify duration of the grace period.
+   * Grace period is the amount of time where the drillbit accepts work after
+   * the shutdown request is triggered. The primary use of grace period is to
+   * avoid the race conditions caused by zookeeper delay in updating the state
+   * information of the drillbit that is shutting down. So, it is advisable
+   * to have a grace period that is atleast twice the amount of zookeeper
+   * refresh time.
+   */
+  public static final String GRACE_PERIOD = "drill.exec.grace_period_ms";
+
+  public static final String DRILL_PORT_HUNT = "drill.exec.port_hunt";
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 84b34a7..248058f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -329,7 +329,10 @@ public class DrillClient implements Closeable, ConnectionThrottle {
           throw new RpcException("Failure setting up ZK for client.", e);
         }
       }
-      endpoints.addAll(clusterCoordinator.getAvailableEndpoints());
+      // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
+      // in QUIESCENT state. This avoids the clients connecting to drillbits that are
+      // shutting down thereby avoiding reducing the chances of query failures.
+      endpoints.addAll(clusterCoordinator.getOnlineEndPoints());
       // Make sure we have at least one endpoint in the list
       checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper. Check connection parameters?");
     }
@@ -418,7 +421,10 @@ public class DrillClient implements Closeable, ConnectionThrottle {
       retry--;
       try {
         Thread.sleep(this.reconnectDelay);
-        final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
+        // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
+        // in QUIESCENT state. This avoids the clients connecting to drillbits that are
+        // shutting down thereby reducing the chances of query failures.
+        final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getOnlineEndPoints());
         if (endpoints.isEmpty()) {
           continue;
         }
@@ -434,6 +440,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
 
   private void connect(DrillbitEndpoint endpoint) throws RpcException {
     client.connect(endpoint, properties, getUserCredentials());
+    logger.info("Foreman drillbit is" + endpoint.getAddress());
   }
 
   public BufferAllocator getAllocator() {

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index e758d6f..32b1633 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.drill.exec.coord.store.TransientStore;
 import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 
 /**
@@ -60,7 +61,26 @@ public abstract class ClusterCoordinator implements AutoCloseable {
    */
   public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();
 
+  /**
+   * Get a collection of ONLINE drillbit endpoints by excluding the drillbits
+   * that are in QUIESCENT state (drillbits that are shutting down). Primarily used by the planner
+   * to plan queries only on ONLINE drillbits and used by the client during initial connection
+   * phase to connect to a drillbit (foreman)
+   * @return A collection of ONLINE endpoints
+   */
+
+  public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();
+
+  public abstract RegistrationHandle update(RegistrationHandle handle, State state);
+
   public interface RegistrationHandle {
+    /**
+     * Get the drillbit endpoint associated with the registration handle
+     * @return drillbit endpoint
+     */
+    public abstract DrillbitEndpoint getEndPoint();
+
+    public abstract void setEndPoint(DrillbitEndpoint endpoint);
   }
 
   public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases);
@@ -108,4 +128,8 @@ public abstract class ClusterCoordinator implements AutoCloseable {
     listeners.remove(listener);
   }
 
+  public boolean isDrillbitInState(DrillbitEndpoint endpoint, DrillbitEndpoint.State state) {
+    return (!endpoint.hasState() || endpoint.getState().equals(state));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
index 8c13c42..86bc606 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.coord.local;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
@@ -33,6 +34,7 @@ import org.apache.drill.exec.coord.store.TransientStore;
 import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.coord.store.TransientStoreFactory;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
 
 import com.google.common.collect.Maps;
 
@@ -69,9 +71,10 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
   }
 
   @Override
-  public RegistrationHandle register(final DrillbitEndpoint data) {
+  public RegistrationHandle register( DrillbitEndpoint data) {
     logger.debug("Endpoint registered {}.", data);
-    final Handle h = new Handle();
+    final Handle h = new Handle(data);
+    data = data.toBuilder().setState(State.ONLINE).build();
     endpoints.put(h, data);
     return h;
   }
@@ -85,13 +88,62 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
     endpoints.remove(handle);
   }
 
+  /**
+   * Update drillbit endpoint state. Drillbit advertises its
+   * state. State information is used during planning and initial
+   * client connection phases.
+   */
+  @Override
+  public RegistrationHandle update(RegistrationHandle handle, State state) {
+    DrillbitEndpoint endpoint = handle.getEndPoint();
+    endpoint = endpoint.toBuilder().setState(state).build();
+    handle.setEndPoint(endpoint);
+    endpoints.put(handle,endpoint);
+    return handle;
+  }
+
   @Override
   public Collection<DrillbitEndpoint> getAvailableEndpoints() {
     return endpoints.values();
   }
 
+  /**
+   * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits
+   * that are in QUIESCENT state (drillbits shutting down). Primarily used by the planner
+   * to plan queries only on ONLINE drillbits and used by the client during initial connection
+   * phase to connect to a drillbit (foreman)
+   * @return A collection of ONLINE endpoints
+   */
+  @Override
+  public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+    Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
+    for (DrillbitEndpoint endpoint: endpoints.values()){
+      if(isDrillbitInState(endpoint, State.ONLINE)) {
+        runningEndPoints.add(endpoint);
+      }
+    }
+    return runningEndPoints;
+  }
+
   private class Handle implements RegistrationHandle {
     private final UUID id = UUID.randomUUID();
+    private DrillbitEndpoint drillbitEndpoint;
+
+    /**
+     * Get the drillbit endpoint associated with the registration handle
+     * @return drillbit endpoint
+     */
+    public DrillbitEndpoint getEndPoint() {
+      return drillbitEndpoint;
+    }
+
+    public void setEndPoint(DrillbitEndpoint endpoint) {
+      this.drillbitEndpoint = endpoint;
+    }
+
+    private Handle(DrillbitEndpoint data) {
+      drillbitEndpoint = data;
+    }
 
     @Override
     public int hashCode() {

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index b14a151..472bc3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -23,15 +23,16 @@ import static com.google.common.collect.Collections2.transform;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.util.ArrayList;
 import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.collect.Lists;
+import org.apache.commons.collections.keyvalue.MultiKey;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -54,6 +55,7 @@ import org.apache.drill.exec.coord.store.TransientStore;
 import org.apache.drill.exec.coord.store.TransientStoreConfig;
 import org.apache.drill.exec.coord.store.TransientStoreFactory;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
 
 import com.google.common.base.Function;
 
@@ -70,7 +72,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   private final CountDownLatch initialConnection = new CountDownLatch(1);
   private final TransientStoreFactory factory;
   private ServiceCache<DrillbitEndpoint> serviceCache;
+  private DrillbitEndpoint endpoint;
 
+  // endpointsMap maps Multikey( comprises of endoint address and port) to Drillbit endpoints
+  private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
   private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
 
   public ZKClusterCoordinator(DrillConfig config) throws IOException{
@@ -169,9 +174,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   @Override
   public RegistrationHandle register(DrillbitEndpoint data) {
     try {
+      data = data.toBuilder().setState(State.ONLINE).build();
       ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data);
       discovery.registerService(serviceInstance);
-      return new ZKRegistrationHandle(serviceInstance.getId());
+      return new ZKRegistrationHandle(serviceInstance.getId(),data);
     } catch (Exception e) {
       throw propagate(e);
     }
@@ -200,11 +206,50 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
     }
   }
 
+  /**
+   * Update drillbit endpoint state. Drillbit advertises its
+   * state in Zookeeper when a shutdown request of drillbit is
+   * triggered. State information is used during planning and
+   * initial client connection phases.
+   */
+  public RegistrationHandle update(RegistrationHandle handle, State state) {
+    ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
+      try {
+        endpoint = h.endpoint.toBuilder().setState(state).build();
+        ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
+                .name(serviceName)
+                .id(h.id)
+                .payload(endpoint).build();
+        discovery.updateService(serviceInstance);
+      } catch (Exception e) {
+        propagate(e);
+      }
+      return handle;
+  }
+
   @Override
   public Collection<DrillbitEndpoint> getAvailableEndpoints() {
     return this.endpoints;
   }
 
+  /*
+   * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits
+   * that are in QUIESCENT state (drillbits shutting down). Primarily used by the planner
+   * to plan queries only on ONLINE drillbits and used by the client during initial connection
+   * phase to connect to a drillbit (foreman)
+   * @return A collection of ONLINE endpoints
+   */
+  @Override
+  public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+    Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
+    for (DrillbitEndpoint endpoint: endpoints){
+      if(isDrillbitInState(endpoint, State.ONLINE)) {
+        runningEndPoints.add(endpoint);
+      }
+    }
+    logger.debug("Online endpoints in ZK are" + runningEndPoints.toString());
+    return runningEndPoints;
+  }
 
   @Override
   public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
@@ -219,6 +264,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
 
   private synchronized void updateEndpoints() {
     try {
+      // All active bits in the Zookeeper
       Collection<DrillbitEndpoint> newDrillbitSet =
       transform(discovery.queryForInstances(serviceName),
         new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
@@ -229,27 +275,42 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
         });
 
       // set of newly dead bits : original bits - new set of active bits.
-      Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
-      unregisteredBits.removeAll(newDrillbitSet);
-
+      Set<DrillbitEndpoint> unregisteredBits = new HashSet<>();
       // Set of newly live bits : new set of active bits - original bits.
-      Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
-      registeredBits.removeAll(endpoints);
+      Set<DrillbitEndpoint> registeredBits = new HashSet<>();
 
-      endpoints = newDrillbitSet;
 
+      // Updates the endpoints map if there is a change in state of the endpoint or with the addition
+      // of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
+      for ( DrillbitEndpoint endpoint : newDrillbitSet) {
+        String endpointAddress = endpoint.getAddress();
+        int endpointPort = endpoint.getUserPort();
+        if (! endpointsMap.containsKey(new MultiKey(endpointAddress, endpointPort))) {
+          registeredBits.add(endpoint);
+        }
+        endpointsMap.put(new MultiKey(endpointAddress, endpointPort),endpoint);
+      }
+      // Remove all the endpoints that are newly dead
+      for ( MultiKey key: endpointsMap.keySet()) {
+        if(!newDrillbitSet.contains(endpointsMap.get(key))) {
+          unregisteredBits.add(endpointsMap.get(key));
+          endpointsMap.remove(key);
+        }
+      }
+      endpoints = endpointsMap.values();
       if (logger.isDebugEnabled()) {
         StringBuilder builder = new StringBuilder();
         builder.append("Active drillbit set changed.  Now includes ");
         builder.append(newDrillbitSet.size());
         builder.append(" total bits. New active drillbits:\n");
-        builder.append("Address | User Port | Control Port | Data Port | Version |\n");
+        builder.append("Address | User Port | Control Port | Data Port | Version | State\n");
         for (DrillbitEndpoint bit: newDrillbitSet) {
           builder.append(bit.getAddress()).append(" | ");
           builder.append(bit.getUserPort()).append(" | ");
           builder.append(bit.getControlPort()).append(" | ");
           builder.append(bit.getDataPort()).append(" | ");
           builder.append(bit.getVersion()).append(" |");
+          builder.append(bit.getState()).append(" | ");
           builder.append('\n');
         }
         logger.debug(builder.toString());

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
index f0c465f..fca3296 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
@@ -18,15 +18,27 @@
 package org.apache.drill.exec.coord.zk;
 
 import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 public class ZKRegistrationHandle implements RegistrationHandle {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKRegistrationHandle.class);
 
   public final String id;
+  public DrillbitEndpoint endpoint;
 
-  public ZKRegistrationHandle(String id) {
+  public DrillbitEndpoint getEndPoint() {
+    return endpoint;
+  }
+
+  @Override
+  public void setEndPoint(DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  public ZKRegistrationHandle(String id, DrillbitEndpoint endpoint) {
     super();
     this.id = id;
+    this.endpoint = endpoint;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 125dfac..eb32bc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -213,6 +213,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return drillbitContext.getBits();
   }
 
+  public Collection<DrillbitEndpoint> getOnlineEndpoints() {
+    return drillbitContext.getBits();
+  }
+
   public DrillConfig getConfig() {
     return drillbitContext.getConfig();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index a333ff2..4144da0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.StackTrace;
+import org.apache.drill.common.concurrent.ExtendedLatch;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.map.CaseInsensitiveMap;
 import org.apache.drill.common.scanner.ClassPathScanner;
@@ -32,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
 import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
 import org.apache.drill.exec.server.options.OptionDefinition;
 import org.apache.drill.exec.server.options.OptionValue;
 import org.apache.drill.exec.server.options.OptionValue.OptionScope;
@@ -48,6 +50,7 @@ import org.apache.drill.exec.util.GuavaPatcher;
 import org.apache.drill.exec.work.WorkManager;
 import org.apache.zookeeper.Environment;
 
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 
@@ -77,6 +80,23 @@ public class Drillbit implements AutoCloseable {
   private final WorkManager manager;
   private final BootStrapContext context;
   private final WebServer webServer;
+  private final int gracePeriod;
+  private DrillbitStateManager stateManager;
+  private boolean quiescentMode;
+  private boolean forcefulShutdown = false;
+
+  public void setQuiescentMode(boolean quiescentMode) {
+    this.quiescentMode = quiescentMode;
+  }
+
+  public void setForcefulShutdown(boolean forcefulShutdown) {
+    this.forcefulShutdown = forcefulShutdown;
+  }
+
+  public RegistrationHandle getRegistrationHandle() {
+    return registrationHandle;
+  }
+
   private RegistrationHandle registrationHandle;
   private volatile StoragePluginRegistry storageRegistry;
   private final PersistentStoreProvider profileStoreProvider;
@@ -110,13 +130,15 @@ public class Drillbit implements AutoCloseable {
     final CaseInsensitiveMap<OptionDefinition> definitions,
     final RemoteServiceSet serviceSet,
     final ScanResult classpathScan) throws Exception {
+    gracePeriod = config.getInt(ExecConstants.GRACE_PERIOD);
     final Stopwatch w = Stopwatch.createStarted();
     logger.debug("Construction started.");
-    final boolean allowPortHunting = serviceSet != null;
+    boolean drillPortHunt = config.getBoolean(ExecConstants.DRILL_PORT_HUNT);
+    final boolean allowPortHunting = (serviceSet != null) || drillPortHunt;
     context = new BootStrapContext(config, definitions, classpathScan);
     manager = new WorkManager(context);
 
-    webServer = new WebServer(context, manager);
+    webServer = new WebServer(context, manager, this);
     boolean isDistributedMode = false;
     if (serviceSet != null) {
       coord = serviceSet.getCoordinator();
@@ -137,6 +159,7 @@ public class Drillbit implements AutoCloseable {
 
     engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode);
 
+    stateManager = new DrillbitStateManager(DrillbitState.STARTUP);
     logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
   }
 
@@ -152,6 +175,7 @@ public class Drillbit implements AutoCloseable {
     final Stopwatch w = Stopwatch.createStarted();
     logger.debug("Startup begun.");
     coord.start(10000);
+    stateManager.setState(DrillbitState.ONLINE);
     storeProvider.start();
     if (profileStoreProvider != storeProvider) {
       profileStoreProvider.start();
@@ -176,18 +200,43 @@ public class Drillbit implements AutoCloseable {
     logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
   }
 
+  /*
+    Wait uninterruptibly
+   */
+  public void waitForGracePeriod() {
+    ExtendedLatch exitLatch = new ExtendedLatch();
+    exitLatch.awaitUninterruptibly(gracePeriod);
+  }
+
+  /*
+
+   */
+  public void shutdown() {
+    this.close();
+  }
+ /*
+  The drillbit is moved into Quiescent state and the drillbit waits for grace period amount of time.
+  Then drillbit moves into draining state and waits for all the queries and fragments to complete.
+  */
   @Override
   public synchronized void close() {
-    // avoid complaints about double closing
-    if (isClosed) {
+    if ( !stateManager.getState().equals(DrillbitState.ONLINE)) {
       return;
     }
     final Stopwatch w = Stopwatch.createStarted();
     logger.debug("Shutdown begun.");
-
-    // wait for anything that is running to complete
-    manager.waitToExit();
-
+    registrationHandle = coord.update(registrationHandle, State.QUIESCENT);
+    stateManager.setState(DrillbitState.GRACE);
+    waitForGracePeriod();
+    stateManager.setState(DrillbitState.DRAINING);
+    // wait for all the in-flight queries to finish
+    manager.waitToExit(this, forcefulShutdown);
+    //safe to exit
+    registrationHandle = coord.update(registrationHandle, State.OFFLINE);
+    stateManager.setState(DrillbitState.OFFLINE);
+    if(quiescentMode == true) {
+      return;
+    }
     if (coord != null && registrationHandle != null) {
       coord.unregister(registrationHandle);
     }
@@ -219,8 +268,8 @@ public class Drillbit implements AutoCloseable {
       logger.warn("Failure on close()", e);
     }
 
-    logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
-    isClosed = true;
+    logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS) );
+    stateManager.setState(DrillbitState.SHUTDOWN);
   }
 
   private void javaPropertiesToSystemOptions() {

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b8a8b1e..f65592b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -157,10 +157,39 @@ public class DrillbitContext implements AutoCloseable {
     return context.getConfig();
   }
 
-  public Collection<DrillbitEndpoint> getBits() {
+  public Collection<DrillbitEndpoint> getAvailableBits() {
     return coord.getAvailableEndpoints();
   }
 
+  public Collection<DrillbitEndpoint> getBits() {
+    return coord.getOnlineEndPoints();
+  }
+
+  public boolean isOnline(DrillbitEndpoint endpoint) {
+    return endpoint.getState().equals(DrillbitEndpoint.State.ONLINE);
+  }
+
+  public boolean isForeman(DrillbitEndpoint endpoint) {
+    DrillbitEndpoint foreman = getEndpoint();
+    if(endpoint.getAddress().equals(foreman.getAddress()) &&
+            endpoint.getUserPort() == foreman.getUserPort()) {
+      return true;
+    }
+    return false;
+  }
+
+  public boolean isForemanOnline() {
+    Collection<DrillbitEndpoint> dbs = getAvailableBits();
+    for (DrillbitEndpoint db : dbs) {
+      if( isForeman(db)) {
+        if (isOnline(db)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
   public BufferAllocator getAllocator() {
     return context.getAllocator();
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
new file mode 100644
index 0000000..dfffce8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server;
+/*
+  State manager to manage the state of drillbit.
+ */
+public class DrillbitStateManager {
+
+  public enum DrillbitState {
+    STARTUP, ONLINE, GRACE, DRAINING, OFFLINE, SHUTDOWN
+  }
+
+  private DrillbitState currentState;
+
+  public DrillbitStateManager(DrillbitState currentState) {
+    this.currentState = currentState;
+  }
+
+  public DrillbitState getState() {
+    return currentState;
+  }
+
+  public void setState(DrillbitState newState) {
+    switch (newState) {
+      case ONLINE:
+        if (currentState == DrillbitState.STARTUP) {
+          currentState = newState;
+        } else {
+          throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState);
+        }
+        break;
+      case GRACE:
+        if (currentState == DrillbitState.ONLINE) {
+          currentState = newState;
+        } else {
+          throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState);
+        }
+        break;
+      case DRAINING:
+        if (currentState == DrillbitState.GRACE) {
+          currentState = newState;
+        } else {
+          throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState);
+        }
+        break;
+      case OFFLINE:
+        if (currentState == DrillbitState.DRAINING) {
+          currentState = newState;
+        } else {
+          throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState);
+        }
+        break;
+      case SHUTDOWN:
+        if (currentState == DrillbitState.OFFLINE) {
+          currentState = newState;
+        } else {
+          throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState);
+        }
+        break;
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 1545847..89141d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.rest.WebUserConnection.AnonWebUserConnection;
 import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
@@ -73,7 +74,7 @@ import java.util.List;
 public class DrillRestServer extends ResourceConfig {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class);
 
-  public DrillRestServer(final WorkManager workManager, final ServletContext servletContext) {
+  public DrillRestServer(final WorkManager workManager, final ServletContext servletContext, final Drillbit drillbit) {
     register(DrillRoot.class);
     register(StatusResources.class);
     register(StorageResources.class);
@@ -120,6 +121,7 @@ public class DrillRestServer extends ResourceConfig {
     register(new AbstractBinder() {
       @Override
       protected void configure() {
+        bind(drillbit).to(Drillbit.class);
         bind(workManager).to(WorkManager.class);
         bind(executor).to(EventExecutor.class);
         bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index 55bfca4..da1d2fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -18,12 +18,16 @@
 package org.apache.drill.exec.server.rest;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import javax.annotation.security.PermitAll;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import javax.ws.rs.core.SecurityContext;
 import javax.xml.bind.annotation.XmlRootElement;
 
@@ -33,6 +37,7 @@ import com.google.common.collect.Sets;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
@@ -55,9 +60,18 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 public class DrillRoot {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRoot.class);
 
-  @Inject UserAuthEnabled authEnabled;
-  @Inject WorkManager work;
-  @Inject SecurityContext sc;
+  @Inject
+  UserAuthEnabled authEnabled;
+  @Inject
+  WorkManager work;
+  @Inject
+  SecurityContext sc;
+  @Inject
+  Drillbit drillbit;
+
+  public enum ShutdownMode {
+    forcefulShutdown, gracefulShutdown, quiescent
+  }
 
   @GET
   @Produces(MediaType.TEXT_HTML)
@@ -65,6 +79,90 @@ public class DrillRoot {
     return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON());
   }
 
+
+  @SuppressWarnings("resource")
+  @GET
+  @Path("/state")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getDrillbitStatus() {
+    Collection<DrillbitInfo> drillbits = getClusterInfoJSON().getDrillbits();
+    Map<String, String> drillStatusMap = new HashMap<String, String>();
+    for (DrillbitInfo drillbit : drillbits) {
+      drillStatusMap.put(drillbit.getAddress() + "-" + drillbit.getUserPort(), drillbit.getState());
+    }
+    Response response = setResponse(drillStatusMap);
+    return response;
+  }
+
+  @SuppressWarnings("resource")
+  @GET
+  @Path("/gracePeriod")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Map<String, Integer> getGracePeriod() {
+
+    final DrillConfig config = work.getContext().getConfig();
+    final int gracePeriod = config.getInt(ExecConstants.GRACE_PERIOD);
+    Map<String, Integer> gracePeriodMap = new HashMap<String, Integer>();
+    gracePeriodMap.put("graceperiod", gracePeriod);
+    return gracePeriodMap;
+  }
+
+  @SuppressWarnings("resource")
+  @GET
+  @Path("/portNum")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Map<String, Integer> getPortNum() {
+
+    final DrillConfig config = work.getContext().getConfig();
+    final int port = config.getInt(ExecConstants.HTTP_PORT);
+    Map<String, Integer> portMap = new HashMap<String, Integer>();
+    portMap.put("port", port);
+    return portMap;
+  }
+
+  @SuppressWarnings("resource")
+  @GET
+  @Path("/queriesCount")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getRemainingQueries() {
+    Map<String, Integer> queriesInfo = new HashMap<String, Integer>();
+    queriesInfo = work.getRemainingQueries();
+    Response response = setResponse(queriesInfo);
+    return response;
+  }
+
+  @SuppressWarnings("resource")
+  @POST
+  @Path("/gracefulShutdown")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response shutdownDrillbit() throws Exception {
+    String resp = "Graceful Shutdown request is triggered";
+    return shutdown(resp);
+
+  }
+
+  @SuppressWarnings("resource")
+  @POST
+  @Path("/shutdown")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response ShutdownForcefully() throws Exception {
+    drillbit.setForcefulShutdown(true);
+    String resp = "Forceful shutdown request is triggered";
+    return shutdown(resp);
+
+  }
+
+  @SuppressWarnings("resource")
+  @POST
+  @Path("/quiescent")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response drillbitToQuiescentMode() throws Exception {
+    drillbit.setQuiescentMode(true);
+    String resp = "Request to put drillbit in Quiescent mode is triggered";
+    return shutdown(resp);
+  }
+
+
   @SuppressWarnings("resource")
   @GET
   @Path("/cluster.json")
@@ -79,8 +177,8 @@ public class DrillRoot {
 
     final DrillConfig config = dbContext.getConfig();
     final boolean userEncryptionEnabled =
-        config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) ||
-            config .getBoolean(ExecConstants.USER_SSL_ENABLED);
+            config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) ||
+                    config .getBoolean(ExecConstants.USER_SSL_ENABLED);
     final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
     // If the user is logged in and is admin user then show the admin user info
     // For all other cases the user info need-not or should-not be displayed
@@ -94,7 +192,7 @@ public class DrillRoot {
     final boolean shouldShowUserInfo = isUserLoggedIn &&
             ((DrillUserPrincipal)sc.getUserPrincipal()).isAdminUser();
 
-    for (DrillbitEndpoint endpoint : work.getContext().getBits()) {
+    for (DrillbitEndpoint endpoint : work.getContext().getAvailableBits()) {
       final DrillbitInfo drillbit = new DrillbitInfo(endpoint,
               currentDrillbit.equals(endpoint),
               currentVersion.equals(endpoint.getVersion()));
@@ -107,219 +205,254 @@ public class DrillRoot {
             " userLoggedIn "  + isUserLoggedIn + " shouldShowUserInfo: " + shouldShowUserInfo );
 
     return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
-      userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers,
-      adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager()));
+            userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers,
+            adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager()));
   }
 
-  /**
-   * Pretty-printing wrapper class around the ZK-based queue summary.
-   */
+  public Response setResponse(Map entity) {
+    return Response.ok()
+            .entity(entity)
+            .header("Access-Control-Allow-Origin", "*")
+            .header("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT")
+            .header("Access-Control-Allow-Credentials","true")
+            .allow("OPTIONS").build();
+  }
 
-  @XmlRootElement
-  public static class QueueInfo {
-    private final ZKQueueInfo zkQueueInfo;
+  public Response shutdown(String resp) throws Exception {
+    Map<String, String> shutdownInfo = new HashMap<String, String>();
+    new Thread(new Runnable() {
+        public void run() {
+          try {
+            drillbit.close();
+          } catch (Exception e) {
+            logger.error("Request to shutdown drillbit failed", e);
+          }
+        }
+      }).start();
+    shutdownInfo.put("response",resp);
+    Response response = setResponse(shutdownInfo);
+    return response;
+  }
 
-    public static QueueInfo build(ResourceManager rm) {
 
-      // Consider queues enabled only if the ZK-based queues are in use.
+/**
+ * Pretty-printing wrapper class around the ZK-based queue summary.
+ */
 
-      ThrottledResourceManager throttledRM = null;
-      if (rm != null && rm instanceof DynamicResourceManager) {
-        DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
-        rm = dynamicRM.activeRM();
-      }
-      if (rm != null && rm instanceof ThrottledResourceManager) {
-        throttledRM = (ThrottledResourceManager) rm;
-      }
-      if (throttledRM == null) {
-        return new QueueInfo(null);
-      }
-      QueryQueue queue = throttledRM.queue();
-      if (queue == null || !(queue instanceof DistributedQueryQueue)) {
-        return new QueueInfo(null);
-      }
+@XmlRootElement
+public static class QueueInfo {
+  private final ZKQueueInfo zkQueueInfo;
 
-      return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
-    }
+  public static QueueInfo build(ResourceManager rm) {
+
+    // Consider queues enabled only if the ZK-based queues are in use.
 
-    @JsonCreator
-    public QueueInfo(ZKQueueInfo queueInfo) {
-      zkQueueInfo = queueInfo;
+    ThrottledResourceManager throttledRM = null;
+    if (rm != null && rm instanceof DynamicResourceManager) {
+      DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
+      rm = dynamicRM.activeRM();
+    }
+    if (rm != null && rm instanceof ThrottledResourceManager) {
+      throttledRM = (ThrottledResourceManager) rm;
+    }
+    if (throttledRM == null) {
+      return new QueueInfo(null);
+    }
+    QueryQueue queue = throttledRM.queue();
+    if (queue == null || !(queue instanceof DistributedQueryQueue)) {
+      return new QueueInfo(null);
     }
 
-    public boolean isEnabled() { return zkQueueInfo != null; }
+    return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
+  }
 
-    public int smallQueueSize() {
-      return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
-    }
+  @JsonCreator
+  public QueueInfo(ZKQueueInfo queueInfo) {
+    zkQueueInfo = queueInfo;
+  }
 
-    public int largeQueueSize() {
-      return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
-    }
+  public boolean isEnabled() { return zkQueueInfo != null; }
 
-    public String threshold() {
-      return isEnabled()
-          ? Double.toString(zkQueueInfo.queueThreshold)
-          : "N/A";
-    }
+  public int smallQueueSize() {
+    return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
+  }
 
-    public String smallQueueMemory() {
-      return isEnabled()
-          ? toBytes(zkQueueInfo.memoryPerSmallQuery)
-          : "N/A";
-    }
+  public int largeQueueSize() {
+    return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
+  }
 
-    public String largeQueueMemory() {
-      return isEnabled()
-          ? toBytes(zkQueueInfo.memoryPerLargeQuery)
-          : "N/A";
-    }
+  public String threshold() {
+    return isEnabled()
+            ? Double.toString(zkQueueInfo.queueThreshold)
+            : "N/A";
+  }
 
-    public String totalMemory() {
-      return isEnabled()
-          ? toBytes(zkQueueInfo.memoryPerNode)
-          : "N/A";
-    }
+  public String smallQueueMemory() {
+    return isEnabled()
+            ? toBytes(zkQueueInfo.memoryPerSmallQuery)
+            : "N/A";
+  }
 
-    private final long ONE_MB = 1024 * 1024;
+  public String largeQueueMemory() {
+    return isEnabled()
+            ? toBytes(zkQueueInfo.memoryPerLargeQuery)
+            : "N/A";
+  }
 
-    private String toBytes(long memory) {
-      if (memory < 10 * ONE_MB) {
-        return String.format("%,d bytes", memory);
-      } else {
-        return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
-      }
-    }
+  public String totalMemory() {
+    return isEnabled()
+            ? toBytes(zkQueueInfo.memoryPerNode)
+            : "N/A";
   }
 
-  @XmlRootElement
-  public static class ClusterInfo {
-    private final Collection<DrillbitInfo> drillbits;
-    private final String currentVersion;
-    private final Collection<String> mismatchedVersions;
-    private final boolean userEncryptionEnabled;
-    private final boolean bitEncryptionEnabled;
-    private final String adminUsers;
-    private final String adminUserGroups;
-    private final String processUser;
-    private final String processUserGroups;
-    private final boolean shouldShowUserInfo;
-    private final QueueInfo queueInfo;
-
-    @JsonCreator
-    public ClusterInfo(Collection<DrillbitInfo> drillbits,
-                       String currentVersion,
-                       Collection<String> mismatchedVersions,
-                       boolean userEncryption,
-                       boolean bitEncryption,
-                       String processUser,
-                       String processUserGroups,
-                       String adminUsers,
-                       String adminUserGroups,
-                       boolean shouldShowUserInfo,
-                       QueueInfo queueInfo) {
-      this.drillbits = Sets.newTreeSet(drillbits);
-      this.currentVersion = currentVersion;
-      this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
-      this.userEncryptionEnabled = userEncryption;
-      this.bitEncryptionEnabled = bitEncryption;
-      this.processUser = processUser;
-      this.processUserGroups = processUserGroups;
-      this.adminUsers = adminUsers;
-      this.adminUserGroups = adminUserGroups;
-      this.shouldShowUserInfo = shouldShowUserInfo;
-      this.queueInfo = queueInfo;
-    }
+  private final long ONE_MB = 1024 * 1024;
 
-    public Collection<DrillbitInfo> getDrillbits() {
-      return Sets.newTreeSet(drillbits);
+  private String toBytes(long memory) {
+    if (memory < 10 * ONE_MB) {
+      return String.format("%,d bytes", memory);
+    } else {
+      return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
     }
+  }
+}
 
-    public String getCurrentVersion() {
-      return currentVersion;
-    }
+@XmlRootElement
+public static class ClusterInfo {
+  private final Collection<DrillbitInfo> drillbits;
+  private final String currentVersion;
+  private final Collection<String> mismatchedVersions;
+  private final boolean userEncryptionEnabled;
+  private final boolean bitEncryptionEnabled;
+  private final String adminUsers;
+  private final String adminUserGroups;
+  private final String processUser;
+  private final String processUserGroups;
+  private final boolean shouldShowUserInfo;
+  private final QueueInfo queueInfo;
+
+  @JsonCreator
+  public ClusterInfo(Collection<DrillbitInfo> drillbits,
+                     String currentVersion,
+                     Collection<String> mismatchedVersions,
+                     boolean userEncryption,
+                     boolean bitEncryption,
+                     String processUser,
+                     String processUserGroups,
+                     String adminUsers,
+                     String adminUserGroups,
+                     boolean shouldShowUserInfo,
+                     QueueInfo queueInfo) {
+    this.drillbits = Sets.newTreeSet(drillbits);
+    this.currentVersion = currentVersion;
+    this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
+    this.userEncryptionEnabled = userEncryption;
+    this.bitEncryptionEnabled = bitEncryption;
+    this.processUser = processUser;
+    this.processUserGroups = processUserGroups;
+    this.adminUsers = adminUsers;
+    this.adminUserGroups = adminUserGroups;
+    this.shouldShowUserInfo = shouldShowUserInfo;
+    this.queueInfo = queueInfo;
+  }
 
-    public Collection<String> getMismatchedVersions() {
-      return Sets.newTreeSet(mismatchedVersions);
-    }
+  public Collection<DrillbitInfo> getDrillbits() {
+    return Sets.newTreeSet(drillbits);
+  }
+
+  public String getCurrentVersion() {
+    return currentVersion;
+  }
+
+  public Collection<String> getMismatchedVersions() {
+    return Sets.newTreeSet(mismatchedVersions);
+  }
+
+  public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
 
-    public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
+  public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
 
-    public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
+  public String getProcessUser() { return processUser; }
 
-    public String getProcessUser() { return processUser; }
+  public String getProcessUserGroups() { return processUserGroups; }
 
-    public String getProcessUserGroups() { return processUserGroups; }
+  public String getAdminUsers() { return adminUsers; }
 
-    public String getAdminUsers() { return adminUsers; }
+  public String getAdminUserGroups() { return adminUserGroups; }
 
-    public String getAdminUserGroups() { return adminUserGroups; }
+  public boolean shouldShowUserInfo() { return shouldShowUserInfo; }
 
-    public boolean shouldShowUserInfo() { return shouldShowUserInfo; }
+  public QueueInfo queueInfo() { return queueInfo; }
+}
 
-    public QueueInfo queueInfo() { return queueInfo; }
+public static class DrillbitInfo implements Comparable<DrillbitInfo> {
+  private final String address;
+  private final String userPort;
+  private final String controlPort;
+  private final String dataPort;
+  private final String version;
+  private final boolean current;
+  private final boolean versionMatch;
+  private final String state;
+
+  @JsonCreator
+  public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) {
+    this.address = drillbit.getAddress();
+    this.userPort = String.valueOf(drillbit.getUserPort());
+    this.controlPort = String.valueOf(drillbit.getControlPort());
+    this.dataPort = String.valueOf(drillbit.getDataPort());
+    this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion();
+    this.current = current;
+    this.versionMatch = versionMatch;
+    this.state = String.valueOf(drillbit.getState());
   }
 
-  public static class DrillbitInfo implements Comparable<DrillbitInfo> {
-    private final String address;
-    private final String userPort;
-    private final String controlPort;
-    private final String dataPort;
-    private final String version;
-    private final boolean current;
-    private final boolean versionMatch;
-
-    @JsonCreator
-    public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) {
-      this.address = drillbit.getAddress();
-      this.userPort = String.valueOf(drillbit.getUserPort());
-      this.controlPort = String.valueOf(drillbit.getControlPort());
-      this.dataPort = String.valueOf(drillbit.getDataPort());
-      this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion();
-      this.current = current;
-      this.versionMatch = versionMatch;
-    }
+  public String getAddress() { return address; }
 
-    public String getAddress() { return address; }
+  public String getUserPort() { return userPort; }
 
-    public String getUserPort() { return userPort; }
+  public String getControlPort() { return controlPort; }
 
-    public String getControlPort() { return controlPort; }
+  public String getDataPort() { return dataPort; }
 
-    public String getDataPort() { return dataPort; }
+  public String getVersion() { return version; }
 
-    public String getVersion() { return version; }
+  public boolean isCurrent() { return current; }
 
-    public boolean isCurrent() { return current; }
+  public boolean isVersionMatch() { return versionMatch; }
 
-    public boolean isVersionMatch() { return versionMatch; }
+  public String getState() { return state; }
 
-    /**
-     * Method used to sort Drillbits. Current Drillbit goes first.
-     * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
-     * Matching Drillbits are sorted according address natural order,
-     * mismatching Drillbits are sorted according version, address natural order.
-     *
-     * @param drillbitToCompare Drillbit to compare against
-     * @return -1 if Drillbit should be before, 1 if after in list
-     */
-    @Override
-    public int compareTo(DrillbitInfo drillbitToCompare) {
-      if (this.isCurrent()) {
-        return -1;
-      }
+  /**
+   * Method used to sort Drillbits. Current Drillbit goes first.
+   * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
+   * Matching Drillbits are sorted according address natural order,
+   * mismatching Drillbits are sorted according version, address natural order.
+   *
+   * @param drillbitToCompare Drillbit to compare against
+   * @return -1 if Drillbit should be before, 1 if after in list
+   */
+  @Override
+  public int compareTo(DrillbitInfo drillbitToCompare) {
+    if (this.isCurrent()) {
+      return -1;
+    }
 
-      if (drillbitToCompare.isCurrent()) {
-        return 1;
-      }
+    if (drillbitToCompare.isCurrent()) {
+      return 1;
+    }
 
-      if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) {
-        if (this.version.equals(drillbitToCompare.getVersion())) {
-          return this.address.compareTo(drillbitToCompare.getAddress());
+    if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) {
+      if (this.version.equals(drillbitToCompare.getVersion())) {
+        {
+          if (this.address.equals(drillbitToCompare.getAddress())) {
+            return (this.controlPort.compareTo(drillbitToCompare.getControlPort()));
+          }
+          return (this.address.compareTo(drillbitToCompare.getAddress()));
         }
-        return this.version.compareTo(drillbitToCompare.getVersion());
       }
-      return this.versionMatch ? -1 : 1;
+      return this.version.compareTo(drillbitToCompare.getVersion());
     }
+    return this.versionMatch ? -1 : 1;
   }
 }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 1ad2a09..f0e822f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -23,13 +23,14 @@ import com.codahale.metrics.servlets.ThreadDumpServlet;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ssl.SSLConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.rpc.security.plain.PlainFactory;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.rest.auth.DrillRestLoginService;
 import org.apache.drill.exec.ssl.SSLConfigBuilder;
 import org.apache.drill.exec.work.WorkManager;
@@ -106,6 +107,8 @@ public class WebServer implements AutoCloseable {
 
   private Server embeddedJetty;
 
+  private final Drillbit drillbit;
+
   private int port;
 
   /**
@@ -114,11 +117,12 @@ public class WebServer implements AutoCloseable {
    * @param context Bootstrap context.
    * @param workManager WorkManager instance.
    */
-  public WebServer(final BootStrapContext context, final WorkManager workManager) {
+  public WebServer(final BootStrapContext context, final WorkManager workManager, final Drillbit drillbit) {
     this.context = context;
     this.config = context.getConfig();
     this.metrics = context.getMetrics();
     this.workManager = workManager;
+    this.drillbit = drillbit;
   }
 
   private static final String BASE_STATIC_PATH = "/rest/static/";
@@ -193,7 +197,7 @@ public class WebServer implements AutoCloseable {
     servletContextHandler.setErrorHandler(errorHandler);
     servletContextHandler.setContextPath("/");
 
-    final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext())));
+    final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext(), drillbit)));
     servletHolder.setInitOrder(1);
     servletContextHandler.addServlet(servletHolder, "/*");
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 29ae0f6..3efa054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
 import org.apache.drill.exec.rpc.TransportCheck;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.ControllerImpl;
@@ -102,6 +103,7 @@ public class ServiceEngine implements AutoCloseable {
         .setAddress(hostName)
         .setUserPort(userPort)
         .setVersion(DrillVersionInfo.getVersion())
+        .setState(State.STARTUP)
         .build();
 
     partialEndpoint = controller.start(partialEndpoint, allowPortHunting);

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index 836d339..dc4e7c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -29,7 +29,7 @@ public class DrillbitIterator implements Iterator<Object> {
   private DrillbitEndpoint current;
 
   public DrillbitIterator(FragmentContext c) {
-    this.endpoints = c.getDrillbitContext().getBits().iterator();
+    this.endpoints = c.getDrillbitContext().getAvailableBits().iterator();
     this.current = c.getIdentity();
   }
 
@@ -40,6 +40,7 @@ public class DrillbitIterator implements Iterator<Object> {
     public int data_port;
     public boolean current;
     public String version;
+    public String state;
   }
 
   @Override
@@ -51,15 +52,28 @@ public class DrillbitIterator implements Iterator<Object> {
   public Object next() {
     DrillbitEndpoint ep = endpoints.next();
     DrillbitInstance i = new DrillbitInstance();
-    i.current = ep.equals(current);
+    i.current = isCurrent(ep);
     i.hostname = ep.getAddress();
     i.user_port = ep.getUserPort();
     i.control_port = ep.getControlPort();
     i.data_port = ep.getDataPort();
     i.version = ep.getVersion();
+    i.state = ep.getState().toString();
     return i;
   }
 
+  public boolean isCurrent(DrillbitEndpoint ep) {
+
+    String epAddress = ep.getAddress();
+    int epPort = ep.getUserPort();
+    String currentEpAddress = current.getAddress();
+    int currentEpPort = current.getUserPort();
+    if (currentEpAddress.equals(epAddress) && currentEpPort == epPort) {
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public void remove() {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 6e560a9..5d369de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -46,6 +47,7 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 import org.apache.drill.exec.work.user.UserWorker;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,6 +79,8 @@ public class WorkManager implements AutoCloseable {
   private final WorkEventBus workBus;
   private final Executor executor;
   private final StatusThread statusThread;
+  private long numOfRunningQueries;
+  private long numOfRunningFragments;
 
   /**
    * How often the StatusThread collects statistics about running fragments.
@@ -165,32 +169,57 @@ public class WorkManager implements AutoCloseable {
    *
    * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
    */
-  public void waitToExit() {
+  public void waitToExit(Drillbit bit, boolean forcefulShutdown) {
     synchronized(this) {
-      if (queries.isEmpty() && runningFragments.isEmpty()) {
+      numOfRunningQueries = queries.size();
+      numOfRunningFragments = runningFragments.size();
+      if ( queries.isEmpty() && runningFragments.isEmpty()) {
         return;
       }
-
+      logger.info("Draining " + queries +" queries and "+ runningFragments+" fragments.");
       exitLatch = new ExtendedLatch();
     }
-
-    // Wait for at most 5 seconds or until the latch is released.
-    exitLatch.awaitUninterruptibly(5000);
+    // Wait uninterruptibly until all the queries and running fragments on that drillbit goes down
+    // to zero
+    if( forcefulShutdown ) {
+      exitLatch.awaitUninterruptibly(5000);
+    } else {
+      exitLatch.awaitUninterruptibly();
+    }
   }
 
   /**
    * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
-   * unblock.
+   * unblock. Logs the number of pending fragments and queries that are running on that
+   * drillbit to track the progress of shutdown process.
    */
   private void indicateIfSafeToExit() {
     synchronized(this) {
       if (exitLatch != null) {
+        logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down");
+        logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down");
+        if(runningFragments.size() > numOfRunningFragments|| queries.size() > numOfRunningQueries) {
+          logger.info("New Fragments or queries are added while drillbit is Shutting down");
+        }
         if (queries.isEmpty() && runningFragments.isEmpty()) {
+          // Both Queries and Running fragments are empty.
+          // So its safe for the drillbit to exit.
           exitLatch.countDown();
         }
       }
     }
   }
+  /**
+   *  Get the number of queries that are running on a drillbit.
+   *  Primarily used to monitor the number of running queries after a
+   *  shutdown request is triggered.
+   */
+  public synchronized Map<String, Integer> getRemainingQueries() {
+        Map<String, Integer> queriesInfo = new HashMap<String, Integer>();
+        queriesInfo.put("queriesCount", queries.size());
+        queriesInfo.put("fragmentsCount", runningFragments.size());
+        return queriesInfo;
+  }
 
   /**
    * Narrowed interface to WorkManager that is made available to tasks it is managing.

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a1d150e..8ce8639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -253,7 +253,17 @@ public class Foreman implements Runnable {
     final Thread currentThread = Thread.currentThread();
     final String originalName = currentThread.getName();
     currentThread.setName(queryIdString + ":foreman");
-
+    try {
+      /*
+       Check if the foreman is ONLINE. If not dont accept any new queries.
+      */
+      if (!drillbitContext.isForemanOnline()) {
+        throw new ForemanException("Query submission failed since Foreman is shutting down.");
+      }
+    } catch (ForemanException e) {
+      logger.debug("Failure while submitting query", e);
+      addToEventQueue(QueryState.FAILED, e);
+    }
     // track how long the query takes
     queryManager.markStartTime();
     enqueuedQueries.dec();
@@ -559,7 +569,7 @@ public class Foreman implements Runnable {
     final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
     return parallelizer.getFragments(
         queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
-        queryId, queryContext.getActiveEndpoints(), rootFragment,
+        queryId, queryContext.getOnlineEndpoints(), rootFragment,
         initiatingClient.getSession(), queryContext.getQueryContextInfo());
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f5e85a3..c923e4f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -369,6 +369,16 @@ drill.exec: {
     // planning and managing queries. Primarily for testing.
     cpus_per_node: 0,
   }
+  # Grace period is the amount of time where the drillbit accepts work after
+  # the shutdown request is triggered. The primary use of grace period is to
+  # avoid the race conditions caused by zookeeper delay in updating the state
+  # information of the drillbit that is shutting down. So, it is advisable
+  # to have a grace period that is atleast twice the amount of zookeeper
+  # refresh time.
+  grace_period_ms : 0,
+  //port hunting for drillbits. Enabled only for testing purposes.
+  port_hunt : false
+
 }
 
 drill.jdbc: {

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/rest/index.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl
index 45dc1c9..74425d6 100644
--- a/exec/java-exec/src/main/resources/rest/index.ftl
+++ b/exec/java-exec/src/main/resources/rest/index.ftl
@@ -46,7 +46,7 @@
 
   <div class="row">
     <div class="col-md-12">
-      <h3>Drillbits <span class="label label-primary">${model.getDrillbits()?size}</span></h3>
+      <h3>Drillbits <span class="label label-primary" id="size" >${model.getDrillbits()?size}</span></h3>
       <div class="table-responsive">
         <table class="table table-hover">
           <thead>
@@ -57,19 +57,19 @@
               <th>Control Port</th>
               <th>Data Port</th>
               <th>Version</th>
+              <th>Status</th>
             </tr>
           </thead>
           <tbody>
             <#assign i = 1>
             <#list model.getDrillbits() as drillbit>
-              <tr>
+              <tr id="row-${i}">
                 <td>${i}</td>
-                <td>${drillbit.getAddress()}
-                  <#if drillbit.isCurrent()>
+                <td id="address" >${drillbit.getAddress()}<#if drillbit.isCurrent()>
                     <span class="label label-info">Current</span>
                   </#if>
                 </td>
-                <td>${drillbit.getUserPort()}</td>
+                <td id="port" >${drillbit.getUserPort()}</td>
                 <td>${drillbit.getControlPort()}</td>
                 <td>${drillbit.getDataPort()}</td>
                 <td>
@@ -78,6 +78,11 @@
                     ${drillbit.getVersion()}
                   </span>
                 </td>
+                <td id="status" >${drillbit.getState()}</td>
+                <td>
+                    <button type="button" id="shutdown" onClick="shutdown('${drillbit.getAddress()}',$(this));"> SHUTDOWN </button>
+                </td>
+                <td id="queriesCount">  </td>
               </tr>
               <#assign i = i + 1>
             </#list>
@@ -179,6 +184,101 @@
         </div>
       </div>
   </div>
+   <script charset="utf-8">
+      var refreshTime = 2000;
+      var refresh = getRefreshTime();
+      var portNum = 0;
+      var port = getPortNum();
+      console.log(portNum);
+      var timeout;
+      var size = $("#size").html();
+
+
+      function getPortNum() {
+          var port = $.ajax({
+                          type: 'GET',
+                          url: '/portNum',
+                          dataType: "json",
+                          complete: function(data) {
+                                portNum = data.responseJSON["port"];
+                                }
+                          });
+      }
+
+      function getRefreshTime() {
+          var refresh = $.ajax({
+                          type: 'GET',
+                          url: '/gracePeriod',
+                          dataType: "json",
+                          complete: function(data) {
+                                refreshTime = data.responseJSON["graceperiod"];
+                                refreshTime = refreshTime/3;
+                                timeout = setTimeout(reloadStatus,refreshTime );
+                                }
+                          });
+      }
+      function reloadStatus () {
+          console.log(refreshTime);
+          var result = $.ajax({
+                      type: 'GET',
+                      url: '/state',
+                      dataType: "json",
+                      complete: function(data) {
+                            fillStatus(data,size);
+                            }
+                      });
+          timeout = setTimeout(reloadStatus, refreshTime);
+      }
+
+      function fillStatus(data,size) {
+          var status_map = (data.responseJSON);
+          for (i = 1; i <= size; i++) {
+            var address = $("#row-"+i).find("#address").contents().get(0).nodeValue;
+            address = address.trim();
+            var port = $("#row-"+i).find("#port").html();
+            var key = address+"-"+port;
+
+            if (status_map[key] == null) {
+                $("#row-"+i).find("#status").text("OFFLINE");
+                $("#row-"+i).find("#shutdown").prop('disabled',true).css('opacity',0.5);
+                $("#row-"+i).find("#queriesCount").text("");
+            }
+            else {
+                if( status_map[key] == "ONLINE") {
+                    $("#row-"+i).find("#status").text(status_map[key]);
+                }
+                else {
+                    fillQueryCount(address,i);
+                    $("#row-"+i).find("#status").text(status_map[key]);
+                }
+            }
+          }
+      }
+      function fillQueryCount(address,row_id) {
+          url = "http://"+address+":"+portNum+"/queriesCount";
+          var result = $.ajax({
+                        type: 'GET',
+                        url: url,
+                        complete: function(data) {
+                              queries = data.responseJSON["queriesCount"];
+                              fragments = data.responseJSON["fragmentsCount"];
+                              $("#row-"+row_id).find("#queriesCount").text(queries+" queries and "+fragments+" fragments remaining before shutting down");
+                              }
+                        });
+      }
+      function shutdown(address,button) {
+          url = "http://"+address+":"+portNum+"/gracefulShutdown";
+          var result = $.ajax({
+                type: 'POST',
+                url: url,
+                contentType : 'text/plain',
+                complete: function(data) {
+                    alert(data.responseJSON["response"]);
+                    button.prop('disabled',true).css('opacity',0.5);
+                }
+          });
+      }
+    </script>
 </#macro>
 
 <@page_html/>

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index c30cb09..37aa1db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -248,7 +248,7 @@ public class TestMetadataProvider extends BaseTestQuery {
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(92, columns.size());
+    assertEquals(93, columns.size());
     // too many records to verify the output.
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index f0653f7..a4d62d4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -404,6 +404,25 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
   }
 
   /**
+   * Shutdown the drillbit given the name of the drillbit.
+   */
+  public void closeDrillbit(final String drillbitName) throws Exception {
+    Exception ex = null;
+    for (Drillbit bit : drillbits()) {
+      if (bit.equals(bits.get(drillbitName))) {
+        try {
+          bit.close();
+        } catch (Exception e) {
+          ex = ex == null ? e :ex;
+        }
+      }
+    }
+    if (ex != null) {
+      throw ex;
+    }
+  }
+
+  /**
    * Close a resource, suppressing the exception, and keeping
    * only the first exception that may occur. We assume that only
    * the first is useful, any others are probably down-stream effects