You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2016/11/14 17:29:45 UTC

apex-core git commit: APEXCORE-505 The heartbeat loop was blocked waiting for operator activation. The reason is that stream activation (only BufferServerSubscriber and WindowGenerator) waits for operator activation in the heartbeat thread. After analysis and sanity testing, we see no need to synchronize operator and stream activation.

Repository: apex-core
Updated Branches:
  refs/heads/master 2c024cd84 -> fc3246e11


APEXCORE-505 The heartbeat loop was blocked waiting for operator activation. The reason is that stream activation (only BufferServerSubscriber and WindowGenerator) waits for operator activation in the heartbeat thread. After analysis and sanity testing, we see no need to synchronize operator and stream activation.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/fc3246e1
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/fc3246e1
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/fc3246e1

Branch: refs/heads/master
Commit: fc3246e11afa426eefe1dcaf5b8aface079d7d10
Parents: 2c024cd
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Fri Nov 4 10:53:03 2016 -0700
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Wed Nov 9 16:00:38 2016 -0800

----------------------------------------------------------------------
 .../stram/engine/StreamingContainer.java        |  35 +----
 .../stram/engine/GenericNodeTest.java           | 132 +++++++++++++++++++
 .../stram/stream/SocketStreamTest.java          | 101 ++++++++++----
 3 files changed, 213 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 8e7e0a1..78f3421 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -38,7 +38,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1361,14 +1360,11 @@ public class StreamingContainer extends YarnContainerMain
   public synchronized void activate(final Map<Integer, OperatorDeployInfo> nodeMap, Map<String, ComponentContextPair<Stream, StreamContext>> newStreams)
   {
     for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {
-      if (!(pair.component instanceof BufferServerSubscriber)) {
-        activeStreams.put(pair.component, pair.context);
-        pair.component.activate(pair.context);
-        eventBus.publish(new StreamActivationEvent(pair));
-      }
+      activeStreams.put(pair.component, pair.context);
+      pair.component.activate(pair.context);
+      eventBus.publish(new StreamActivationEvent(pair));
     }
 
-    final CountDownLatch signal = new CountDownLatch(nodeMap.size());
     for (final OperatorDeployInfo ndi : nodeMap.values()) {
       /*
        * OiO nodes get activated with their primary nodes.
@@ -1408,10 +1404,6 @@ public class StreamingContainer extends YarnContainerMain
 
             currentdi = null;
 
-            for (int i = setOperators.size(); i-- > 0; ) {
-              signal.countDown();
-            }
-
             node.run(); /* this is a blocking call */
           } catch (Error error) {
             int[] operators;
@@ -1448,8 +1440,6 @@ public class StreamingContainer extends YarnContainerMain
                 failedNodes.add(ndi.id);
                 logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex);
               }
-            } else {
-              signal.countDown();
             }
 
             List<Integer> oioNodeIdList = oioGroups.get(ndi.id);
@@ -1463,8 +1453,6 @@ public class StreamingContainer extends YarnContainerMain
                     failedNodes.add(oiodi.id);
                     logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex);
                   }
-                } else {
-                  signal.countDown();
                 }
               }
             }
@@ -1475,23 +1463,6 @@ public class StreamingContainer extends YarnContainerMain
       thread.start();
     }
 
