You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2017/09/25 17:02:13 UTC
asterixdb git commit: [NO ISSUE] Minor active refactoring
Repository: asterixdb
Updated Branches:
refs/heads/master 17cd99cc7 -> 9ee6f2d7c
[NO ISSUE] Minor active refactoring
- Remove unused ActiveRuntimeManager
- Rename StatsRequestMessage -> ActiveStatsRequestMessage
- Add ActiveManager API to return all active runtimes
- Interrupt running HTTP requests after 5s upon shutdown
- Log thread dump when HTTP requests do not complete after interruption
Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2021
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Michael Blow <mb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/9ee6f2d7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/9ee6f2d7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/9ee6f2d7
Branch: refs/heads/master
Commit: 9ee6f2d7c40bf47348fbf254db0831d755bf1a5f
Parents: 17cd99c
Author: Michael Blow <mi...@couchbase.com>
Authored: Mon Sep 25 09:39:02 2017 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Mon Sep 25 10:01:53 2017 -0700
----------------------------------------------------------------------
.../apache/asterix/active/ActiveManager.java | 20 ++--
.../asterix/active/ActiveRuntimeManager.java | 95 -----------------
.../message/ActiveStatsRequestMessage.java | 35 +++++++
.../active/message/StatsRequestMessage.java | 35 -------
.../api/http/server/NCQueryServiceServlet.java | 5 +-
.../app/active/ActiveEntityEventsListener.java | 4 +-
.../message/ExecuteStatementRequestMessage.java | 2 +-
.../asterix/test/runtime/LangExecutionUtil.java | 8 +-
.../main/resources/asx_errormsg/en.properties | 2 +-
.../dataflow/FeedRecordDataFlowController.java | 7 +-
.../control/cc/work/GetThreadDumpWork.java | 5 +-
.../control/common/utils/ThreadDumpHelper.java | 95 -----------------
.../control/nc/NodeControllerService.java | 2 +-
.../nc/io/profiling/IOCounterFactory.java | 5 +
.../hyracks/control/nc/task/ThreadDumpTask.java | 4 +-
hyracks-fullstack/hyracks/hyracks-http/pom.xml | 5 +
.../apache/hyracks/http/server/HttpServer.java | 16 ++-
.../org/apache/hyracks/util/ThreadDumpUtil.java | 104 +++++++++++++++++++
18 files changed, 197 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index df59dca..264e9bc 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.asterix.active;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -32,7 +34,7 @@ import java.util.logging.Logger;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -54,7 +56,7 @@ public class ActiveManager {
private volatile boolean shutdown;
public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize,
- INCServiceContext serviceCtx) throws HyracksDataException {
+ INCServiceContext serviceCtx) throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
@@ -77,6 +79,10 @@ public class ActiveManager {
runtimes.remove(id);
}
+ public Set<ActiveRuntimeId> getRuntimeIds() {
+ return Collections.unmodifiableSet(runtimes.keySet());
+ }
+
public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
return runtimes.get(runtimeId);
}
@@ -93,14 +99,14 @@ public class ActiveManager {
stopRuntime(message);
break;
case REQUEST_STATS:
- requestStats((StatsRequestMessage) message);
+ requestStats((ActiveStatsRequestMessage) message);
break;
default:
LOGGER.warning("Unknown message type received: " + message.getKind());
}
}
- private void requestStats(StatsRequestMessage message) throws HyracksDataException {
+ private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException {
try {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -111,9 +117,9 @@ public class ActiveManager {
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(
JavaSerializationUtils
- .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException(
- ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))),
- null);
+ .serialize(new ActiveStatsResponse(reqId, null,
+ new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
+ runtimeId.toString()))), null);
return;
}
String stats = runtime.getStats();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
deleted file mode 100644
index 18368ae..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ActiveRuntimeManager {
-
- private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName());
- private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes;
-
- private final ExecutorService executorService;
-
- public ActiveRuntimeManager() {
- this.activeRuntimes = new ConcurrentHashMap<>();
- this.executorService = Executors.newCachedThreadPool();
- }
-
- public void close() throws IOException {
- if (executorService != null) {
- executorService.shutdown();
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName());
- }
- try {
- executorService.awaitTermination(10L, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName()
- + " was interrupted while waiting for runtime managers to shutdown", e);
- }
- if (!executorService.isTerminated()) {
- LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
- + " failed to shutdown successfully. Will be forced to shutdown");
- executorService.shutdownNow();
- }
- }
- }
-
- public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) {
- return activeRuntimes.get(runtimeId);
- }
-
- public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime)
- throws HyracksDataException {
- if (activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId);
- }
- activeRuntimes.put(runtimeId, feedRuntime);
- }
-
- public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException {
- if (!activeRuntimes.containsKey(runtimeId)) {
- throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
- }
- activeRuntimes.remove(runtimeId);
- }
-
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public Set<ActiveRuntimeId> getFeedRuntimes() {
- return activeRuntimes.keySet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
new file mode 100644
index 0000000..0dbba52
--- /dev/null
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active.message;
+
+import java.io.Serializable;
+
+public class ActiveStatsRequestMessage extends ActiveManagerMessage {
+ private static final long serialVersionUID = 1L;
+ private final long reqId;
+
+ public ActiveStatsRequestMessage(Serializable payload, long reqId) {
+ super(Kind.REQUEST_STATS, payload);
+ this.reqId = reqId;
+ }
+
+ public long getReqId() {
+ return reqId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
deleted file mode 100644
index d43f00e..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active.message;
-
-import java.io.Serializable;
-
-public class StatsRequestMessage extends ActiveManagerMessage {
- private static final long serialVersionUID = 1L;
- private final long reqId;
-
- public StatsRequestMessage(Serializable payload, long reqId) {
- super(Kind.REQUEST_STATS, payload);
- this.reqId = reqId;
- }
-
- public long getReqId() {
- return reqId;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index ebc2db5..254b6f0 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -22,6 +22,7 @@ package org.apache.asterix.api.http.server;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
@@ -132,8 +133,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet {
CancelQueryRequest cancelQueryMessage =
new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID);
messageBroker.sendMessageToCC(cancelQueryMessage);
- cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
- java.util.concurrent.TimeUnit.MILLISECONDS);
+ cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
+ TimeUnit.MILLISECONDS);
} catch (Exception e) {
exception.addSuppressed(e);
} finally {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 77f2b23..73d0840 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,7 +38,7 @@ import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -289,7 +289,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
for (int i = 0; i < ncs.size(); i++) {
- requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+ requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId));
}
try {
List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index ce57a0c..28e55a6 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -62,7 +62,7 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage
//TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062
public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
//TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063
- public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+ public static final long DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS = TimeUnit.MINUTES.toMillis(1);
private final String requestNodeId;
private final long requestMessageId;
private final ILangExtension.Language lang;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
index c5b9d11..03f42f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/LangExecutionUtil.java
@@ -19,7 +19,7 @@
package org.apache.asterix.test.runtime;
-import static org.apache.hyracks.control.common.utils.ThreadDumpHelper.takeDumpJSONString;
+import static org.apache.hyracks.util.ThreadDumpUtil.takeDumpJSONString;
import java.io.BufferedReader;
import java.io.File;
@@ -39,7 +39,7 @@ import org.apache.asterix.testframework.context.TestCaseContext;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.api.io.IODeviceHandle;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.nc.NodeControllerService;
/**
@@ -190,7 +190,7 @@ public class LangExecutionUtil {
}
private static void checkThreadLeaks() throws IOException {
- String threadDump = ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean());
+ String threadDump = ThreadDumpUtil.takeDumpJSONString();
// Currently we only do sanity check for threads used in the execution engine.
// Later we should check if there are leaked storage threads as well.
if (threadDump.contains("Operator") || threadDump.contains("SuperActivity")
@@ -215,7 +215,7 @@ public class LangExecutionUtil {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
int runFileCount = Integer.parseInt(reader.readLine().trim());
if (runFileCount != 0) {
- System.out.print(takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+ System.out.print(takeDumpJSONString());
outputLeakedOpenFiles(processId);
throw new AssertionError("There are " + runFileCount + " leaked run files.");
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index fd5dba2..b1a5ff2 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -63,7 +63,7 @@
27 = Operation not supported
28 = Invalid duration %1$s
29 = Unknown duration unit %1$s
-30 = Query timed out
+30 = Query timed out and will be cancelled
100 = Unable to instantiate class %1$s
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index c85e236..824f51a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -36,6 +36,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
+ public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count";
+ public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count";
+
public enum State {
CREATED,
STARTED,
@@ -278,7 +281,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
@Override
public String getStats() {
- return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": "
- + failedRecordsCount + "}";
+ return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + incomingRecordsCount + ", \"" +
+ FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + failedRecordsCount + "}";
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
index b5388c2..407f9cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetThreadDumpWork.java
@@ -19,7 +19,6 @@
package org.apache.hyracks.control.cc.work;
-import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -29,7 +28,7 @@ import java.util.logging.Logger;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -55,7 +54,7 @@ public class GetThreadDumpWork extends AbstractWork {
if (nodeId == null) {
// null nodeId means the request is for the cluster controller
try {
- callback.setValue(ThreadDumpHelper.takeDumpJSONString(ManagementFactory.getThreadMXBean()));
+ callback.setValue(ThreadDumpUtil.takeDumpJSONString());
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking CC thread dump", e);
callback.setException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
deleted file mode 100644
index 62c6586..0000000
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ThreadDumpHelper.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.common.utils;
-
-import java.io.IOException;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class ThreadDumpHelper {
- private static final ObjectMapper om = new ObjectMapper();
-
- private ThreadDumpHelper() {
- om.enable(SerializationFeature.INDENT_OUTPUT);
- }
-
- public static String takeDumpJSONString(ThreadMXBean threadMXBean) throws IOException {
- ObjectNode json = takeDumpJSON(threadMXBean);
- return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
- }
-
- public static ObjectNode takeDumpJSON(ThreadMXBean threadMXBean) {
- ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
- List<Map<String, Object>> threads = new ArrayList<>();
-
- for (ThreadInfo thread : threadInfos) {
- Map<String, Object> threadMap = new HashMap<>();
- threadMap.put("name", thread.getThreadName());
- threadMap.put("id", thread.getThreadId());
- threadMap.put("state", thread.getThreadState().name());
- List<String> stacktrace = new ArrayList<>();
- for (StackTraceElement element : thread.getStackTrace()) {
- stacktrace.add(element.toString());
- }
- threadMap.put("stack", stacktrace);
-
- if (thread.getLockName() != null) {
- threadMap.put("lock_name", thread.getLockName());
- }
- if (thread.getLockOwnerId() != -1) {
- threadMap.put("lock_owner_id", thread.getLockOwnerId());
- }
- if (thread.getBlockedTime() > 0) {
- threadMap.put("blocked_time", thread.getBlockedTime());
- }
- if (thread.getBlockedCount() > 0) {
- threadMap.put("blocked_count", thread.getBlockedCount());
- }
- if (thread.getLockedMonitors().length > 0) {
- threadMap.put("locked_monitors", thread.getLockedMonitors());
- }
- if (thread.getLockedSynchronizers().length > 0) {
- threadMap.put("locked_synchronizers", thread.getLockedSynchronizers());
- }
- threads.add(threadMap);
- }
- ObjectNode json = om.createObjectNode();
- json.put("date", String.valueOf(new Date()));
- json.putPOJO("threads", threads);
-
- long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
- long[] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads();
- if (deadlockedThreads != null && deadlockedThreads.length > 0) {
- json.putPOJO("deadlocked_thread_ids", deadlockedThreads);
- }
- if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) {
- json.putPOJO("monitor_deadlocked_thread_ids", monitorDeadlockedThreads);
- }
- return json;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index ed5598b..a426d47 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -210,7 +210,7 @@ public class NodeControllerService implements IControllerService {
osMXBean = ManagementFactory.getOperatingSystemMXBean();
getNodeControllerInfosAcceptor = new MutableObject<>();
memoryManager = new MemoryManager((long) (memoryMXBean.getHeapMemoryUsage().getMax() * MEMORY_FUDGE_FACTOR));
- ioCounter = new IOCounterFactory().getIOCounter();
+ ioCounter = IOCounterFactory.INSTANCE.getIOCounter();
}
public IOManager getIoManager() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
index 1b7cf8f..2301ae6 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/profiling/IOCounterFactory.java
@@ -21,6 +21,11 @@ package org.apache.hyracks.control.nc.io.profiling;
public class IOCounterFactory {
+ public static final IOCounterFactory INSTANCE = new IOCounterFactory();
+
+ private IOCounterFactory() {
+ }
+
/**
* Get the IOCounter for the specific underlying OS
*
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
index abde87f..e23aaaa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/ThreadDumpTask.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.control.nc.task;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.hyracks.control.common.utils.ThreadDumpHelper;
+import org.apache.hyracks.util.ThreadDumpUtil;
import org.apache.hyracks.control.nc.NodeControllerService;
public class ThreadDumpTask implements Runnable {
@@ -38,7 +38,7 @@ public class ThreadDumpTask implements Runnable {
public void run() {
String result;
try {
- result = ThreadDumpHelper.takeDumpJSONString(ncs.getThreadMXBean());
+ result = ThreadDumpUtil.takeDumpJSONString();
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Exception taking thread dump", e);
result = null;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-http/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/pom.xml b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
index ed0e8c8..09bf513 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-http/pom.xml
@@ -66,5 +66,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
index 44d4dfe..645bc01 100644
--- a/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
+++ b/hyracks-fullstack/hyracks/hyracks-http/src/main/java/org/apache/hyracks/http/server/HttpServer.java
@@ -31,6 +31,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.http.api.IServlet;
+import org.apache.hyracks.util.ThreadDumpUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -218,11 +219,22 @@ public class HttpServer {
}
protected void doStop() throws InterruptedException {
+ // stop taking new requests
executor.shutdown();
try {
- executor.awaitTermination(1, TimeUnit.MINUTES);
+ // wait 5s before interrupting existing requests
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ // interrupt
+ executor.shutdownNow();
+ // wait 30s for interrupted requests to unwind
+ executor.awaitTermination(30, TimeUnit.SECONDS);
if (!executor.isTerminated()) {
- LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor; thread dump: " +
+ ThreadDumpUtil.takeDumpString());
+ } else {
+ LOGGER.log(Level.SEVERE, "Failed to shutdown http server executor");
+ }
}
} catch (Exception e) {
LOGGER.log(Level.SEVERE, "Error while shutting down http server executor", e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/9ee6f2d7/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
new file mode 100644
index 0000000..ec1a0b2
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThreadDumpUtil.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Static helpers for capturing a dump of all JVM threads, as JSON or as plain text.
+ */
+public class ThreadDumpUtil {
+    // Configured at initialization: the class is never instantiated, so any setup placed in the
+    // private constructor would never run.
+    private static final ObjectMapper om = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT);
+    private static final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+
+    private ThreadDumpUtil() {
+        // utility class; not instantiable
+    }
+
+    /**
+     * Takes a dump of all threads and renders it as a pretty-printed JSON string.
+     *
+     * @return the JSON text produced from {@link #takeDumpJSON()}
+     * @throws IOException if the JSON tree cannot be serialized
+     */
+    public static String takeDumpJSONString() throws IOException {
+        ObjectNode json = takeDumpJSON();
+        return om.writerWithDefaultPrettyPrinter().writeValueAsString(json);
+    }
+
+    /**
+     * Takes a dump of all threads, including locked monitors and ownable synchronizers.
+     * Optional per-thread fields are emitted only when they carry information; the deadlock
+     * arrays are emitted only when deadlocked threads are found.
+     *
+     * @return an object node with a "date" field and a "threads" array
+     */
+    public static ObjectNode takeDumpJSON() {
+        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);
+        List<Map<String, Object>> threads = new ArrayList<>();
+
+        for (ThreadInfo thread : threadInfos) {
+            Map<String, Object> threadMap = new HashMap<>();
+            threadMap.put("name", thread.getThreadName());
+            threadMap.put("id", thread.getThreadId());
+            threadMap.put("state", thread.getThreadState().name());
+            List<String> stacktrace = new ArrayList<>();
+            for (StackTraceElement element : thread.getStackTrace()) {
+                stacktrace.add(element.toString());
+            }
+            threadMap.put("stack", stacktrace);
+
+            if (thread.getLockName() != null) {
+                threadMap.put("lock_name", thread.getLockName());
+            }
+            if (thread.getLockOwnerId() != -1) {
+                threadMap.put("lock_owner_id", thread.getLockOwnerId());
+            }
+            if (thread.getBlockedTime() > 0) {
+                threadMap.put("blocked_time", thread.getBlockedTime());
+            }
+            if (thread.getBlockedCount() > 0) {
+                threadMap.put("blocked_count", thread.getBlockedCount());
+            }
+            if (thread.getLockedMonitors().length > 0) {
+                threadMap.put("locked_monitors", thread.getLockedMonitors());
+            }
+            if (thread.getLockedSynchronizers().length > 0) {
+                threadMap.put("locked_synchronizers", thread.getLockedSynchronizers());
+            }
+            threads.add(threadMap);
+        }
+        ObjectNode json = om.createObjectNode();
+        json.put("date", String.valueOf(new Date()));
+        json.putPOJO("threads", threads);
+
+        long[] deadlockedThreads = threadMXBean.findDeadlockedThreads();
+        long[] monitorDeadlockedThreads = threadMXBean.findMonitorDeadlockedThreads();
+        if (deadlockedThreads != null && deadlockedThreads.length > 0) {
+            json.putPOJO("deadlocked_thread_ids", deadlockedThreads);
+        }
+        if (monitorDeadlockedThreads != null && monitorDeadlockedThreads.length > 0) {
+            json.putPOJO("monitor_deadlocked_thread_ids", monitorDeadlockedThreads);
+        }
+        return json;
+    }
+
+    /**
+     * Takes a dump of all threads and renders it as plain text by concatenating the
+     * {@link ThreadInfo#toString()} form of each thread.
+     * NOTE: that format is implementation-dependent and may abbreviate deep stack traces; use
+     * {@link #takeDumpJSON()} when complete stacks are needed.
+     *
+     * @return the concatenated text of all thread infos
+     */
+    public static String takeDumpString() {
+        StringBuilder buf = new StringBuilder(2048);
+        Stream.of(threadMXBean.dumpAllThreads(true, true)).forEach(buf::append);
+        return buf.toString();
+    }
+}