You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2017/09/25 17:02:13 UTC

asterixdb git commit: [NO ISSUE] Minor active refactoring

Repository: asterixdb
Updated Branches:
  refs/heads/master 17cd99cc7 -> 9ee6f2d7c


[NO ISSUE] Minor active refactoring

- Remove unused ActiveRuntimeManager
- Rename StatsRequestMessage -> ActiveStatsRequestMessage
- Add ActiveManager API to return all active runtimes
- Interrupt running HTTP requests after 5s upon shutdown
- Log thread dump when HTTP requests do not complete after interruption

Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2021
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9ee6f2d7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9ee6f2d7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9ee6f2d7

Branch: refs/heads/master
Commit: 9ee6f2d7c40bf47348fbf254db0831d755bf1a5f
Parents: 17cd99c
Author: Michael Blow <mi...@couchbase.com>
Authored: Mon Sep 25 09:39:02 2017 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Mon Sep 25 10:01:53 2017 -0700

----------------------------------------------------------------------
 .../apache/asterix/active/ActiveManager.java    |  20 ++--
 .../asterix/active/ActiveRuntimeManager.java    |  95 -----------------
 .../message/ActiveStatsRequestMessage.java      |  35 +++++++
 .../active/message/StatsRequestMessage.java     |  35 -------
 .../api/http/server/NCQueryServiceServlet.java  |   5 +-
 .../app/active/ActiveEntityEventsListener.java  |   4 +-
 .../message/ExecuteStatementRequestMessage.java |   2 +-
 .../asterix/test/runtime/LangExecutionUtil.java |   8 +-
 .../main/resources/asx_errormsg/en.properties   |   2 +-
 .../dataflow/FeedRecordDataFlowController.java  |   7 +-
 .../control/cc/work/GetThreadDumpWork.java      |   5 +-
 .../control/common/utils/ThreadDumpHelper.java  |  95 -----------------
 .../control/nc/NodeControllerService.java       |   2 +-
 .../nc/io/profiling/IOCounterFactory.java       |   5 +
 .../hyracks/control/nc/task/ThreadDumpTask.java |   4 +-
 hyracks-fullstack/hyracks/hyracks-http/pom.xml  |   5 +
 .../apache/hyracks/http/server/HttpServer.java  |  16 ++-
 .../org/apache/hyracks/util/ThreadDumpUtil.java | 104 +++++++++++++++++++
 18 files changed, 197 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index df59dca..264e9bc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.active;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +34,7 @@ import java.util.logging.Logger;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -54,7 +56,7 @@ public class ActiveManager {
     private volatile boolean shutdown;
 
     public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize,
-            INCServiceContext serviceCtx) throws HyracksDataException {
+                         INCServiceContext serviceCtx) throws HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
         this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
@@ -77,6 +79,10 @@ public class ActiveManager {
         runtimes.remove(id);
     }
 
+    public Set<ActiveRuntimeId> getRuntimeIds() {
+        return Collections.unmodifiableSet(runtimes.keySet());
+    }
+
     public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
         return runtimes.get(runtimeId);
     }
@@ -93,14 +99,14 @@ public class ActiveManager {
                 stopRuntime(message);
                 break;
             case REQUEST_STATS:
-                requestStats((StatsRequestMessage) message);
+                requestStats((ActiveStatsRequestMessage) message);
                 break;
             default:
                 LOGGER.warning("Unknown message type received: " + message.getKind());
         }
     }
 
-    private void requestStats(StatsRequestMessage message) throws HyracksDataException {
+    private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
         try {
             ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
             IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -111,9 +117,9 @@ public class ActiveManager {
                 ((NodeControllerService) serviceCtx.getControllerService())
                         .sendApplicationMessageToCC(
                                 JavaSerializationUtils
-                                        .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException(
-                                                ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))),
-                                null);
+                                        .serialize(new ActiveStatsResponse(reqId, null,
+                                                new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
+                                                        runtimeId.toString()))), null);
                 return;
             }
             String stats = runtime.getStats();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
