You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/13 18:26:13 UTC

apex-core git commit: Fixing cleanup during shutdown to avoid intermittent errors

Repository: apex-core
Updated Branches:
  refs/heads/master 989536773 -> 0670eb3e0


Fixing cleanup during shutdown to avoid intermittent errors


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0670eb3e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0670eb3e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0670eb3e

Branch: refs/heads/master
Commit: 0670eb3e0be01097123263406692238e974f142a
Parents: 9895367
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Feb 26 16:01:20 2016 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Sep 13 10:57:56 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    | 23 ++++++++++++++++----
 .../com/datatorrent/stram/PartitioningTest.java |  2 +-
 2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 23737d0..dd28304 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -236,13 +237,15 @@ public class StramLocalCluster implements Runnable, Controller
   /**
    * Starts the child "container" as thread.
    */
-  private class LocalStramChildLauncher implements Runnable
+  private class LocalStreamingContainerLauncher implements Runnable
   {
     final String containerId;
     final LocalStreamingContainer child;
 
+    Thread launchThread;
+
     @SuppressWarnings("CallToThreadStartDuringObjectConstruction")
-    private LocalStramChildLauncher(ContainerStartRequest cdr)
+    private LocalStreamingContainerLauncher(ContainerStartRequest cdr)
     {
       this.containerId = "container-" + containerSeq++;
       WindowGenerator wingen = null;
@@ -253,7 +256,7 @@ public class StramLocalCluster implements Runnable, Controller
       ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
       StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
       if (sca != null) {
-        Thread launchThread = new Thread(this, containerId);
+        launchThread = new Thread(this, containerId);
         launchThread.start();
         childContainers.put(containerId, child);
         LOG.info("Started container {}", containerId);
@@ -437,6 +440,7 @@ public class StramLocalCluster implements Runnable, Controller
   public void run(long runMillis)
   {
     long endMillis = System.currentTimeMillis() + runMillis;
+    List<Thread> containerThreads = new LinkedList<Thread>();
 
     while (!appDone) {
 
@@ -458,7 +462,10 @@ public class StramLocalCluster implements Runnable, Controller
       while (!dnmgr.containerStartRequests.isEmpty()) {
         ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
         if (cdr != null) {
-          new LocalStramChildLauncher(cdr);
+          LocalStreamingContainerLauncher launcher = new LocalStreamingContainerLauncher(cdr);
+          if (launcher.launchThread != null) {
+            containerThreads.add(launcher.launchThread);
+          }
         }
       }
 
@@ -502,6 +509,14 @@ public class StramLocalCluster implements Runnable, Controller
       lsc.triggerHeartbeat();
     }
 
+    for (Thread thread: containerThreads) {
+      try {
+        thread.join(1000);
+      } catch (InterruptedException e) {
+        LOG.warn("Container thread didn't finish {}", thread.getName());
+      }
+    }
+
     dnmgr.teardown();
 
     LOG.info("Application finished.");

http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 4f8becd..9eef586 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -260,7 +260,7 @@ public class PartitioningTest
 
     };
     StramTestSupport.awaitCompletion(c, 10000);
-    Assert.assertTrue("Number partitions " + ow, c.isComplete());
+    Assert.assertTrue("Number partitions match " + ow, c.isComplete());
     return lc.getPlanOperators(ow);
   }