You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/09/01 01:50:09 UTC

[1/2] apex-core git commit: APEXCORE-516 - StramLocalCluster should always use loopback address for buffer server location

Repository: apex-core
Updated Branches:
  refs/heads/master 65a721fb7 -> ae0ec2464


APEXCORE-516 - StramLocalCluster should always use loopback address for buffer server location


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6fa7b91e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6fa7b91e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6fa7b91e

Branch: refs/heads/master
Commit: 6fa7b91ee2b1fdfc79ef36c9345485089583f371
Parents: 9c48c41
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Aug 29 10:56:00 2016 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Aug 29 10:56:00 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    | 23 +++++++++++---------
 .../stram/engine/StreamingContainer.java        | 10 ++++-----
 2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index b3cf63f..23737d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -23,7 +23,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +36,6 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.net.NetUtils;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode.Controller;
@@ -69,6 +67,8 @@ public class StramLocalCluster implements Runnable, Controller
   private static final Logger LOG = LoggerFactory.getLogger(StramLocalCluster.class);
   // assumes execution as unit test
   private static File CLUSTER_WORK_DIR = new File("target", StramLocalCluster.class.getName());
+  private static final String LOCALHOST_PROPERTY_KEY = "org.apache.apex.stram.StramLocalCluster.hostname";
+  private static final String LOCALHOST = System.getProperty(LOCALHOST_PROPERTY_KEY, "localhost");
   protected final StreamingContainerManager dnmgr;
   private final UmbilicalProtocolLocalImpl umbilical;
   private InetSocketAddress bufferServerAddress;
@@ -169,19 +169,23 @@ public class StramLocalCluster implements Runnable, Controller
       this.windowGenerator = winGen;
     }
 
-    public static void run(StreamingContainer stramChild, StreamingContainerContext ctx) throws Exception
+    public void run(StreamingContainerContext ctx) throws Exception
     {
       LOG.debug("Got context: " + ctx);
-      stramChild.setup(ctx);
+      setup(ctx);
+      if (bufferServerAddress != null && !bufferServerAddress.getAddress().isLoopbackAddress()) {
+        bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServerAddress.getPort());
+      }
+
       boolean hasError = true;
       try {
         // main thread enters heartbeat loop
-        stramChild.heartbeatLoop();
+        heartbeatLoop();
         hasError = false;
       } finally {
         // teardown
         try {
-          stramChild.teardown();
+          teardown();
         } catch (Exception e) {
           if (!hasError) {
             throw e;
@@ -247,7 +251,7 @@ public class StramLocalCluster implements Runnable, Controller
       }
       this.child = new LocalStreamingContainer(containerId, umbilical, wingen);
       ContainerResource cr = new ContainerResource(cdr.container.getResourceRequestPriority(), containerId, "localhost", cdr.container.getRequiredMemoryMB(), cdr.container.getRequiredVCores(), null);
-      StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : NetUtils.getConnectAddress(bufferServerAddress));
+      StreamingContainerAgent sca = dnmgr.assignContainer(cr, perContainerBufferServer ? null : bufferServerAddress);
       if (sca != null) {
         Thread launchThread = new Thread(this, containerId);
         launchThread.start();
@@ -261,7 +265,7 @@ public class StramLocalCluster implements Runnable, Controller
     {
       try {
         StreamingContainerContext ctx = umbilical.getInitContext(containerId);
-        LocalStreamingContainer.run(child, ctx);
+        child.run(ctx);
       } catch (Exception e) {
         LOG.error("Container {} failed", containerId, e);
         throw new RuntimeException(e);
@@ -303,8 +307,7 @@ public class StramLocalCluster implements Runnable, Controller
       StreamingContainer.eventloop.start();
       bufferServer = new Server(0, 1024 * 1024,8);
       bufferServer.setSpoolStorage(new DiskStorage());
-      SocketAddress bindAddr = bufferServer.run(StreamingContainer.eventloop);
-      this.bufferServerAddress = ((InetSocketAddress)bindAddr);
+      bufferServerAddress = InetSocketAddress.createUnresolved(LOCALHOST, bufferServer.run(StreamingContainer.eventloop).getPort());
       LOG.info("Buffer server started: {}", bufferServerAddress);
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/6fa7b91e/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 756fec2..0c74b27 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -24,7 +24,6 @@ import java.lang.management.GarbageCollectorMXBean;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.ArrayList;
@@ -155,8 +154,8 @@ public class StreamingContainer extends YarnContainerMain
    */
   private long firstWindowMillis;
   private int windowWidthMillis;
-  private InetSocketAddress bufferServerAddress;
-  private com.datatorrent.bufferserver.server.Server bufferServer;
+  protected InetSocketAddress bufferServerAddress;
+  protected com.datatorrent.bufferserver.server.Server bufferServer;
   private int checkpointWindowCount;
   private boolean fastPublisherSubscriber;
   private StreamingContainerContext containerContext;
@@ -230,9 +229,8 @@ public class StreamingContainer extends YarnContainerMain
         if (ctx.getValue(Context.DAGContext.BUFFER_SPOOLING)) {
           bufferServer.setSpoolStorage(new DiskStorage());
         }
-        SocketAddress bindAddr = bufferServer.run(eventloop);
-        logger.debug("Buffer server started: {}", bindAddr);
-        this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr));
+        bufferServerAddress = NetUtils.getConnectAddress(bufferServer.run(eventloop));
+        logger.debug("Buffer server started: {}", bufferServerAddress);
       }
     } catch (IOException ex) {
       logger.warn("deploy request failed due to {}", ex);


[2/2] apex-core git commit: Merge branch 'APEXCORE-516' of https://github.com/vrozov/apex-core

Posted by th...@apache.org.
Merge branch 'APEXCORE-516' of https://github.com/vrozov/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/ae0ec246
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/ae0ec246
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/ae0ec246

Branch: refs/heads/master
Commit: ae0ec246467266083ad7e4bc499b70458a36dc0f
Parents: 65a721f 6fa7b91
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Aug 31 16:03:08 2016 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Aug 31 16:03:08 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/stram/StramLocalCluster.java    | 23 +++++++++++---------
 .../stram/engine/StreamingContainer.java        | 10 ++++-----
 2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------