You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/13 18:26:13 UTC
apex-core git commit: Fixing cleanup during shutdown to avoid intermittent errors
Repository: apex-core
Updated Branches:
refs/heads/master 989536773 -> 0670eb3e0
Fixing cleanup during shutdown to avoid intermittent errors
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/0670eb3e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/0670eb3e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/0670eb3e
Branch: refs/heads/master
Commit: 0670eb3e0be01097123263406692238e974f142a
Parents: 9895367
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Feb 26 16:01:20 2016 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Sep 13 10:57:56 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 23 ++++++++++++++++----
.../com/datatorrent/stram/PartitioningTest.java | 2 +-
2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 23737d0..dd28304 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -236,13 +237,15 @@ public class StramLocalCluster implements Runnable, Controller
/**
* Starts the child "container" as thread.
*/
- private class LocalStramChildLauncher implements Runnable
+ private class LocalStreamingContainerLauncher implements Runnable
{
final String containerId;
final LocalStreamingContainer child;
+ Thread launchThread;
+
@SuppressWarnings("CallToThreadStartDuringObjectConstruction")
- private LocalStramChildLauncher(ContainerStartRequest cdr)
+ private LocalStreamingContainerLauncher(ContainerStartRequest cdr)
{
this.containerId = "container-" + containerSeq++;
WindowGenerator wingen = null;
@@ -253,7 +256,7 @@ public class StramLocalCluster implements Runnable, Controller
ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
if (sca != null) {
- Thread launchThread = new Thread(this, containerId);
+ launchThread = new Thread(this, containerId);
launchThread.start();
childContainers.put(containerId, child);
LOG.info("Started container {}", containerId);
@@ -437,6 +440,7 @@ public class StramLocalCluster implements Runnable, Controller
public void run(long runMillis)
{
long endMillis = System.currentTimeMillis() + runMillis;
+ List<Thread> containerThreads = new LinkedList<Thread>();
while (!appDone) {
@@ -458,7 +462,10 @@ public class StramLocalCluster implements Runnable, Controller
while (!dnmgr.containerStartRequests.isEmpty()) {
ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
if (cdr != null) {
- new LocalStramChildLauncher(cdr);
+ LocalStreamingContainerLauncher launcher = new LocalStreamingContainerLauncher(cdr);
+ if (launcher.launchThread != null) {
+ containerThreads.add(launcher.launchThread);
+ }
}
}
@@ -502,6 +509,14 @@ public class StramLocalCluster implements Runnable, Controller
lsc.triggerHeartbeat();
}
+ for (Thread thread: containerThreads) {
+ try {
+ thread.join(1000);
+ } catch (InterruptedException e) {
+ LOG.warn("Container thread didn't finish {}", thread.getName());
+ }
+ }
+
dnmgr.teardown();
LOG.info("Application finished.");
http://git-wip-us.apache.org/repos/asf/apex-core/blob/0670eb3e/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
index 4f8becd..9eef586 100644
--- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java
@@ -260,7 +260,7 @@ public class PartitioningTest
};
StramTestSupport.awaitCompletion(c, 10000);
- Assert.assertTrue("Number partitions " + ow, c.isComplete());
+ Assert.assertTrue("Number partitions match " + ow, c.isComplete());
return lc.getPlanOperators(ow);
}