You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2016/09/29 15:48:00 UTC

asterixdb git commit: Run Active Shutdown On Separate Thread

Repository: asterixdb
Updated Branches:
  refs/heads/master 3b5c3c621 -> b8a149e78


Run Active Shutdown On Separate Thread

- As feed shutdown can be slow, run it on a separate thread so that it
  does not tie up the worker.
- Use the NC thread executor for the feed adapter thread.
- Rework error handling in the runtime and adapter stop paths.

Change-Id: I8fd9bc454b290420682160364ac78e4b91a9abc3
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1223
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b8a149e7
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b8a149e7
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b8a149e7

Branch: refs/heads/master
Commit: b8a149e78ea3d5d4fd4d5d7d5ec8c3c383737a67
Parents: 3b5c3c6
Author: Michael Blow <mb...@apache.org>
Authored: Wed Sep 28 21:30:37 2016 -0400
Committer: Michael Blow <mb...@apache.org>
Committed: Thu Sep 29 08:47:31 2016 -0700

----------------------------------------------------------------------
 .../apache/asterix/active/ActiveManager.java    | 22 +++++++-----
 .../ActiveSourceOperatorNodePushable.java       | 13 +++----
 .../app/nc/AsterixNCAppRuntimeContext.java      |  2 +-
 .../feed/runtime/AdapterRuntimeManager.java     | 38 +++++++++-----------
 .../FeedIntakeOperatorNodePushable.java         |  2 +-
 5 files changed, 37 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b8a149e7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index bd6dae9..b15cfca 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -20,6 +20,7 @@ package org.apache.asterix.active;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -29,11 +30,14 @@ import org.apache.log4j.Logger;
 public class ActiveManager {
 
     private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName());
+    private final Executor executor;
     private final Map<ActiveRuntimeId, IActiveRuntime> runtimes;
     private final ConcurrentFramePool activeFramePool;
     private final String nodeId;
 
-    public ActiveManager(String nodeId, long activeMemoryBudget, int frameSize) throws HyracksDataException {
+    public ActiveManager(Executor executor, String nodeId, long activeMemoryBudget, int frameSize)
+            throws HyracksDataException {
+        this.executor = executor;
         this.nodeId = nodeId;
         this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize);
         this.runtimes = new ConcurrentHashMap<>();
@@ -69,7 +73,7 @@ public class ActiveManager {
                 stopRuntime(message);
                 break;
             default:
-                LOGGER.warn("Unknown message type received");
+                LOGGER.warn("Unknown message type received: " + message.getKind());
         }
     }
 
@@ -79,12 +83,14 @@ public class ActiveManager {
         if (runtime == null) {
             LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId);
         } else {
-            try {
-                runtime.stop();
-            } catch (HyracksDataException | InterruptedException e) {
-                // TODO(till) Figure out a better way to handle failure to stop a runtime
-                LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
-            }
+            executor.execute(() -> {
+                try {
+                    runtime.stop();
+                } catch (Exception e) {
+                    // TODO(till) Figure out a better way to handle failure to stop a runtime
+                    LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+                }
+            });
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b8a149e7/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 1cda298..7f25896 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -55,15 +55,10 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractOperatorN
 
     @Override
     public final void stop() throws HyracksDataException, InterruptedException {
-        try {
-            abort();
-        } finally {
-            if (!done) {
-                synchronized (this) {
-                    while (!done) {
-                        wait();
-                    }
-                }
+        abort();
+        synchronized (this) {
+            while (!done) {
+                wait();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b8a149e7/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
index 343fdb3..ed081b5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/AsterixNCAppRuntimeContext.java
@@ -216,7 +216,7 @@ public class AsterixNCAppRuntimeContext implements IAsterixAppRuntimeContext, IA
 
         isShuttingdown = false;
 
-        activeManager = new ActiveManager(ncApplicationContext.getNodeId(),
+        activeManager = new ActiveManager(threadExecutor, ncApplicationContext.getNodeId(),
                 feedProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize());
 
         if (ClusterProperties.INSTANCE.isReplicationEnabled()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b8a149e7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 424f2dc..7f5372b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -18,13 +18,12 @@
  */
 package org.apache.asterix.external.feed.runtime;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.log4j.Logger;
 
 /**
@@ -42,45 +41,42 @@ public class AdapterRuntimeManager {
 
     private final int partition; // The partition number
 
-    private final ExecutorService executorService; // Executor service to run/shutdown the adapter executor
+    private final IHyracksTaskContext ctx;
 
     private IngestionRuntime ingestionRuntime; // Runtime representing the ingestion stage of a feed
 
+    private Future<?> execution;
+
     private volatile boolean done = false;
     private volatile boolean failed = false;
 
-    public AdapterRuntimeManager(EntityId entityId, FeedAdapter feedAdapter, IFrameWriter writer, int partition) {
+    public AdapterRuntimeManager(IHyracksTaskContext ctx, EntityId entityId, FeedAdapter feedAdapter,
+                                 IFrameWriter writer, int partition) {
+        this.ctx = ctx;
         this.feedId = entityId;
         this.feedAdapter = feedAdapter;
         this.partition = partition;
         this.adapterExecutor = new AdapterExecutor(writer, feedAdapter, this);
-        this.executorService = Executors.newSingleThreadExecutor();
     }
 
     public void start() {
-        executorService.execute(adapterExecutor);
+        execution = ctx.getExecutorService().submit(adapterExecutor);
     }
 
     public void stop() throws InterruptedException {
-        boolean stopped = false;
         try {
-            stopped = feedAdapter.stop();
-        } catch (Exception exception) {
-            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
-        } finally {
-            if (stopped) {
+            if (feedAdapter.stop()) {
                 // stop() returned true, we wait for the process termination
-                executorService.shutdown();
-                try {
-                    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
-                    throw e;
-                }
+                execution.get();
             } else {
                 // stop() returned false, we try to force shutdown
-                executorService.shutdownNow();
+                execution.cancel(true);
             }
+        } catch (InterruptedException e) {
+            LOGGER.error("Interrupted while waiting for feed adapter to finish its work", e);
+            throw e;
+        } catch (Exception exception) {
+            LOGGER.error("Unable to stop adapter " + feedAdapter, exception);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b8a149e7/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 7c8fe14..afe87c0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -74,7 +74,7 @@ public class FeedIntakeOperatorNodePushable extends AbstractUnaryOutputSourceOpe
             // create the distributor
             frameDistributor = new DistributeFeedFrameWriter(feedId, writer, FeedRuntimeType.INTAKE, partition);
             // create adapter runtime manager
-            adapterRuntimeManager = new AdapterRuntimeManager(feedId, adapter, frameDistributor, partition);
+            adapterRuntimeManager = new AdapterRuntimeManager(ctx, feedId, adapter, frameDistributor, partition);
             // create and register the runtime
             ActiveRuntimeId runtimeId = new ActiveRuntimeId(feedId, FeedRuntimeType.INTAKE.toString(), partition);
             ingestionRuntime = new IngestionRuntime(feedId, runtimeId, frameDistributor, adapterRuntimeManager, ctx);