You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 01:50:09 UTC
[1/2] apex-core git commit: APEXCORE-516 - StramLocalCluster should always use loopback address for buffer server location
Repository: apex-core
Updated Branches:
refs/heads/master 65a721fb7 -> ae0ec2464
APEXCORE-516 - StramLocalCluster should always use loopback address for buffer server location
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6fa7b91e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6fa7b91e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6fa7b91e
Branch: refs/heads/master
Commit: 6fa7b91ee2b1fdfc79ef36c9345485089583f371
Parents: 9c48c41
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Aug 29 10:56:00 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Aug 29 10:56:00 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 23 +++++++++++---------
.../stram/engine/StreamingContainer.java | 10 ++++-----
2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index b3cf63f..23737d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -37,7 +36,6 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.net.NetUtils;
import com.datatorrent.api.DAG;
import com.datatorrent.api.LocalMode.Controller;
@@ -69,6 +67,8 @@ public class StramLocalCluster implements Runnable, Controller
private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
// assumes execution as unit test
private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
+ private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
+ private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost");
protected final StreamingContainerManager dnmgr;
private final UmbilicalProtocolLocalImpl umbilical;
private InetSocketAddress bufferServerAddress;
@@ -169,19 +169,23 @@ public class StramLocalCluster implements Runnable, Controller
this.windowGenerator = winGen;
}
- public static void run(StreamingContainer stramChild, StreamingContainerContext ctx) throws Exception
+ public void run(StreamingContainerContext ctx) throws Exception
{
LOG.debug("Got context: " + ctx);
- stramChild.setup(ctx);
+ setup(ctx);
+ if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) {
+ bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort());
+ }
+
boolean hasError = true;
try {
// main thread enters heartbeat loop
- stramChild.heartbeatLoop();
+ heartbeatLoop();
hasError = false;
} finally {
// teardown
try {
- stramChild.teardown();
+ teardown();
} catch (Exception e) {
if (!hasError) {
throw e;
@@ -247,7 +251,7 @@ public class StramLocalCluster implements Runnable, Controller
}
this.child = new LocalStreamingContainer(containerId, umbilical, wingen);
ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
- StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : NetUtils.getConnectAddress(bufferServerAddress));
+ StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
if (sca != null) {
Thread launchThread = new Thread(this, containerId);
launchThread.start();
@@ -261,7 +265,7 @@ public class StramLocalCluster implements Runnable, Controller
{
try {
StreamingContainerContext ctx = umbilical.getInitContext(containerId);
- LocalStreamingContainer.run(child, ctx);
+ child.run(ctx);
} catch (Exception e) {
LOG.error("Container {} failed", containerId, e);
throw new RuntimeException(e);
@@ -303,8 +307,7 @@ public class StramLocalCluster implements Runnable, Controller
StreamingContainer.eventloop.start();
bufferServer = new Server(0, 1024 * 1024,8);
bufferServer.setSpoolStorage(new DiskStorage());
- SocketAddress bindAddr = bufferServer.run(StreamingContainer.eventloop);
- this.bufferServerAddress = ((InetSocketAddress)bindAddr);
+ bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
LOG.info("Buffer server started: {}", bufferServerAddress);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 756fec2..0c74b27 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -24,7 +24,6 @@ import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
@@ -155,8 +154,8 @@ public class StreamingContainer extends YarnContainerMain
*/
private long firstWindowMillis;
private int windowWidthMillis;
- private InetSocketAddress bufferServerAddress;
- private com.datatorrent.bufferserver.server.Server bufferServer;
+ protected InetSocketAddress bufferServerAddress;
+ protected com.datatorrent.bufferserver.server.Server bufferServer;
private int checkpointWindowCount;
private boolean fastPublisherSubscriber;
private StreamingContainerContext containerContext;
@@ -230,9 +229,8 @@ public class StreamingContainer extends YarnContainerMain
if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) {
bufferServer.setSpoolStorage(new DiskStorage());
}
- SocketAddress bindAddr = bufferServer.run(eventloop);
- logger.debug("Buffer server started: {}", bindAddr);
- this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr));
+ bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run(eventloop));
+ logger.debug("Buffer server started: {}", bufferServerAddress);
}
} catch (IOException ex) {
logger.warn("deploy request failed due to {}", ex);
[2/2] apex-core git commit: Merge branch 'APEXCORE-516' of https://github.com/vrozov/apex-core
Posted by th...@apache.org.
Merge branch 'APEXCORE-516' of https://github.com/vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ae0ec246
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ae0ec246
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ae0ec246
Branch: refs/heads/master
Commit: ae0ec246467266083ad7e4bc499b70458a36dc0f
Parents: 65a721f 6fa7b91
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Aug 31 16:03:08 2016 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Aug 31 16:03:08 2016 -0700
----------------------------------------------------------------------
.../datatorrent/stram/StramLocalCluster.java | 23 +++++++++++---------
.../stram/engine/StreamingContainer.java | 10 ++++-----
2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------