-    /**
-     * we need to make sure that before any of the operators gets the first message, it's activated.
-     */
-    try {
-      signal.await();
-    } catch (InterruptedException ex) {
-      logger.debug("Activation of operators interrupted.", ex);
-    }
-
-    for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {
-      if (pair.component instanceof BufferServerSubscriber) {
-        activeStreams.put(pair.component, pair.context);
-        pair.component.activate(pair.context);
-        eventBus.publish(new StreamActivationEvent(pair));
-      }
-    }
-
     for (WindowGenerator wg : generators.values()) {
       if (!activeGenerators.containsKey(wg)) {
         activeGenerators.put(wg, generators);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 5dfa5f3..af99e98 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -20,9 +20,12 @@ package com.datatorrent.stram.engine;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
@@ -48,13 +51,21 @@ import com.datatorrent.api.Operator.CheckpointNotificationListener;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.Stats.OperatorStats;
+import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.bufferserver.packet.PayloadTuple;
+import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.ScheduledExecutorService;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
+import com.datatorrent.netlet.DefaultEventLoop;
+import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
+import com.datatorrent.stram.stream.BufferServerPublisher;
+import com.datatorrent.stram.stream.BufferServerSubscriber;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -393,6 +404,127 @@ public class GenericNodeTest
   }
 
   @Test
+  public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException
+  {
+    final String streamName = "streamName";
+    final String upstreamNodeId = "upstreamNodeId";
+    final String  downstreamNodeId = "downStreamNodeId";
+
+    EventLoop eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
+
+    ((DefaultEventLoop)eventloop).start();
+    final Server bufferServer = new Server(0); // find random port
+    final int bufferServerPort = bufferServer.run(eventloop).getPort();
+
+    final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
+    final BlockingQueue<Object> tuples = new ArrayBlockingQueue<Object>(10);
+
+    GenericTestOperator go = new GenericTestOperator();
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+        new DefaultAttributeMap(), null));
+    gn.setId(1);
+
+    Sink<Object> output = new Sink<Object>()
+    {
+      @Override
+      public void put(Object tuple)
+      {
+        tuples.add(tuple);
+      }
+
+      @Override
+      public int getCount(boolean reset)
+      {
+        return 0;
+      }
+    };
+
+    InetSocketAddress socketAddress = new InetSocketAddress("localhost", bufferServerPort);
+
+    StreamContext issContext = new StreamContext(streamName);
+    issContext.setSourceId(upstreamNodeId);
+    issContext.setSinkId(downstreamNodeId);
+    issContext.setFinishedWindowId(-1);
+    issContext.setBufferServerAddress(socketAddress);
+    issContext.put(StreamContext.CODEC, serde);
+    issContext.put(StreamContext.EVENT_LOOP, eventloop);
+
+    StreamContext ossContext = new StreamContext(streamName);
+    ossContext.setSourceId(upstreamNodeId);
+    ossContext.setSinkId(downstreamNodeId);
+    ossContext.setBufferServerAddress(socketAddress);
+    ossContext.put(StreamContext.CODEC, serde);
+    ossContext.put(StreamContext.EVENT_LOOP, eventloop);
+
+    BufferServerPublisher oss = new BufferServerPublisher(upstreamNodeId, 1024);
+    oss.setup(ossContext);
+    oss.activate(ossContext);
+
+    oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x1L));
+    byte[] buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)1;
+    oss.put(buff);
+    oss.put(new EndWindowTuple(0x1L));
+    oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x2L));
+    buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)2;
+    oss.put(buff);
+    oss.put(new EndWindowTuple(0x2L));
+    oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x3L));
+    buff = PayloadTuple.getSerializedTuple(0, 1);
+    buff[buff.length - 1] = (byte)3;
+    oss.put(buff);
+
+    oss.put(new EndWindowTuple(0x3L));
+    oss.put(new EndStreamTuple(0L));
+
+    BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 1024);
+    iss.setup(issContext);
+
+    gn.connectInputPort(GenericTestOperator.IPORT1, iss.acquireReservoir("testReservoir", 10));
+    gn.connectOutputPort(GenericTestOperator.OPORT1, output);
+
+    SweepableReservoir tupleWait = iss.acquireReservoir("testReservoir2", 10);
+
+    iss.activate(issContext);
+
+    while (tupleWait.sweep() == null) {
+      Thread.sleep(100);
+    }
+
+    gn.firstWindowMillis = 0;
+    gn.windowWidthMillis = 100;
+
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+    };
+
+    t.start();
+    t.join();
+
+    Assert.assertEquals(10, tuples.size());
+
+    List<Object> list = new ArrayList<>(tuples);
+
+    Assert.assertEquals("Payload Tuple 1", 1, ((byte[])list.get(1))[5]);
+    Assert.assertEquals("Payload Tuple 2", 2, ((byte[])list.get(4))[5]);
+    Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]);
+
+    if (bufferServer != null) {
+      eventloop.stop(bufferServer);
+    }
+
+    ((DefaultEventLoop)eventloop).stop();
+  }
+
+  @Test
   public void testPrematureTermination() throws InterruptedException
   {
     long maxSleep = 5000;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/fc3246e1/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
index 7db42da..4094f66 100644
--- a/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/stream/SocketStreamTest.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -53,6 +55,18 @@ public class SocketStreamTest
   private static final Logger LOG = LoggerFactory.getLogger(SocketStreamTest.class);
   private static int bufferServerPort = 0;
   private static Server bufferServer = null;
+
+  private static final String streamName = "streamName";
+  private static final String upstreamNodeId = "upstreamNodeId";
+  private static final String  downstreamNodeId = "downStreamNodeId";
+
+  private StreamContext issContext;
+  private StreamContext ossContext;
+  private SweepableReservoir reservoir;
+  private BufferServerSubscriber iss;
+  private BufferServerPublisher oss;
+  private AtomicInteger messageCount;
+
   static EventLoop eventloop;
 
   static {
@@ -91,8 +105,53 @@ public class SocketStreamTest
   @SuppressWarnings({"SleepWhileInLoop"})
   public void testBufferServerStream() throws Exception
   {
+    iss.activate(issContext);
+    LOG.debug("input stream activated");
+
+    oss.activate(ossContext);
+    LOG.debug("output stream activated");
+
+    sendMessage();
+  }
+
+  /**
+   * Tests the buffer server stream by sending a
+   * tuple on the output stream and receiving the same tuple from the input stream, with the following changes:
+   *
+   * 1. The sink is swept before the BufferServerSubscriber is activated.
+   * 2. The BufferServerSubscriber is activated after the messages are sent from the BufferServerPublisher.
+   *
+   * @throws Exception
+   */
+  @Test
+  @SuppressWarnings({"SleepWhileInLoop"})
+  public void testBufferServerStreamWithLateActivationForSubscriber() throws Exception
+  {
+    for (int i = 0; i < 50; i++) {
+      Tuple t = reservoir.sweep();
+      if (t == null) {
+        sleep(5);
+        continue;
+      }
+
+      throw new Exception("Unexpected control tuple.");
+    }
+
+    oss.activate(ossContext);
+    LOG.debug("output stream activated");
+
+    sendMessage();
+
+    iss.activate(issContext);
+    LOG.debug("input stream activated");
+  }
+
+  @Before
+  public void init()
+  {
     final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<Object>();
-    final AtomicInteger messageCount = new AtomicInteger();
+    messageCount = new AtomicInteger(0);
+
     Sink<Object> sink = new Sink<Object>()
     {
       @Override
@@ -107,14 +166,9 @@ public class SocketStreamTest
       {
         throw new UnsupportedOperationException("Not supported yet.");
       }
-
     };
 
-    String streamName = "streamName";
-    String upstreamNodeId = "upstreamNodeId";
-    String downstreamNodeId = "downStreamNodeId";
-
-    StreamContext issContext = new StreamContext(streamName);
+    issContext = new StreamContext(streamName);
     issContext.setSourceId(upstreamNodeId);
     issContext.setSinkId(downstreamNodeId);
     issContext.setFinishedWindowId(-1);
@@ -122,33 +176,25 @@ public class SocketStreamTest
     issContext.put(StreamContext.CODEC, serde);
     issContext.put(StreamContext.EVENT_LOOP, eventloop);
 
-    BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 1024);
+    iss = new BufferServerSubscriber(downstreamNodeId, 1024);
     iss.setup(issContext);
-    SweepableReservoir reservoir = iss.acquireReservoir("testReservoir", 1);
+    reservoir = iss.acquireReservoir("testReservoir", 1);
     reservoir.setSink(sink);
 
-    StreamContext ossContext = new StreamContext(streamName);
+    ossContext = new StreamContext(streamName);
     ossContext.setSourceId(upstreamNodeId);
     ossContext.setSinkId(downstreamNodeId);
     ossContext.setBufferServerAddress(InetSocketAddress.createUnresolved("localhost", bufferServerPort));
     ossContext.put(StreamContext.CODEC, serde);
     ossContext.put(StreamContext.EVENT_LOOP, eventloop);
 
-    BufferServerPublisher oss = new BufferServerPublisher(upstreamNodeId, 1024);
+    oss = new BufferServerPublisher(upstreamNodeId, 1024);
     oss.setup(ossContext);
+  }
 
-    iss.activate(issContext);
-    LOG.debug("input stream activated");
-
-    oss.activate(ossContext);
-    LOG.debug("output stream activated");
-
-    LOG.debug("Sending hello message");
-    oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 0));
-    oss.put(StramTestSupport.generateTuple("hello", 0));
-    oss.put(StramTestSupport.generateEndWindowTuple(upstreamNodeId, 0));
-    oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 1)); // it's a spurious tuple, presence of it should not affect the outcome of the test.
-
+  @After
+  public void verify() throws InterruptedException
+  {
     for (int i = 0; i < 100; i++) {
       Tuple t = reservoir.sweep();
       if (t == null) {
@@ -167,5 +213,14 @@ public class SocketStreamTest
     Assert.assertEquals("Received messages", 1, messageCount.get());
   }
 
+  private void sendMessage()
+  {
+    LOG.debug("Sending hello message");
+    oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 0));
+    oss.put(StramTestSupport.generateTuple("hello", 0));
+    oss.put(StramTestSupport.generateEndWindowTuple(upstreamNodeId, 0));
+    oss.put(StramTestSupport.generateBeginWindowTuple(upstreamNodeId, 1)); // it's a spurious tuple, presence of it should not affect the outcome of the test.
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(SocketStreamTest.class);
 }