deleted file mode 100644
index 18368ae..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ActiveRuntimeManager {
-
-    private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
-    private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes;
-
-    private final ExecutorService executorService;
-
-    public ActiveRuntimeManager() {
-        this.activeRuntimes = new ConcurrentHashMap<>();
-        this.executorService = Executors.newCachedThreadPool();
-    }
-
-    public void close() throws IOException {
-        if (executorService != null) {
-            executorService.shutdown();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName());
-            }
-            try {
-                executorService.awaitTermination(10L, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName()
-                        + " was interrupted while waiting for runtime managers to shutdown", e);
-            }
-            if (!executorService.isTerminated()) {
-                LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
-                        + " failed to shutdown successfully. Will be forced to shutdown");
-                executorService.shutdownNow();
-            }
-        }
-    }
-
-    public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) {
-        return activeRuntimes.get(runtimeId);
-    }
-
-    public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime)
-            throws HyracksDataException {
-        if (activeRuntimes.containsKey(runtimeId)) {
-            throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId);
-        }
-        activeRuntimes.put(runtimeId, feedRuntime);
-    }
-
-    public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException {
-        if (!activeRuntimes.containsKey(runtimeId)) {
-            throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
-        }
-        activeRuntimes.remove(runtimeId);
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public Set<ActiveRuntimeId> getFeedRuntimes() {
-        return activeRuntimes.keySet();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
new file mode 100644
index 0000000..0dbba52
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+public class ActiveStatsRequestMessage extends ActiveManagerMessage {
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    public ActiveStatsRequestMessage(Serializable payload, long reqId) {
+        super(Kind.REQUEST_STATS, payload);
+        this.reqId = reqId;
+    }
+
+    public long getReqId() {
+        return reqId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
deleted file mode 100644
index d43f00e..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active.message;
-
-import java.io.Serializable;
-
-public class StatsRequestMessage extends ActiveManagerMessage {
-    private static final long serialVersionUID = 1L;
-    private final long reqId;
-
-    public StatsRequestMessage(Serializable payload, long reqId) {
-        super(Kind.REQUEST_STATS, payload);
-        this.reqId = reqId;
-    }
-
-    public long getReqId() {
-        return reqId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index ebc2db5..254b6f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -22,6 +22,7 @@ package org.apache.asterix.api.http.server;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.logging.Level;
@@ -132,8 +133,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
             messageBroker.sendMessageToCC(cancelQueryMessage);
-            cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
-                    java.util.concurrent.TimeUnit.MILLISECONDS);
+            cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
+                    TimeUnit.MILLISECONDS);
         } catch (Exception e) {
             exception.addSuppressed(e);
         } finally {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 77f2b23..73d0840 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,7 +38,7 @@ import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -289,7 +289,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
         for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+            requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
             List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index ce57a0c..28e55a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -62,7 +62,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
     //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062
     public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
     //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
-    public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+    public static final long DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS = TimeUnit.MINUTES.toMillis(0);
     private final String requestNodeId;
     private final long requestMessageId;
     private final ILangExtension.Language lang;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index c5b9d11..03f42f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
 
 package org.apache.asterix.test.runtime;
 
-import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
+import static org.apache.hyracks.util.ThreadDumpUtil.takeDumpJSONString;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -39,7 +39,7 @@ import org.apache.asterix.testframework.context.TestCaseContext;
 import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 /**
@@ -190,7 +190,7 @@ public class LangExecutionUtil {
     }
 
     private static void checkThreadLeaks() throws IOException {
-        String threadDump = ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
+        String threadDump = ThreadDumpUtil.takeDumpJSONString();
         // Currently we only do sanity check for threads used in the execution engine.
         // Later we should check if there are leaked storage threads as well.
         if (threadDump.contains("Operator") || threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@ public class LangExecutionUtil {
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
             int runFileCount = Integer.parseInt(reader.readLine().trim());
             if (runFileCount != 0) {
-                System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+                System.out.print(takeDumpJSONString());
                 outputLeakedOpenFiles(processId);
                 throw new AssertionError("There are " + runFileCount + " leaked run files.");
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index fd5dba2..b1a5ff2 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -63,7 +63,7 @@
 27 = Operation not supported
 28 = Invalid duration %1$s
 29 = Unknown duration unit %1$s
-30 = Query timed out
+30 = Query timed out and will be cancelled
 
 100 = Unable to instantiate class %1$s
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index c85e236..824f51a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -36,6 +36,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+    public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count";
+    public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count";
+
     public enum State {
         CREATED,
         STARTED,
@@ -278,7 +281,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
 
     @Override
     public String getStats() {
-        return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": "
-                + failedRecordsCount + "}";
+        return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + incomingRecordsCount + ", \"" +
+                FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + failedRecordsCount + "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index b5388c2..407f9cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -19,7 +19,6 @@
 
 package org.apache.hyracks.control.cc.work;
 
-import java.lang.management.ManagementFactory;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -29,7 +28,7 @@ import java.util.logging.Logger;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
 
@@ -55,7 +54,7 @@ public class GetThreadDumpWork extends AbstractWork {
         if (nodeId == null) {
             // null nodeId means the request is for the cluster controller
             try {
-                callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+                callback.setValue(ThreadDumpUtil.takeDumpJSONString());
             } catch (Exception e) {
                 LOGGER.log(Level.WARNING, "Exception taking CC thread dump", e);
                 callback.setException(e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
deleted file mode 100644
index 62c6586..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.utils;
-
-import java.io.IOException;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class ThreadDumpHelper {
-    private static final ObjectMapper om = new ObjectMapper();
-
-    private ThreadDumpHelper() {
-        om.enable(SerializationFeature.INDENT_OUTPUT);
-    }
-
-    public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws IOException {
-        ObjectNode json = takeDumpJSON(threadMXBean);
-        return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
-    }
-
-    public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
-        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
-        List<Map<String, Object>> threads = new ArrayList<>();
-
-        for (ThreadInfo thread : threadInfos) {
-            Map<String, Object> threadMap = new HashMap<>();
-            threadMap.put("name", thread.getThreadName());
-            threadMap.put("id", thread.getThreadId());
-            threadMap.put("state", thread.getThreadState().name());
-            List<String> stacktrace = new ArrayList<>();
-            for (StackTraceElement element : thread.getStackTrace()) {
-                stacktrace.add(element.toString());
-            }
-            threadMap.put("stack", stacktrace);
-
-            if (thread.getLockName() != null) {
-                threadMap.put("lock_name", thread.getLockName());
-            }
-            if (thread.getLockOwnerId() != -1) {
-                threadMap.put("lock_owner_id", thread.getLockOwnerId());
-            }
-            if (thread.getBlockedTime() > 0) {
-                threadMap.put("blocked_time", thread.getBlockedTime());
-            }
-            if (thread.getBlockedCount() > 0) {
-                threadMap.put("blocked_count", thread.getBlockedCount());
-            }
-            if (thread.getLockedMonitors().length > 0) {
-                threadMap.put("locked_monitors", thread.getLockedMonitors());
-            }
-            if (thread.getLockedSynchronizers().length > 0) {
-                threadMap.put("locked_synchronizers", thread.getLockedSynchronizers());
-            }
-            threads.add(threadMap);
-        }
-        ObjectNode json = om.createObjectNode();
-        json.put("date", String.valueOf(new Date()));
-        json.putPOJO("threads", threads);
-
-        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
-        long[] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads();
-        if (deadlockedThreads != null && deadlockedThreads.length > 0) {
-            json.putPOJO("deadlocked_thread_ids", deadlockedThreads);
-        }
-        if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) {
-            json.putPOJO("monitor_deadlocked_thread_ids", monitorDeadlockedThreads);
-        }
-        return json;
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index ed5598b..a426d47 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -210,7 +210,7 @@ public class NodeControllerService implements IControllerService {
         osMXBean = ManagementFactory.getOperatingSystemMXBean();
         getNodeControllerInfosAcceptor = new MutableObject<>();
         memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
-        ioCounter = new IOCounterFactory().getIOCounter();
+        ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
     }
 
     public IOManager getIoManager() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
index 1b7cf8f..2301ae6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -21,6 +21,11 @@ package org.apache.hyracks.control.nc.io.profiling;
 
 public class IOCounterFactory {
 
+    public static final IOCounterFactory INSTANCE = new IOCounterFactory();
+
+    private IOCounterFactory() {
+    }
+
     /**
      * Get the IOCounter for the specific underlying OS
      *

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index abde87f..e23aaaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.control.nc.task;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
 import org.apache.hyracks.control.nc.NodeControllerService;
 
 public class ThreadDumpTask implements Runnable {
@@ -38,7 +38,7 @@ public class ThreadDumpTask implements Runnable {
     public void run() {
         String result;
         try {
-            result = ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
+            result = ThreadDumpUtil.takeDumpJSONString();
         } catch (Exception e) {
             LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
             result = null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-http/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index ed0e8c8..09bf513 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -66,5 +66,10 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 44d4dfe..645bc01 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -31,6 +31,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.util.ThreadDumpUtil;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -218,11 +219,22 @@ public class HttpServer {
     }
 
     protected void doStop() throws InterruptedException {
+        // stop taking new requests
         executor.shutdown();
         try {
-            executor.awaitTermination(1, TimeUnit.MINUTES);
+            // wait 5s before interrupting existing requests
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+            // interrupt
+            executor.shutdownNow();
+            // wait 30s for interrupted requests to unwind
+            executor.awaitTermination(30, TimeUnit.SECONDS);
             if (!executor.isTerminated()) {
-                LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+                if (LOGGER.isLoggable(Level.INFO)) {
+                    LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor; thread dump: " +
+                            ThreadDumpUtil.takeDumpString());
+                } else {
+                    LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+                }
             }
         } catch (Exception e) {
             LOGGER.log(Level.SEVERE, "Error while shutting down http server executor", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
new file mode 100644
index 0000000..ec1a0b2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Static helpers for capturing a dump of all JVM threads, either as JSON or as plain text.
+ */
+public class ThreadDumpUtil {
+    private static final ObjectMapper om = new ObjectMapper();
+    private static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+    static {
+        // configure the shared mapper at class initialization; the private constructor is never invoked
+        om.enable(SerializationFeature.INDENT_OUTPUT);
+    }
+
+    private ThreadDumpUtil() {
+        // utility class; not meant to be instantiated
+    }
+
+    /**
+     * Takes a thread dump and serializes it as pretty-printed JSON text.
+     *
+     * @return the serialized form of {@link #takeDumpJSON()}
+     * @throws IOException
+     *             if serializing the dump fails
+     */
+    public static String takeDumpJSONString() throws IOException {
+        ObjectNode json = takeDumpJSON();
+        return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+    }
+
+    /**
+     * Takes a dump of all threads, including locked monitors and ownable synchronizers, as a JSON object.
+     * <p>
+     * The result carries a "date" string and a "threads" list with one entry per thread (name, id, state, stack
+     * and, when present, lock/blocking details). Ids of deadlocked threads are added only if any are found.
+     *
+     * @return the thread dump as an {@link ObjectNode}
+     */
+    public static ObjectNode takeDumpJSON() {
+        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
+        List<Map<String, Object>> threads = new ArrayList<>();
+
+        for (ThreadInfo thread : threadInfos) {
+            Map<String, Object> threadMap = new HashMap<>();
+            threadMap.put("name", thread.getThreadName());
+            threadMap.put("id", thread.getThreadId());
+            threadMap.put("state", thread.getThreadState().name());
+            List<String> stacktrace = new ArrayList<>();
+            for (StackTraceElement element : thread.getStackTrace()) {
+                stacktrace.add(element.toString());
+            }
+            threadMap.put("stack", stacktrace);
+
+            // optional details are only emitted when they carry information
+            if (thread.getLockName() != null) {
+                threadMap.put("lock_name", thread.getLockName());
+            }
+            if (thread.getLockOwnerId() != -1) {
+                threadMap.put("lock_owner_id", thread.getLockOwnerId());
+            }
+            if (thread.getBlockedTime() > 0) {
+                threadMap.put("blocked_time", thread.getBlockedTime());
+            }
+            if (thread.getBlockedCount() > 0) {
+                threadMap.put("blocked_count", thread.getBlockedCount());
+            }
+            if (thread.getLockedMonitors().length > 0) {
+                threadMap.put("locked_monitors", thread.getLockedMonitors());
+            }
+            if (thread.getLockedSynchronizers().length > 0) {
+                threadMap.put("locked_synchronizers", thread.getLockedSynchronizers());
+            }
+            threads.add(threadMap);
+        }
+        ObjectNode json = om.createObjectNode();
+        json.put("date", String.valueOf(new Date()));
+        json.putPOJO("threads", threads);
+
+        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
+        long[] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads();
+        if (deadlockedThreads != null && deadlockedThreads.length > 0) {
+            json.putPOJO("deadlocked_thread_ids", deadlockedThreads);
+        }
+        if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) {
+            json.putPOJO("monitor_deadlocked_thread_ids", monitorDeadlockedThreads);
+        }
+        return json;
+    }
+
+    /**
+     * Takes a plain-text dump by concatenating {@link ThreadInfo#toString()} for every thread.
+     * <p>
+     * NOTE: the per-thread text is implementation-dependent and may not include every stack frame;
+     * {@link #takeDumpJSON()} records each element returned by {@link ThreadInfo#getStackTrace()}.
+     *
+     * @return the concatenated thread descriptions
+     */
+    public static String takeDumpString() {
+        StringBuilder buf = new StringBuilder(2048);
+        Stream.of(threadMXBean.dumpAllThreads(true, true)).forEach(buf::append);
+        return buf.toString();
+    }
+}