You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2017/11/29 11:15:40 UTC
[2/5] drill git commit: DRILL-4286: Graceful shutdown of drillbit
DRILL-4286: Graceful shutdown of drillbit
closes #921
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5f044f2a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5f044f2a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5f044f2a
Branch: refs/heads/master
Commit: 5f044f2a6d0cd34a3d4107ece4c0637469f89b40
Parents: d3f8da2
Author: dvjyothsna <jy...@Skatkam-598.local>
Authored: Thu Aug 24 08:58:00 2017 -0700
Committer: Arina Ielchiieva <ar...@gmail.com>
Committed: Wed Nov 29 12:22:00 2017 +0200
----------------------------------------------------------------------
distribution/src/resources/drillbit.sh | 25 +-
.../org/apache/drill/exec/ExecConstants.java | 13 +
.../apache/drill/exec/client/DrillClient.java | 11 +-
.../drill/exec/coord/ClusterCoordinator.java | 24 +
.../coord/local/LocalClusterCoordinator.java | 56 ++-
.../exec/coord/zk/ZKClusterCoordinator.java | 83 +++-
.../exec/coord/zk/ZKRegistrationHandle.java | 14 +-
.../org/apache/drill/exec/ops/QueryContext.java | 4 +
.../org/apache/drill/exec/server/Drillbit.java | 69 ++-
.../drill/exec/server/DrillbitContext.java | 31 +-
.../drill/exec/server/DrillbitStateManager.java | 80 ++++
.../drill/exec/server/rest/DrillRestServer.java | 4 +-
.../drill/exec/server/rest/DrillRoot.java | 479 ++++++++++++-------
.../drill/exec/server/rest/WebServer.java | 10 +-
.../drill/exec/service/ServiceEngine.java | 2 +
.../drill/exec/store/sys/DrillbitIterator.java | 18 +-
.../org/apache/drill/exec/work/WorkManager.java | 43 +-
.../apache/drill/exec/work/foreman/Foreman.java | 14 +-
.../src/main/resources/drill-module.conf | 10 +
.../java-exec/src/main/resources/rest/index.ftl | 110 ++++-
.../work/metadata/TestMetadataProvider.java | 2 +-
.../org/apache/drill/test/ClusterFixture.java | 19 +
.../apache/drill/test/TestGracefulShutdown.java | 250 ++++++++++
.../drill/exec/proto/CoordinationProtos.java | 214 ++++++++-
.../exec/proto/SchemaCoordinationProtos.java | 7 +
.../exec/proto/beans/DrillbitEndpoint.java | 54 +++
protocol/src/main/protobuf/Coordination.proto | 7 +
27 files changed, 1416 insertions(+), 237 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/distribution/src/resources/drillbit.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drillbit.sh b/distribution/src/resources/drillbit.sh
index de7f21a..49b92ed 100755
--- a/distribution/src/resources/drillbit.sh
+++ b/distribution/src/resources/drillbit.sh
@@ -92,15 +92,18 @@ waitForProcessEnd()
{
pidKilled=$1
commandName=$2
+ kill_drillbit=$3
processedAt=`date +%s`
origcnt=${DRILL_STOP_TIMEOUT:-120}
while kill -0 $pidKilled > /dev/null 2>&1;
do
echo -n "."
sleep 1;
- # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy
- if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then
- break;
+ if [ "$kill_drillbit" = true ] ; then
+ # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy
+ if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then
+ break;
+ fi
fi
done
echo
@@ -155,6 +158,7 @@ start_bit ( )
stop_bit ( )
{
+ kill_drillbit=$1
if [ -f $pid ]; then
pidToKill=`cat $pid`
# kill -0 == see if the PID exists
@@ -162,7 +166,7 @@ stop_bit ( )
echo "Stopping $command"
echo "`date` Terminating $command pid $pidToKill" >> "$DRILLBIT_LOG_PATH"
kill $pidToKill > /dev/null 2>&1
- waitForProcessEnd $pidToKill $command
+ waitForProcessEnd $pidToKill $command $kill_drillbit
retval=0
else
retval=$?
@@ -199,7 +203,18 @@ case $startStopStatus in
;;
(stop)
- stop_bit
+ kill_drillbit=true
+ stop_bit $kill_drillbit
+ exit $?
+ ;;
+
+# Shut down the drillbit gracefully without disrupting the in-flight queries.
+# In this case, if there are any long-running queries, the drillbit will take a
+# little longer to shut down. If the user wishes to shut down immediately,
+# they can issue stop instead of graceful_stop.
+(graceful_stop)
+ kill_drillbit=false
+ stop_bit $kill_drillbit
exit $?
;;
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 89b4b48..52aa52d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -602,4 +602,17 @@ public final class ExecConstants {
public static String bootDefaultFor(String name) {
return OPTION_DEFAULTS_ROOT + name;
}
+ /**
+ * Boot-time config option provided to modify duration of the grace period.
+ * Grace period is the amount of time where the drillbit accepts work after
+ * the shutdown request is triggered. The primary use of grace period is to
+ * avoid the race conditions caused by zookeeper delay in updating the state
+ * information of the drillbit that is shutting down. So, it is advisable
+ * to have a grace period that is at least twice the amount of zookeeper
+ * refresh time.
+ */
+ public static final String GRACE_PERIOD = "drill.exec.grace_period_ms";
+
+ public static final String DRILL_PORT_HUNT = "drill.exec.port_hunt";
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 84b34a7..248058f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -329,7 +329,10 @@ public class DrillClient implements Closeable, ConnectionThrottle {
throw new RpcException("Failure setting up ZK for client.", e);
}
}
- endpoints.addAll(clusterCoordinator.getAvailableEndpoints());
+ // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
+ // in QUIESCENT state. This avoids the clients connecting to drillbits that are
+ // shutting down thereby reducing the chances of query failures.
+ endpoints.addAll(clusterCoordinator.getOnlineEndPoints());
// Make sure we have at least one endpoint in the list
checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper. Check connection parameters?");
}
@@ -418,7 +421,10 @@ public class DrillClient implements Closeable, ConnectionThrottle {
retry--;
try {
Thread.sleep(this.reconnectDelay);
- final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints());
+ // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are
+ // in QUIESCENT state. This avoids the clients connecting to drillbits that are
+ // shutting down thereby reducing the chances of query failures.
+ final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getOnlineEndPoints());
if (endpoints.isEmpty()) {
continue;
}
@@ -434,6 +440,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
private void connect(DrillbitEndpoint endpoint) throws RpcException {
client.connect(endpoint, properties, getUserCredentials());
+ logger.info("Foreman drillbit is" + endpoint.getAddress());
}
public BufferAllocator getAllocator() {
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
index e758d6f..32b1633 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
/**
@@ -60,7 +61,26 @@ public abstract class ClusterCoordinator implements AutoCloseable {
*/
public abstract Collection<DrillbitEndpoint> getAvailableEndpoints();
+ /**
+ * Get a collection of ONLINE drillbit endpoints by excluding the drillbits
+ * that are in QUIESCENT state (drillbits that are shutting down). Primarily used by the planner
+ * to plan queries only on ONLINE drillbits and used by the client during initial connection
+ * phase to connect to a drillbit (foreman)
+ * @return A collection of ONLINE endpoints
+ */
+
+ public abstract Collection<DrillbitEndpoint> getOnlineEndPoints();
+
+ public abstract RegistrationHandle update(RegistrationHandle handle, State state);
+
public interface RegistrationHandle {
+ /**
+ * Get the drillbit endpoint associated with the registration handle
+ * @return drillbit endpoint
+ */
+ public abstract DrillbitEndpoint getEndPoint();
+
+ public abstract void setEndPoint(DrillbitEndpoint endpoint);
}
public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases);
@@ -108,4 +128,8 @@ public abstract class ClusterCoordinator implements AutoCloseable {
listeners.remove(listener);
}
+ public boolean isDrillbitInState(DrillbitEndpoint endpoint, DrillbitEndpoint.State state) {
+ return (!endpoint.hasState() || endpoint.getState().equals(state));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
index 8c13c42..86bc606 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.coord.local;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
@@ -33,6 +34,7 @@ import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import com.google.common.collect.Maps;
@@ -69,9 +71,10 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
}
@Override
- public RegistrationHandle register(final DrillbitEndpoint data) {
+ public RegistrationHandle register( DrillbitEndpoint data) {
logger.debug("Endpoint registered {}.", data);
- final Handle h = new Handle();
+ final Handle h = new Handle(data);
+ data = data.toBuilder().setState(State.ONLINE).build();
endpoints.put(h, data);
return h;
}
@@ -85,13 +88,62 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
endpoints.remove(handle);
}
+ /**
+ * Update drillbit endpoint state. Drillbit advertises its
+ * state. State information is used during planning and initial
+ * client connection phases.
+ */
+ @Override
+ public RegistrationHandle update(RegistrationHandle handle, State state) {
+ DrillbitEndpoint endpoint = handle.getEndPoint();
+ endpoint = endpoint.toBuilder().setState(state).build();
+ handle.setEndPoint(endpoint);
+ endpoints.put(handle,endpoint);
+ return handle;
+ }
+
@Override
public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return endpoints.values();
}
+ /**
+ * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits
+ * that are in QUIESCENT state (drillbits shutting down). Primarily used by the planner
+ * to plan queries only on ONLINE drillbits and used by the client during initial connection
+ * phase to connect to a drillbit (foreman)
+ * @return A collection of ONLINE endpoints
+ */
+ @Override
+ public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+ Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
+ for (DrillbitEndpoint endpoint: endpoints.values()){
+ if(isDrillbitInState(endpoint, State.ONLINE)) {
+ runningEndPoints.add(endpoint);
+ }
+ }
+ return runningEndPoints;
+ }
+
private class Handle implements RegistrationHandle {
private final UUID id = UUID.randomUUID();
+ private DrillbitEndpoint drillbitEndpoint;
+
+ /**
+ * Get the drillbit endpoint associated with the registration handle
+ * @return drillbit endpoint
+ */
+ public DrillbitEndpoint getEndPoint() {
+ return drillbitEndpoint;
+ }
+
+ public void setEndPoint(DrillbitEndpoint endpoint) {
+ this.drillbitEndpoint = endpoint;
+ }
+
+ private Handle(DrillbitEndpoint data) {
+ drillbitEndpoint = data;
+ }
@Override
public int hashCode() {
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
index b14a151..472bc3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java
@@ -23,15 +23,16 @@ import static com.google.common.collect.Collections2.transform;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
+import java.util.ArrayList;
import java.util.Set;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
+import org.apache.commons.collections.keyvalue.MultiKey;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -54,6 +55,7 @@ import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.coord.store.TransientStoreConfig;
import org.apache.drill.exec.coord.store.TransientStoreFactory;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import com.google.common.base.Function;
@@ -70,7 +72,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private final CountDownLatch initialConnection = new CountDownLatch(1);
private final TransientStoreFactory factory;
private ServiceCache<DrillbitEndpoint> serviceCache;
+ private DrillbitEndpoint endpoint;
+ // endpointsMap maps a MultiKey (composed of the endpoint address and user port) to Drillbit endpoints
+ private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>();
private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
public ZKClusterCoordinator(DrillConfig config) throws IOException{
@@ -169,9 +174,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
@Override
public RegistrationHandle register(DrillbitEndpoint data) {
try {
+ data = data.toBuilder().setState(State.ONLINE).build();
ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data);
discovery.registerService(serviceInstance);
- return new ZKRegistrationHandle(serviceInstance.getId());
+ return new ZKRegistrationHandle(serviceInstance.getId(),data);
} catch (Exception e) {
throw propagate(e);
}
@@ -200,11 +206,50 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
}
}
+ /**
+ * Update drillbit endpoint state. Drillbit advertises its
+ * state in Zookeeper when a shutdown request of drillbit is
+ * triggered. State information is used during planning and
+ * initial client connection phases.
+ */
+ public RegistrationHandle update(RegistrationHandle handle, State state) {
+ ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
+ try {
+ endpoint = h.endpoint.toBuilder().setState(state).build();
+ ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder()
+ .name(serviceName)
+ .id(h.id)
+ .payload(endpoint).build();
+ discovery.updateService(serviceInstance);
+ } catch (Exception e) {
+ propagate(e);
+ }
+ return handle;
+ }
+
@Override
public Collection<DrillbitEndpoint> getAvailableEndpoints() {
return this.endpoints;
}
+ /**
+ * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits
+ * that are in QUIESCENT state (drillbits shutting down). Primarily used by the planner
+ * to plan queries only on ONLINE drillbits and used by the client during initial connection
+ * phase to connect to a drillbit (foreman)
+ * @return A collection of ONLINE endpoints
+ */
+ @Override
+ public Collection<DrillbitEndpoint> getOnlineEndPoints() {
+ Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>();
+ for (DrillbitEndpoint endpoint: endpoints){
+ if(isDrillbitInState(endpoint, State.ONLINE)) {
+ runningEndPoints.add(endpoint);
+ }
+ }
+ logger.debug("Online endpoints in ZK are" + runningEndPoints.toString());
+ return runningEndPoints;
+ }
@Override
public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
@@ -219,6 +264,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
private synchronized void updateEndpoints() {
try {
+ // All active bits in the Zookeeper
Collection<DrillbitEndpoint> newDrillbitSet =
transform(discovery.queryForInstances(serviceName),
new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() {
@@ -229,27 +275,42 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
});
// set of newly dead bits : original bits - new set of active bits.
- Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints);
- unregisteredBits.removeAll(newDrillbitSet);
-
+ Set<DrillbitEndpoint> unregisteredBits = new HashSet<>();
// Set of newly live bits : new set of active bits - original bits.
- Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet);
- registeredBits.removeAll(endpoints);
+ Set<DrillbitEndpoint> registeredBits = new HashSet<>();
- endpoints = newDrillbitSet;
+ // Updates the endpoints map if there is a change in state of the endpoint or with the addition
+ // of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints.
+ for ( DrillbitEndpoint endpoint : newDrillbitSet) {
+ String endpointAddress = endpoint.getAddress();
+ int endpointPort = endpoint.getUserPort();
+ if (! endpointsMap.containsKey(new MultiKey(endpointAddress, endpointPort))) {
+ registeredBits.add(endpoint);
+ }
+ endpointsMap.put(new MultiKey(endpointAddress, endpointPort),endpoint);
+ }
+ // Remove all the endpoints that are newly dead
+ for ( MultiKey key: endpointsMap.keySet()) {
+ if(!newDrillbitSet.contains(endpointsMap.get(key))) {
+ unregisteredBits.add(endpointsMap.get(key));
+ endpointsMap.remove(key);
+ }
+ }
+ endpoints = endpointsMap.values();
if (logger.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("Active drillbit set changed. Now includes ");
builder.append(newDrillbitSet.size());
builder.append(" total bits. New active drillbits:\n");
- builder.append("Address | User Port | Control Port | Data Port | Version |\n");
+ builder.append("Address | User Port | Control Port | Data Port | Version | State\n");
for (DrillbitEndpoint bit: newDrillbitSet) {
builder.append(bit.getAddress()).append(" | ");
builder.append(bit.getUserPort()).append(" | ");
builder.append(bit.getControlPort()).append(" | ");
builder.append(bit.getDataPort()).append(" | ");
builder.append(bit.getVersion()).append(" |");
+ builder.append(bit.getState()).append(" | ");
builder.append('\n');
}
logger.debug(builder.toString());
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
index f0c465f..fca3296 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java
@@ -18,15 +18,27 @@
package org.apache.drill.exec.coord.zk;
import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
public class ZKRegistrationHandle implements RegistrationHandle {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKRegistrationHandle.class);
public final String id;
+ public DrillbitEndpoint endpoint;
- public ZKRegistrationHandle(String id) {
+ public DrillbitEndpoint getEndPoint() {
+ return endpoint;
+ }
+
+ @Override
+ public void setEndPoint(DrillbitEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public ZKRegistrationHandle(String id, DrillbitEndpoint endpoint) {
super();
this.id = id;
+ this.endpoint = endpoint;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 125dfac..eb32bc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -213,6 +213,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return drillbitContext.getBits();
}
+ public Collection<DrillbitEndpoint> getOnlineEndpoints() {
+ return drillbitContext.getBits();
+ }
+
public DrillConfig getConfig() {
return drillbitContext.getConfig();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index a333ff2..4144da0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
+import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.scanner.ClassPathScanner;
@@ -32,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
import org.apache.drill.exec.server.options.OptionDefinition;
import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.server.options.OptionValue.OptionScope;
@@ -48,6 +50,7 @@ import org.apache.drill.exec.util.GuavaPatcher;
import org.apache.drill.exec.work.WorkManager;
import org.apache.zookeeper.Environment;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
@@ -77,6 +80,23 @@ public class Drillbit implements AutoCloseable {
private final WorkManager manager;
private final BootStrapContext context;
private final WebServer webServer;
+ private final int gracePeriod;
+ private DrillbitStateManager stateManager;
+ private boolean quiescentMode;
+ private boolean forcefulShutdown = false;
+
+ public void setQuiescentMode(boolean quiescentMode) {
+ this.quiescentMode = quiescentMode;
+ }
+
+ public void setForcefulShutdown(boolean forcefulShutdown) {
+ this.forcefulShutdown = forcefulShutdown;
+ }
+
+ public RegistrationHandle getRegistrationHandle() {
+ return registrationHandle;
+ }
+
private RegistrationHandle registrationHandle;
private volatile StoragePluginRegistry storageRegistry;
private final PersistentStoreProvider profileStoreProvider;
@@ -110,13 +130,15 @@ public class Drillbit implements AutoCloseable {
final CaseInsensitiveMap<OptionDefinition> definitions,
final RemoteServiceSet serviceSet,
final ScanResult classpathScan) throws Exception {
+ gracePeriod = config.getInt(ExecConstants.GRACE_PERIOD);
final Stopwatch w = Stopwatch.createStarted();
logger.debug("Construction started.");
- final boolean allowPortHunting = serviceSet != null;
+ boolean drillPortHunt = config.getBoolean(ExecConstants.DRILL_PORT_HUNT);
+ final boolean allowPortHunting = (serviceSet != null) || drillPortHunt;
context = new BootStrapContext(config, definitions, classpathScan);
manager = new WorkManager(context);
- webServer = new WebServer(context, manager);
+ webServer = new WebServer(context, manager, this);
boolean isDistributedMode = false;
if (serviceSet != null) {
coord = serviceSet.getCoordinator();
@@ -137,6 +159,7 @@ public class Drillbit implements AutoCloseable {
engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode);
+ stateManager = new DrillbitStateManager(DrillbitState.STARTUP);
logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
}
@@ -152,6 +175,7 @@ public class Drillbit implements AutoCloseable {
final Stopwatch w = Stopwatch.createStarted();
logger.debug("Startup begun.");
coord.start(10000);
+ stateManager.setState(DrillbitState.ONLINE);
storeProvider.start();
if (profileStoreProvider != storeProvider) {
profileStoreProvider.start();
@@ -176,18 +200,43 @@ public class Drillbit implements AutoCloseable {
logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
}
+ /*
+ Wait uninterruptibly for the configured grace period to elapse.
+ */
+ public void waitForGracePeriod() {
+ ExtendedLatch exitLatch = new ExtendedLatch();
+ exitLatch.awaitUninterruptibly(gracePeriod);
+ }
+
+ /*
+ Shuts down the drillbit by delegating to close().
+ */
+ public void shutdown() {
+ this.close();
+ }
+ /*
+ The drillbit is moved into Quiescent state and the drillbit waits for grace period amount of time.
+ Then drillbit moves into draining state and waits for all the queries and fragments to complete.
+ */
@Override
public synchronized void close() {
- // avoid complaints about double closing
- if (isClosed) {
+ if ( !stateManager.getState().equals(DrillbitState.ONLINE)) {
return;
}
final Stopwatch w = Stopwatch.createStarted();
logger.debug("Shutdown begun.");
-
- // wait for anything that is running to complete
- manager.waitToExit();
-
+ registrationHandle = coord.update(registrationHandle, State.QUIESCENT);
+ stateManager.setState(DrillbitState.GRACE);
+ waitForGracePeriod();
+ stateManager.setState(DrillbitState.DRAINING);
+ // wait for all the in-flight queries to finish
+ manager.waitToExit(this, forcefulShutdown);
+ //safe to exit
+ registrationHandle = coord.update(registrationHandle, State.OFFLINE);
+ stateManager.setState(DrillbitState.OFFLINE);
+ if(quiescentMode == true) {
+ return;
+ }
if (coord != null && registrationHandle != null) {
coord.unregister(registrationHandle);
}
@@ -219,8 +268,8 @@ public class Drillbit implements AutoCloseable {
logger.warn("Failure on close()", e);
}
- logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
- isClosed = true;
+ logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS) );
+ stateManager.setState(DrillbitState.SHUTDOWN);
}
private void javaPropertiesToSystemOptions() {
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index b8a8b1e..f65592b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -157,10 +157,39 @@ public class DrillbitContext implements AutoCloseable {
return context.getConfig();
}
- public Collection<DrillbitEndpoint> getBits() {
+ public Collection<DrillbitEndpoint> getAvailableBits() {
return coord.getAvailableEndpoints();
}
+ public Collection<DrillbitEndpoint> getBits() {
+ return coord.getOnlineEndPoints();
+ }
+
+ public boolean isOnline(DrillbitEndpoint endpoint) {
+ return endpoint.getState().equals(DrillbitEndpoint.State.ONLINE);
+ }
+
+ public boolean isForeman(DrillbitEndpoint endpoint) {
+ DrillbitEndpoint foreman = getEndpoint();
+ if(endpoint.getAddress().equals(foreman.getAddress()) &&
+ endpoint.getUserPort() == foreman.getUserPort()) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isForemanOnline() {
+ Collection<DrillbitEndpoint> dbs = getAvailableBits();
+ for (DrillbitEndpoint db : dbs) {
+ if( isForeman(db)) {
+ if (isOnline(db)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public BufferAllocator getAllocator() {
return context.getAllocator();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
new file mode 100644
index 0000000..dfffce8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.server;
+/**
+ * Tracks the lifecycle state of a drillbit and only accepts a move to the
+ * direct successor of the current state:
+ * STARTUP -> ONLINE -> GRACE -> DRAINING -> OFFLINE -> SHUTDOWN.
+ */
+public class DrillbitStateManager {
+
+ public enum DrillbitState {
+ STARTUP, ONLINE, GRACE, DRAINING, OFFLINE, SHUTDOWN
+ }
+
+ private DrillbitState currentState;
+
+ /**
+ * @param currentState state the manager starts in
+ */
+ public DrillbitStateManager(DrillbitState currentState) {
+ this.currentState = currentState;
+ }
+
+ /**
+ * @return the state most recently accepted by this manager
+ */
+ public DrillbitState getState() {
+ return currentState;
+ }
+
+ /**
+ * Moves the drillbit to {@code newState} when that state directly follows the current one.
+ * A request for {@code STARTUP} leaves the state unchanged, since no state precedes it.
+ *
+ * @param newState state to move to
+ * @throws IllegalStateException if {@code newState} does not directly follow the current state
+ */
+ public void setState(DrillbitState newState) {
+ switch (newState) {
+ case ONLINE:
+ transition(DrillbitState.STARTUP, newState);
+ break;
+ case GRACE:
+ transition(DrillbitState.ONLINE, newState);
+ break;
+ case DRAINING:
+ transition(DrillbitState.GRACE, newState);
+ break;
+ case OFFLINE:
+ transition(DrillbitState.DRAINING, newState);
+ break;
+ case SHUTDOWN:
+ transition(DrillbitState.OFFLINE, newState);
+ break;
+ default:
+ // STARTUP has no predecessor; such a request is deliberately a no-op.
+ break;
+ }
+ }
+
+ /**
+ * Applies the transition only if the manager is currently in {@code requiredCurrent}.
+ */
+ private void transition(DrillbitState requiredCurrent, DrillbitState newState) {
+ if (currentState != requiredCurrent) {
+ // Spaces around the state names keep the message readable.
+ throw new IllegalStateException("Cannot set drillbit to " + newState + " from " + currentState);
+ }
+ currentState = newState;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 1545847..89141d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.WebUserConnection.AnonWebUserConnection;
import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature;
@@ -73,7 +74,7 @@ import java.util.List;
public class DrillRestServer extends ResourceConfig {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class);
- public DrillRestServer(final WorkManager workManager, final ServletContext servletContext) {
+ public DrillRestServer(final WorkManager workManager, final ServletContext servletContext, final Drillbit drillbit) {
register(DrillRoot.class);
register(StatusResources.class);
register(StorageResources.class);
@@ -120,6 +121,7 @@ public class DrillRestServer extends ResourceConfig {
register(new AbstractBinder() {
@Override
protected void configure() {
+ bind(drillbit).to(Drillbit.class);
bind(workManager).to(WorkManager.class);
bind(executor).to(EventExecutor.class);
bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class);
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index 55bfca4..da1d2fb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -18,12 +18,16 @@
package org.apache.drill.exec.server.rest;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import javax.annotation.security.PermitAll;
import javax.inject.Inject;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.xml.bind.annotation.XmlRootElement;
@@ -33,6 +37,7 @@ import com.google.common.collect.Sets;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
@@ -55,9 +60,18 @@ import com.fasterxml.jackson.annotation.JsonCreator;
public class DrillRoot {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRoot.class);
- @Inject UserAuthEnabled authEnabled;
- @Inject WorkManager work;
- @Inject SecurityContext sc;
+ @Inject
+ UserAuthEnabled authEnabled;
+ @Inject
+ WorkManager work;
+ @Inject
+ SecurityContext sc;
+ @Inject
+ Drillbit drillbit;
+
+ /**
+ * Names of the supported shutdown modes.
+ * NOTE(review): presumably these mirror the /shutdown, /gracefulShutdown and
+ * /quiescent endpoints; the enum is not referenced in the visible code, so confirm.
+ */
+ public enum ShutdownMode {
+ forcefulShutdown,
+ gracefulShutdown,
+ quiescent
+ }
@GET
@Produces(MediaType.TEXT_HTML)
@@ -65,6 +79,90 @@ public class DrillRoot {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON());
}
+
+ /**
+ * Reports the state of every drillbit listed by {@code getClusterInfoJSON()}.
+ *
+ * @return response whose entity maps {@code "<address>-<userPort>"} to the drillbit's state string
+ */
+ @SuppressWarnings("resource")
+ @GET
+ @Path("/state")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getDrillbitStatus() {
+ final Map<String, String> stateByDrillbit = new HashMap<>();
+ for (DrillbitInfo info : getClusterInfoJSON().getDrillbits()) {
+ final String key = info.getAddress() + "-" + info.getUserPort();
+ stateByDrillbit.put(key, info.getState());
+ }
+ return setResponse(stateByDrillbit);
+ }
+
+ /**
+ * Exposes the value configured under {@code ExecConstants.GRACE_PERIOD}.
+ *
+ * @return single-entry map with the key {@code "graceperiod"}
+ */
+ @SuppressWarnings("resource")
+ @GET
+ @Path("/gracePeriod")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Integer> getGracePeriod() {
+ final int gracePeriod = work.getContext().getConfig().getInt(ExecConstants.GRACE_PERIOD);
+ final Map<String, Integer> result = new HashMap<>();
+ result.put("graceperiod", gracePeriod);
+ return result;
+ }
+
+ /**
+ * Exposes the value configured under {@code ExecConstants.HTTP_PORT}.
+ *
+ * @return single-entry map with the key {@code "port"}
+ */
+ @SuppressWarnings("resource")
+ @GET
+ @Path("/portNum")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Map<String, Integer> getPortNum() {
+ final int httpPort = work.getContext().getConfig().getInt(ExecConstants.HTTP_PORT);
+ final Map<String, Integer> result = new HashMap<>();
+ result.put("port", httpPort);
+ return result;
+ }
+
+ /**
+ * Returns the map supplied by {@code work.getRemainingQueries()}, wrapped by {@code setResponse}.
+ *
+ * @return response whose entity is the work manager's remaining-queries map
+ */
+ @SuppressWarnings("resource")
+ @GET
+ @Path("/queriesCount")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRemainingQueries() {
+ // No placeholder map is allocated: the work manager already supplies the map to return.
+ final Map<String, Integer> queriesInfo = work.getRemainingQueries();
+ return setResponse(queriesInfo);
+ }
+
+ /**
+ * Starts a shutdown through {@code shutdown(String)} without setting any
+ * additional flag on the drillbit.
+ *
+ * @return response acknowledging that the request was triggered
+ */
+ @SuppressWarnings("resource")
+ @POST
+ @Path("/gracefulShutdown")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response shutdownDrillbit() throws Exception {
+ return shutdown("Graceful Shutdown request is triggered");
+ }
+
+ /**
+ * Marks the drillbit for forceful shutdown and then starts the shutdown
+ * through {@code shutdown(String)}.
+ *
+ * @return response acknowledging that the request was triggered
+ */
+ @SuppressWarnings("resource")
+ @POST
+ @Path("/shutdown")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response ShutdownForcefully() throws Exception {
+ drillbit.setForcefulShutdown(true);
+ return shutdown("Forceful shutdown request is triggered");
+ }
+
+ /**
+ * Sets the quiescent-mode flag on the drillbit and then goes through the same
+ * {@code shutdown(String)} path as the other shutdown endpoints.
+ *
+ * @return response acknowledging that the request was triggered
+ */
+ @SuppressWarnings("resource")
+ @POST
+ @Path("/quiescent")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response drillbitToQuiescentMode() throws Exception {
+ drillbit.setQuiescentMode(true);
+ return shutdown("Request to put drillbit in Quiescent mode is triggered");
+ }
+
+
@SuppressWarnings("resource")
@GET
@Path("/cluster.json")
@@ -79,8 +177,8 @@ public class DrillRoot {
final DrillConfig config = dbContext.getConfig();
final boolean userEncryptionEnabled =
- config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) ||
- config .getBoolean(ExecConstants.USER_SSL_ENABLED);
+ config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) ||
+ config .getBoolean(ExecConstants.USER_SSL_ENABLED);
final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
// If the user is logged in and is admin user then show the admin user info
// For all other cases the user info need-not or should-not be displayed
@@ -94,7 +192,7 @@ public class DrillRoot {
final boolean shouldShowUserInfo = isUserLoggedIn &&
((DrillUserPrincipal)sc.getUserPrincipal()).isAdminUser();
- for (DrillbitEndpoint endpoint : work.getContext().getBits()) {
+ for (DrillbitEndpoint endpoint : work.getContext().getAvailableBits()) {
final DrillbitInfo drillbit = new DrillbitInfo(endpoint,
currentDrillbit.equals(endpoint),
currentVersion.equals(endpoint.getVersion()));
@@ -107,219 +205,254 @@ public class DrillRoot {
" userLoggedIn " + isUserLoggedIn + " shouldShowUserInfo: " + shouldShowUserInfo );
return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
- userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers,
- adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager()));
+ userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers,
+ adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager()));
}
- /**
- * Pretty-printing wrapper class around the ZK-based queue summary.
- */
+ /**
+ * Wraps {@code entity} in a 200 OK response that carries the CORS headers
+ * shared by the state and shutdown endpoints of this class.
+ *
+ * NOTE(review): "Access-Control-Allow-Origin: *" is sent together with
+ * "Access-Control-Allow-Credentials: true". Browsers reject credentialed
+ * cross-origin responses that use a wildcard origin, so one of the two headers
+ * is probably not doing what was intended. The headers are left unchanged here;
+ * consider restricting the allowed origin.
+ *
+ * @param entity map to serialize as the response body
+ * @return the built response
+ */
+ public Response setResponse(Map<?, ?> entity) {
+ return Response.ok()
+ .entity(entity)
+ .header("Access-Control-Allow-Origin", "*")
+ .header("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT")
+ .header("Access-Control-Allow-Credentials","true")
+ .allow("OPTIONS").build();
+ }
- @XmlRootElement
- public static class QueueInfo {
- private final ZKQueueInfo zkQueueInfo;
+ /**
+ * Closes the drillbit on a separate thread and immediately answers the caller.
+ * A failure of {@code drillbit.close()} is logged on that thread and does not
+ * affect the response.
+ *
+ * @param resp text returned under the {@code "response"} key
+ * @return response built by {@code setResponse}
+ */
+ public Response shutdown(String resp) throws Exception {
+ final Runnable closeDrillbit = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ drillbit.close();
+ } catch (Exception e) {
+ logger.error("Request to shutdown drillbit failed", e);
+ }
+ }
+ };
+ new Thread(closeDrillbit).start();
+ final Map<String, String> shutdownInfo = new HashMap<>();
+ shutdownInfo.put("response", resp);
+ return setResponse(shutdownInfo);
+ }
- public static QueueInfo build(ResourceManager rm) {
- // Consider queues enabled only if the ZK-based queues are in use.
+/**
+ * Pretty-printing wrapper class around the ZK-based queue summary.
+ */
- ThrottledResourceManager throttledRM = null;
- if (rm != null && rm instanceof DynamicResourceManager) {
- DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
- rm = dynamicRM.activeRM();
- }
- if (rm != null && rm instanceof ThrottledResourceManager) {
- throttledRM = (ThrottledResourceManager) rm;
- }
- if (throttledRM == null) {
- return new QueueInfo(null);
- }
- QueryQueue queue = throttledRM.queue();
- if (queue == null || !(queue instanceof DistributedQueryQueue)) {
- return new QueueInfo(null);
- }
+@XmlRootElement
+public static class QueueInfo {
+ private final ZKQueueInfo zkQueueInfo;
- return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
- }
+ public static QueueInfo build(ResourceManager rm) {
+
+ // Consider queues enabled only if the ZK-based queues are in use.
- @JsonCreator
- public QueueInfo(ZKQueueInfo queueInfo) {
- zkQueueInfo = queueInfo;
+ ThrottledResourceManager throttledRM = null;
+ if (rm != null && rm instanceof DynamicResourceManager) {
+ DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
+ rm = dynamicRM.activeRM();
+ }
+ if (rm != null && rm instanceof ThrottledResourceManager) {
+ throttledRM = (ThrottledResourceManager) rm;
+ }
+ if (throttledRM == null) {
+ return new QueueInfo(null);
+ }
+ QueryQueue queue = throttledRM.queue();
+ if (queue == null || !(queue instanceof DistributedQueryQueue)) {
+ return new QueueInfo(null);
}
- public boolean isEnabled() { return zkQueueInfo != null; }
+ return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
+ }
- public int smallQueueSize() {
- return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
- }
+ @JsonCreator
+ public QueueInfo(ZKQueueInfo queueInfo) {
+ zkQueueInfo = queueInfo;
+ }
- public int largeQueueSize() {
- return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
- }
+ public boolean isEnabled() { return zkQueueInfo != null; }
- public String threshold() {
- return isEnabled()
- ? Double.toString(zkQueueInfo.queueThreshold)
- : "N/A";
- }
+ public int smallQueueSize() {
+ return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
+ }
- public String smallQueueMemory() {
- return isEnabled()
- ? toBytes(zkQueueInfo.memoryPerSmallQuery)
- : "N/A";
- }
+ public int largeQueueSize() {
+ return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
+ }
- public String largeQueueMemory() {
- return isEnabled()
- ? toBytes(zkQueueInfo.memoryPerLargeQuery)
- : "N/A";
- }
+ public String threshold() {
+ return isEnabled()
+ ? Double.toString(zkQueueInfo.queueThreshold)
+ : "N/A";
+ }
- public String totalMemory() {
- return isEnabled()
- ? toBytes(zkQueueInfo.memoryPerNode)
- : "N/A";
- }
+ public String smallQueueMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerSmallQuery)
+ : "N/A";
+ }
- private final long ONE_MB = 1024 * 1024;
+ public String largeQueueMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerLargeQuery)
+ : "N/A";
+ }
- private String toBytes(long memory) {
- if (memory < 10 * ONE_MB) {
- return String.format("%,d bytes", memory);
- } else {
- return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
- }
- }
+ public String totalMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerNode)
+ : "N/A";
}
- @XmlRootElement
- public static class ClusterInfo {
- private final Collection<DrillbitInfo> drillbits;
- private final String currentVersion;
- private final Collection<String> mismatchedVersions;
- private final boolean userEncryptionEnabled;
- private final boolean bitEncryptionEnabled;
- private final String adminUsers;
- private final String adminUserGroups;
- private final String processUser;
- private final String processUserGroups;
- private final boolean shouldShowUserInfo;
- private final QueueInfo queueInfo;
-
- @JsonCreator
- public ClusterInfo(Collection<DrillbitInfo> drillbits,
- String currentVersion,
- Collection<String> mismatchedVersions,
- boolean userEncryption,
- boolean bitEncryption,
- String processUser,
- String processUserGroups,
- String adminUsers,
- String adminUserGroups,
- boolean shouldShowUserInfo,
- QueueInfo queueInfo) {
- this.drillbits = Sets.newTreeSet(drillbits);
- this.currentVersion = currentVersion;
- this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
- this.userEncryptionEnabled = userEncryption;
- this.bitEncryptionEnabled = bitEncryption;
- this.processUser = processUser;
- this.processUserGroups = processUserGroups;
- this.adminUsers = adminUsers;
- this.adminUserGroups = adminUserGroups;
- this.shouldShowUserInfo = shouldShowUserInfo;
- this.queueInfo = queueInfo;
- }
+ private final long ONE_MB = 1024 * 1024;
- public Collection<DrillbitInfo> getDrillbits() {
- return Sets.newTreeSet(drillbits);
+ private String toBytes(long memory) {
+ if (memory < 10 * ONE_MB) {
+ return String.format("%,d bytes", memory);
+ } else {
+ return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
}
+ }
+}
- public String getCurrentVersion() {
- return currentVersion;
- }
+@XmlRootElement
+public static class ClusterInfo {
+ private final Collection<DrillbitInfo> drillbits;
+ private final String currentVersion;
+ private final Collection<String> mismatchedVersions;
+ private final boolean userEncryptionEnabled;
+ private final boolean bitEncryptionEnabled;
+ private final String adminUsers;
+ private final String adminUserGroups;
+ private final String processUser;
+ private final String processUserGroups;
+ private final boolean shouldShowUserInfo;
+ private final QueueInfo queueInfo;
+
+ @JsonCreator
+ public ClusterInfo(Collection<DrillbitInfo> drillbits,
+ String currentVersion,
+ Collection<String> mismatchedVersions,
+ boolean userEncryption,
+ boolean bitEncryption,
+ String processUser,
+ String processUserGroups,
+ String adminUsers,
+ String adminUserGroups,
+ boolean shouldShowUserInfo,
+ QueueInfo queueInfo) {
+ this.drillbits = Sets.newTreeSet(drillbits);
+ this.currentVersion = currentVersion;
+ this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
+ this.userEncryptionEnabled = userEncryption;
+ this.bitEncryptionEnabled = bitEncryption;
+ this.processUser = processUser;
+ this.processUserGroups = processUserGroups;
+ this.adminUsers = adminUsers;
+ this.adminUserGroups = adminUserGroups;
+ this.shouldShowUserInfo = shouldShowUserInfo;
+ this.queueInfo = queueInfo;
+ }
- public Collection<String> getMismatchedVersions() {
- return Sets.newTreeSet(mismatchedVersions);
- }
+ public Collection<DrillbitInfo> getDrillbits() {
+ return Sets.newTreeSet(drillbits);
+ }
+
+ public String getCurrentVersion() {
+ return currentVersion;
+ }
+
+ public Collection<String> getMismatchedVersions() {
+ return Sets.newTreeSet(mismatchedVersions);
+ }
+
+ public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
- public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
+ public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
- public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
+ public String getProcessUser() { return processUser; }
- public String getProcessUser() { return processUser; }
+ public String getProcessUserGroups() { return processUserGroups; }
- public String getProcessUserGroups() { return processUserGroups; }
+ public String getAdminUsers() { return adminUsers; }
- public String getAdminUsers() { return adminUsers; }
+ public String getAdminUserGroups() { return adminUserGroups; }
- public String getAdminUserGroups() { return adminUserGroups; }
+ public boolean shouldShowUserInfo() { return shouldShowUserInfo; }
- public boolean shouldShowUserInfo() { return shouldShowUserInfo; }
+ public QueueInfo queueInfo() { return queueInfo; }
+}
- public QueueInfo queueInfo() { return queueInfo; }
+public static class DrillbitInfo implements Comparable<DrillbitInfo> {
+ private final String address;
+ private final String userPort;
+ private final String controlPort;
+ private final String dataPort;
+ private final String version;
+ private final boolean current;
+ private final boolean versionMatch;
+ private final String state;
+
+ @JsonCreator
+ public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) {
+ this.address = drillbit.getAddress();
+ this.userPort = String.valueOf(drillbit.getUserPort());
+ this.controlPort = String.valueOf(drillbit.getControlPort());
+ this.dataPort = String.valueOf(drillbit.getDataPort());
+ this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion();
+ this.current = current;
+ this.versionMatch = versionMatch;
+ this.state = String.valueOf(drillbit.getState());
}
- public static class DrillbitInfo implements Comparable<DrillbitInfo> {
- private final String address;
- private final String userPort;
- private final String controlPort;
- private final String dataPort;
- private final String version;
- private final boolean current;
- private final boolean versionMatch;
-
- @JsonCreator
- public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) {
- this.address = drillbit.getAddress();
- this.userPort = String.valueOf(drillbit.getUserPort());
- this.controlPort = String.valueOf(drillbit.getControlPort());
- this.dataPort = String.valueOf(drillbit.getDataPort());
- this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion();
- this.current = current;
- this.versionMatch = versionMatch;
- }
+ public String getAddress() { return address; }
- public String getAddress() { return address; }
+ public String getUserPort() { return userPort; }
- public String getUserPort() { return userPort; }
+ public String getControlPort() { return controlPort; }
- public String getControlPort() { return controlPort; }
+ public String getDataPort() { return dataPort; }
- public String getDataPort() { return dataPort; }
+ public String getVersion() { return version; }
- public String getVersion() { return version; }
+ public boolean isCurrent() { return current; }
- public boolean isCurrent() { return current; }
+ public boolean isVersionMatch() { return versionMatch; }
- public boolean isVersionMatch() { return versionMatch; }
+ public String getState() { return state; }
- /**
- * Method used to sort Drillbits. Current Drillbit goes first.
- * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
- * Matching Drillbits are sorted according address natural order,
- * mismatching Drillbits are sorted according version, address natural order.
- *
- * @param drillbitToCompare Drillbit to compare against
- * @return -1 if Drillbit should be before, 1 if after in list
- */
- @Override
- public int compareTo(DrillbitInfo drillbitToCompare) {
- if (this.isCurrent()) {
- return -1;
- }
+ /**
+ * Method used to sort Drillbits. Current Drillbit goes first.
+ * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
+ * Matching Drillbits are sorted according address natural order,
+ * mismatching Drillbits are sorted according version, address natural order.
+ *
+ * @param drillbitToCompare Drillbit to compare against
+ * @return -1 if Drillbit should be before, 1 if after in list
+ */
+ @Override
+ public int compareTo(DrillbitInfo drillbitToCompare) {
+ if (this.isCurrent()) {
+ return -1;
+ }
- if (drillbitToCompare.isCurrent()) {
- return 1;
- }
+ if (drillbitToCompare.isCurrent()) {
+ return 1;
+ }
- if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) {
- if (this.version.equals(drillbitToCompare.getVersion())) {
- return this.address.compareTo(drillbitToCompare.getAddress());
+ if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) {
+ if (this.version.equals(drillbitToCompare.getVersion())) {
+ {
+ if (this.address.equals(drillbitToCompare.getAddress())) {
+ return (this.controlPort.compareTo(drillbitToCompare.getControlPort()));
+ }
+ return (this.address.compareTo(drillbitToCompare.getAddress()));
}
- return this.version.compareTo(drillbitToCompare.getVersion());
}
- return this.versionMatch ? -1 : 1;
+ return this.version.compareTo(drillbitToCompare.getVersion());
}
+ return this.versionMatch ? -1 : 1;
}
}
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 1ad2a09..f0e822f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -23,13 +23,14 @@ import com.codahale.metrics.servlets.ThreadDumpServlet;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.rest.auth.DrillRestLoginService;
import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.exec.work.WorkManager;
@@ -106,6 +107,8 @@ public class WebServer implements AutoCloseable {
private Server embeddedJetty;
+ private final Drillbit drillbit;
+
private int port;
/**
@@ -114,11 +117,12 @@ public class WebServer implements AutoCloseable {
* @param context Bootstrap context.
* @param workManager WorkManager instance.
*/
- public WebServer(final BootStrapContext context, final WorkManager workManager) {
+ public WebServer(final BootStrapContext context, final WorkManager workManager, final Drillbit drillbit) {
this.context = context;
this.config = context.getConfig();
this.metrics = context.getMetrics();
this.workManager = workManager;
+ this.drillbit = drillbit;
}
private static final String BASE_STATIC_PATH = "/rest/static/";
@@ -193,7 +197,7 @@ public class WebServer implements AutoCloseable {
servletContextHandler.setErrorHandler(errorHandler);
servletContextHandler.setContextPath("/");
- final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext())));
+ final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext(), drillbit)));
servletHolder.setInitOrder(1);
servletContextHandler.addServlet(servletHolder, "/*");
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index 29ae0f6..3efa054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.ControllerImpl;
@@ -102,6 +103,7 @@ public class ServiceEngine implements AutoCloseable {
.setAddress(hostName)
.setUserPort(userPort)
.setVersion(DrillVersionInfo.getVersion())
+ .setState(State.STARTUP)
.build();
partialEndpoint = controller.start(partialEndpoint, allowPortHunting);
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
index 836d339..dc4e7c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java
@@ -29,7 +29,7 @@ public class DrillbitIterator implements Iterator<Object> {
private DrillbitEndpoint current;
public DrillbitIterator(FragmentContext c) {
- this.endpoints = c.getDrillbitContext().getBits().iterator();
+ this.endpoints = c.getDrillbitContext().getAvailableBits().iterator();
this.current = c.getIdentity();
}
@@ -40,6 +40,7 @@ public class DrillbitIterator implements Iterator<Object> {
public int data_port;
public boolean current;
public String version;
+ public String state;
}
@Override
@@ -51,15 +52,28 @@ public class DrillbitIterator implements Iterator<Object> {
public Object next() {
DrillbitEndpoint ep = endpoints.next();
DrillbitInstance i = new DrillbitInstance();
- i.current = ep.equals(current);
+ i.current = isCurrent(ep);
i.hostname = ep.getAddress();
i.user_port = ep.getUserPort();
i.control_port = ep.getControlPort();
i.data_port = ep.getDataPort();
i.version = ep.getVersion();
+ i.state = ep.getState().toString();
return i;
}
+ /**
+ * Decides whether {@code ep} is the drillbit this iterator was created on,
+ * comparing only the address and the user port with the stored identity.
+ *
+ * @param ep endpoint to test
+ * @return {@code true} when both the address and the user port match
+ */
+ public boolean isCurrent(DrillbitEndpoint ep) {
+ final boolean sameAddress = current.getAddress().equals(ep.getAddress());
+ return sameAddress && current.getUserPort() == ep.getUserPort();
+ }
+
@Override
public void remove() {
throw new UnsupportedOperationException();
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 6e560a9..5d369de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -46,6 +47,7 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,6 +79,8 @@ public class WorkManager implements AutoCloseable {
private final WorkEventBus workBus;
private final Executor executor;
private final StatusThread statusThread;
+ private long numOfRunningQueries;
+ private long numOfRunningFragments;
/**
* How often the StatusThread collects statistics about running fragments.
@@ -165,32 +169,57 @@ public class WorkManager implements AutoCloseable {
*
* <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
*/
- public void waitToExit() {
+ public void waitToExit(Drillbit bit, boolean forcefulShutdown) { // 'bit' is not used in this method
synchronized(this) {
- if (queries.isEmpty() && runningFragments.isEmpty()) {
+ numOfRunningQueries = queries.size();
+ numOfRunningFragments = runningFragments.size();
+ if ( queries.isEmpty() && runningFragments.isEmpty()) {
return;
}
-
+ logger.info("Draining " + numOfRunningQueries + " queries and " + numOfRunningFragments + " fragments.");
exitLatch = new ExtendedLatch();
}
-
- // Wait for at most 5 seconds or until the latch is released.
- exitLatch.awaitUninterruptibly(5000);
+ // A forceful shutdown waits at most 5 seconds for the latch; otherwise wait uninterruptibly
+ // until all queries and running fragments on this drillbit have drained.
+ if( forcefulShutdown ) {
+ exitLatch.awaitUninterruptibly(5000);
+ } else {
+ exitLatch.awaitUninterruptibly();
+ }
}
/**
* If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
- * unblock.
+ * unblock. Logs the number of pending fragments and queries that are running on that
+ * drillbit to track the progress of shutdown process.
*/
private void indicateIfSafeToExit() {
synchronized(this) {
if (exitLatch != null) {
+ logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down");
+ logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down");
+ if(runningFragments.size() > numOfRunningFragments|| queries.size() > numOfRunningQueries) { // counts exceed the snapshot taken in waitToExit()
+ logger.info("New Fragments or queries are added while drillbit is Shutting down");
+ }
if (queries.isEmpty() && runningFragments.isEmpty()) {
+ // Both the queries and the running fragments are empty,
+ // so it's safe for the drillbit to exit: release the latch.
exitLatch.countDown();
}
}
}
}
+ /**
+ * Get the number of queries and fragments that are running on a drillbit,
+ * keyed by "queriesCount" and "fragmentsCount". Primarily used to monitor
+ * the remaining work after a shutdown request is triggered.
+ */
+ public synchronized Map<String, Integer> getRemainingQueries() {
+ Map<String, Integer> queriesInfo = new HashMap<String, Integer>();
+ queriesInfo.put("queriesCount", queries.size());
+ queriesInfo.put("fragmentsCount", runningFragments.size());
+ return queriesInfo;
+ }
/**
* Narrowed interface to WorkManager that is made available to tasks it is managing.
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index a1d150e..8ce8639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -253,7 +253,17 @@ public class Foreman implements Runnable {
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
currentThread.setName(queryIdString + ":foreman");
-
+ try {
+ /*
+ Check if the foreman is ONLINE. If not, don't accept any new queries.
+ */
+ if (!drillbitContext.isForemanOnline()) {
+ throw new ForemanException("Query submission failed since Foreman is shutting down.");
+ }
+ } catch (ForemanException e) {
+ logger.debug("Failure while submitting query", e);
+ addToEventQueue(QueryState.FAILED, e);
+ }
// track how long the query takes
queryManager.markStartTime();
enqueuedQueries.dec();
@@ -559,7 +569,7 @@ public class Foreman implements Runnable {
final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
return parallelizer.getFragments(
queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getActiveEndpoints(), rootFragment,
+ queryId, queryContext.getOnlineEndpoints(), rootFragment,
initiatingClient.getSession(), queryContext.getQueryContextInfo());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index f5e85a3..c923e4f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -369,6 +369,16 @@ drill.exec: {
// planning and managing queries. Primarily for testing.
cpus_per_node: 0,
}
+ # Grace period is the amount of time during which the drillbit still
+ # accepts work after a shutdown request is triggered. Its primary use is
+ # to avoid race conditions caused by ZooKeeper's delay in propagating the
+ # state of the drillbit that is shutting down. It is therefore advisable
+ # to set a grace period that is at least twice the ZooKeeper refresh
+ # time.
+ grace_period_ms : 0,
+ //port hunting for drillbits. Enabled only for testing purposes.
+ port_hunt : false
+
}
drill.jdbc: {
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/rest/index.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl
index 45dc1c9..74425d6 100644
--- a/exec/java-exec/src/main/resources/rest/index.ftl
+++ b/exec/java-exec/src/main/resources/rest/index.ftl
@@ -46,7 +46,7 @@
<div class="row">
<div class="col-md-12">
- <h3>Drillbits <span class="label label-primary">${model.getDrillbits()?size}</span></h3>
+ <h3>Drillbits <span class="label label-primary" id="size" >${model.getDrillbits()?size}</span></h3>
<div class="table-responsive">
<table class="table table-hover">
<thead>
@@ -57,19 +57,19 @@
<th>Control Port</th>
<th>Data Port</th>
<th>Version</th>
+ <th>Status</th>
</tr>
</thead>
<tbody>
<#assign i = 1>
<#list model.getDrillbits() as drillbit>
- <tr>
+ <tr id="row-${i}">
<td>${i}</td>
- <td>${drillbit.getAddress()}
- <#if drillbit.isCurrent()>
+ <td id="address" >${drillbit.getAddress()}<#if drillbit.isCurrent()>
<span class="label label-info">Current</span>
</#if>
</td>
- <td>${drillbit.getUserPort()}</td>
+ <td id="port" >${drillbit.getUserPort()}</td>
<td>${drillbit.getControlPort()}</td>
<td>${drillbit.getDataPort()}</td>
<td>
@@ -78,6 +78,11 @@
${drillbit.getVersion()}
</span>
</td>
+ <td id="status" >${drillbit.getState()}</td>
+ <td>
+ <button type="button" id="shutdown" onClick="shutdown('${drillbit.getAddress()}',$(this));"> SHUTDOWN </button>
+ </td>
+ <td id="queriesCount"> </td>
</tr>
<#assign i = i + 1>
</#list>
@@ -179,6 +184,101 @@
</div>
</div>
</div>
+ <script charset="utf-8">
+ var refreshTime = 2000; // poll interval in ms; getRefreshTime() may replace it
+ var refresh = getRefreshTime(); // getRefreshTime() returns nothing, so refresh is undefined
+ var portNum = 0; // filled in asynchronously by getPortNum()
+ var port = getPortNum(); // getPortNum() returns nothing, so port is undefined
+ console.log(portNum); // runs before the /portNum response arrives, so this logs 0
+ var timeout; // handle of the pending reloadStatus timer
+ var size = $("#size").html(); // drillbit count rendered into the #size label
+
+
+ function getPortNum() { // Asynchronously loads the port from /portNum into the global portNum.
+ $.ajax({
+ type: 'GET',
+ url: '/portNum',
+ dataType: "json",
+ complete: function(data) {
+ if (data.responseJSON) { portNum = data.responseJSON["port"]; } // failed request: keep the previous value
+ }
+ });
+ }
+
+ function getRefreshTime() { // Derives the poll interval from /gracePeriod and starts the status polling.
+ $.ajax({
+ type: 'GET',
+ url: '/gracePeriod',
+ dataType: "json",
+ complete: function(data) {
+ var gracePeriod = data.responseJSON ? data.responseJSON["graceperiod"] : 0; // 0 when the request failed
+ if (gracePeriod > 0) { refreshTime = gracePeriod/3; } // a zero grace period would otherwise poll with no delay
+ timeout = setTimeout(reloadStatus, refreshTime);
+ }
+ });
+ }
+ function reloadStatus () {
+ // Polls /state, refreshes the drillbit table, and re-arms itself every refreshTime ms.
+ $.ajax({
+ type: 'GET',
+ url: '/state',
+ dataType: "json",
+ complete: function(data) {
+ if (data.responseJSON) { fillStatus(data,size); } // skip failed polls; fillStatus needs the JSON map
+ }
+ });
+ timeout = setTimeout(reloadStatus, refreshTime);
+ }
+
+ function fillStatus(data,size) { // Updates the status cell of each table row from the /state response.
+ var status_map = (data.responseJSON);
+ for (var i = 1; i <= size; i++) { // 'var' keeps the loop index from leaking into the global scope
+ var address = $("#row-"+i).find("#address").contents().get(0).nodeValue;
+ address = address.trim();
+ var port = $("#row-"+i).find("#port").html();
+ var key = address+"-"+port;
+ // A drillbit absent from the /state response is shown as OFFLINE and its button is disabled.
+ if (status_map[key] == null) {
+ $("#row-"+i).find("#status").text("OFFLINE");
+ $("#row-"+i).find("#shutdown").prop('disabled',true).css('opacity',0.5);
+ $("#row-"+i).find("#queriesCount").text("");
+ }
+ else {
+ if( status_map[key] == "ONLINE") {
+ $("#row-"+i).find("#status").text(status_map[key]);
+ }
+ else { // any state other than ONLINE: also show the remaining work
+ fillQueryCount(address,i);
+ $("#row-"+i).find("#status").text(status_map[key]);
+ }
+ }
+ }
+ }
+ function fillQueryCount(address,row_id) { // Shows the query/fragment counts reported by the given drillbit.
+ var url = location.protocol+"//"+address+":"+portNum+"/queriesCount"; // same scheme as this page, so HTTPS keeps working
+ $.ajax({
+ type: 'GET',
+ url: url,
+ complete: function(data) {
+ if (!data.responseJSON) { return; } // request failed: leave the cell unchanged
+ var queries = data.responseJSON["queriesCount"], fragments = data.responseJSON["fragmentsCount"];
+ $("#row-"+row_id).find("#queriesCount").text(queries+" queries and "+fragments+" fragments remaining before shutting down");
+ }
+ });
+ }
+ function shutdown(address,button) { // POSTs a graceful-shutdown request to the given drillbit, then disables its button.
+ $.ajax({
+ type: 'POST',
+ url: location.protocol+"//"+address+":"+portNum+"/gracefulShutdown", // same scheme as this page, so HTTPS keeps working
+ contentType : 'text/plain',
+ complete: function(data) {
+ if (!data.responseJSON) { alert("Shutdown request failed"); return; } // keep the button enabled for a retry
+ alert(data.responseJSON["response"]);
+ button.prop('disabled',true).css('opacity',0.5);
+ }
+ });
+ }
+ </script>
</#macro>
<@page_html/>
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index c30cb09..37aa1db 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -248,7 +248,7 @@ public class TestMetadataProvider extends BaseTestQuery {
assertEquals(RequestStatus.OK, resp.getStatus());
List<ColumnMetadata> columns = resp.getColumnsList();
- assertEquals(92, columns.size());
+ assertEquals(93, columns.size());
// too many records to verify the output.
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
index f0653f7..a4d62d4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java
@@ -404,6 +404,25 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable {
}
/**
+ * Shutdown the drillbit given the name of the drillbit.
+ */
+ public void closeDrillbit(final String drillbitName) throws Exception {
+ Exception firstFailure = null;
+ for (Drillbit candidate : drillbits()) {
+ if (!candidate.equals(bits.get(drillbitName))) { continue; }
+ try {
+ candidate.close();
+ } catch (Exception e) {
+ // Remember only the first failure; later ones are dropped.
+ if (firstFailure == null) { firstFailure = e; }
+ }
+ }
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
+ }
+
+ /**
* Close a resource, suppressing the exception, and keeping
* only the first exception that may occur. We assume that only
* the first is useful, any others are probably down-stream effects