You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/15 19:00:56 UTC
[1/2] apex-core git commit: APEXCORE-527 - Minor changes in
LocalStramChildLauncher to help with unit test failures
Repository: apex-core
Updated Branches:
refs/heads/master 0b518dea6 -> 388bb3877
APEXCORE-527 - Minor changes in LocalStramChildLauncher to help with unit test failures
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1388bca7
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1388bca7
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1388bca7
Branch: refs/heads/master
Commit: 1388bca782f88216c3888b7d6c260b89006fdcea
Parents: 0670eb3
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Sep 12 19:00:09 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Sep 13 13:17:26 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 25 ++++++++------------
1 file changed, 10 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1388bca7/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index dd28304..48ed070 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -172,7 +172,7 @@ public class StramLocalCluster implements Runnable, Controller
public void run(StreamingContainerContext ctx) throws Exception
{
- LOG.debug("Got context: " + ctx);
+ LOG.debug("container {} context {}", getContainerId(), ctx);
setup(ctx);
if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) {
bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort());
@@ -242,10 +242,8 @@ public class StramLocalCluster implements Runnable, Controller
final String containerId;
final LocalStreamingContainer child;
- Thread launchThread;
-
@SuppressWarnings("CallToThreadStartDuringObjectConstruction")
- private LocalStreamingContainerLauncher(ContainerStartRequest cdr)
+ private LocalStreamingContainerLauncher(ContainerStartRequest cdr, List<Thread> containerThreads)
{
this.containerId = "container-" + containerSeq++;
WindowGenerator wingen = null;
@@ -256,10 +254,10 @@ public class StramLocalCluster implements Runnable, Controller
ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
if (sca != null) {
- launchThread = new Thread(this, containerId);
- launchThread.start();
childContainers.put(containerId, child);
- LOG.info("Started container {}", containerId);
+ Thread launchThread = new Thread(this, containerId);
+ containerThreads.add(launchThread);
+ launchThread.start();
}
}
@@ -268,10 +266,10 @@ public class StramLocalCluster implements Runnable, Controller
{
try {
StreamingContainerContext ctx = umbilical.getInitContext(containerId);
+ LOG.info("Started container {}", containerId);
child.run(ctx);
- } catch (Exception e) {
- LOG.error("Container {} failed", containerId, e);
- throw new RuntimeException(e);
+ } catch (Exception | Error e) {
+ LOG.error("Fatal {} in container {}", e instanceof Error ? "error" : "exception", containerId, e);
} finally {
childContainers.remove(containerId);
LOG.info("Container {} terminating.", containerId);
@@ -440,7 +438,7 @@ public class StramLocalCluster implements Runnable, Controller
public void run(long runMillis)
{
long endMillis = System.currentTimeMillis() + runMillis;
- List<Thread> containerThreads = new LinkedList<Thread>();
+ List<Thread> containerThreads = new LinkedList<>();
while (!appDone) {
@@ -462,10 +460,7 @@ public class StramLocalCluster implements Runnable, Controller
while (!dnmgr.containerStartRequests.isEmpty()) {
ContainerStartRequest cdr = dnmgr.containerStartRequests.poll();
if (cdr != null) {
- LocalStreamingContainerLauncher launcher = new LocalStreamingContainerLauncher(cdr);
- if (launcher.launchThread != null) {
- containerThreads.add(launcher.launchThread);
- }
+ new LocalStreamingContainerLauncher(cdr, containerThreads);
}
}
[2/2] apex-core git commit: Merge branch 'APEXCORE-527' of
https://github.com/vrozov/apex-core
Posted by th...@apache.org.
Merge branch 'APEXCORE-527' of https://github.com/vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/388bb387
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/388bb387
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/388bb387
Branch: refs/heads/master
Commit: 388bb38771263ceeec120fb1b75ecea7a5d4f95a
Parents: 0b518de 1388bca
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Sep 15 11:52:20 2016 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Thu Sep 15 11:52:20 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 25 ++++++++------------
1 file changed, 10 insertions(+), 15 deletions(-)
----------------------------------------------------------------------