You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/11/17 18:10:44 UTC

[1/2] incubator-apex-core git commit: - APEX-129 #resolve #comment Fixed bug where tuples can be emitted outside of a streaming window

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 93bdf2d10 -> 5b8a4d502


- APEX-129 #resolve #comment Fixed bug where tuples can be emitted outside of a streaming window


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/61fd64df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/61fd64df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/61fd64df

Branch: refs/heads/devel-3
Commit: 61fd64df294e215e91455eada2e3aa2c81a0528e
Parents: 1873f55
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Nov 12 13:59:04 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Nov 12 14:38:49 2015 -0800

----------------------------------------------------------------------
 .../com/datatorrent/stram/engine/InputNode.java |  15 +-
 .../datatorrent/stram/engine/InputNodeTest.java | 258 +++++++++++++++++++
 2 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61fd64df/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index 1f66635..92a61f0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -69,14 +69,15 @@ public class InputNode extends Node<InputOperator>
     long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
     final boolean handleIdleTime = operator instanceof IdleTimeHandler;
 
-    boolean insideWindow = applicationWindowCount != 0;
+    boolean insideApplicationWindow = applicationWindowCount != 0;
     boolean doCheckpoint = false;
+    boolean insideStreamingWindow = false;
 
     try {
       while (alive) {
         Tuple t = controlTuples.sweep();
         if (t == null) {
-          if (insideWindow) {
+          if (insideStreamingWindow) {
             int generatedTuples = 0;
 
             for (Sink<Object> cs : sinks) {
@@ -111,8 +112,9 @@ public class InputNode extends Node<InputOperator>
               }
               controlTupleCount++;
               currentWindowId = t.getWindowId();
+              insideStreamingWindow = true;
               if (applicationWindowCount == 0) {
-                insideWindow = true;
+                insideApplicationWindow = true;
                 operator.beginWindow(currentWindowId);
               }
               operator.emitTuples(); /* give at least one chance to emit the tuples */
@@ -121,8 +123,9 @@ public class InputNode extends Node<InputOperator>
 
             case END_WINDOW:
               endWindowEmitTime = System.currentTimeMillis();
+              insideStreamingWindow = false;
               if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
-                insideWindow = false;
+                insideApplicationWindow = false;
                 operator.endWindow();
                 applicationWindowCount = 0;
               }
@@ -145,7 +148,7 @@ public class InputNode extends Node<InputOperator>
 
               ContainerStats.OperatorStats stats = new ContainerStats.OperatorStats();
               reportStats(stats, currentWindowId);
-              if(!insideWindow){
+              if(!insideApplicationWindow){
                 stats.metrics = collectMetrics();
               }
               handleRequests(currentWindowId);
@@ -214,7 +217,7 @@ public class InputNode extends Node<InputOperator>
       }
     }
 
-    if (insideWindow) {
+    if (insideApplicationWindow) {
       endWindowEmitTime = System.currentTimeMillis();
       operator.endWindow();
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61fd64df/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
new file mode 100644
index 0000000..3dedd28
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Sink;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.stram.tuple.EndWindowTuple;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
+import com.datatorrent.stram.tuple.Tuple;
+
+public class InputNodeTest // Runs an InputNode briefly and checks that data tuples reach the sink only between BEGIN_WINDOW and END_WINDOW.
+{
+  @Test
+  public void testEmitTuplesOutsideStreamingWindow() throws Exception // variant: the operator emits from emitTuples()
+  {
+    emitTestHelper(true);
+  }
+
+  @Test
+  public void testHandleIdleTimeOutsideStreamingWindow() throws Exception // variant: the operator emits from handleIdleTime()
+  {
+    emitTestHelper(false);
+  }
+
+  @SuppressWarnings("deprecation") // presumably for the Thread.stop() call below - TODO confirm nothing else here is deprecated
+  private void emitTestHelper(boolean trueEmitTuplesFalseHandleIdleTime) throws Exception // shared body of both tests; the flag picks which operator callback emits
+  {
+    TestInputOperator tio = new TestInputOperator();
+    tio.trueEmitTuplesFalseHandleIdleTime = trueEmitTuplesFalseHandleIdleTime;
+    DefaultAttributeMap dam = new DefaultAttributeMap();
+    dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10); // with the generator's ~1 window/second and a 3 s run, the 10-window count is presumably never reached
+    dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);
+
+    final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, dam, null)); // fully qualified: the simple name OperatorContext is imported from com.datatorrent.api.Context
+    in.setId(1);
+
+    TestSink testSink = new TestSink(); // TestSink is not defined in this file; presumably it records every received object in collectedTuples - verify
+
+    in.connectInputPort(Node.INPUT, new TestWindowGenerator()); // control tuples come from the scripted generator defined below
+    in.connectOutputPort("output", testSink);
+
+    final AtomicBoolean ab = new AtomicBoolean(false); // NOTE(review): set in run() but never read anywhere in this file
+    Thread t = new Thread() // runs the node's activate/run/deactivate cycle off the test thread
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        in.activate();
+        in.run();
+        in.deactivate(); // reached only if in.run() returns before the thread is stopped
+      }
+
+    };
+    t.start();
+
+    Thread.sleep(3000); // let the node run for about three seconds
+
+    t.stop(); // NOTE(review): deprecated Thread.stop(); there is no join() before the sink is read below
+
+    Assert.assertTrue("Should have emitted some tuples", testSink.collectedTuples.size() > 0);
+
+    boolean insideWindow = false; // replay state: true between a BEGIN_WINDOW and its END_WINDOW
+
+    for (Object tuple : testSink.collectedTuples) { // replay the recorded stream in order and validate window framing
+      if (tuple instanceof Tuple) { // Tuple instances are control tuples; anything else is treated as operator data
+        Tuple controlTuple = (Tuple)tuple;
+        MessageType tupleType = controlTuple.getType();
+
+        if (tupleType == MessageType.RESET_WINDOW) {
+          Assert.assertFalse(insideWindow); // a reset must not arrive inside an open window
+        } else if (tupleType == MessageType.BEGIN_WINDOW) {
+          Assert.assertFalse(insideWindow); // windows must not nest
+          insideWindow = true;
+        } else if (tupleType == MessageType.END_WINDOW) {
+          Assert.assertTrue(insideWindow); // every END_WINDOW needs a preceding BEGIN_WINDOW
+          insideWindow = false;
+        }
+      }
+      else {
+        Assert.assertTrue(insideWindow); // the property under test: no data tuple outside a streaming window
+      }
+    }
+  }
+
+  public static class TestWindowGenerator implements SweepableReservoir // scripted control-tuple source: one RESET_WINDOW, then BEGIN_WINDOW/END_WINDOW pairs
+  {
+    private final long baseSeconds = (System.currentTimeMillis() / 1000L) << 32; // creation time in seconds placed in the upper 32 bits; OR-ed with a low-bits value to form tuple ids
+    private long windowId = 0L; // incremented before each BEGIN_WINDOW, so the first window uses 1
+
+    private Tuple currentTuple; // the pending tuple; null once remove() has handed it out
+    private Sink<Object> oldSink = null; // last sink passed to setSink(); only stored and returned, never written to
+    private State currentState = State.RESET_WINDOW_NO_TUPLE;
+    private long lastTime; // wall-clock ms of the last BEGIN_WINDOW; starts at 0, so the first window begins without waiting
+
+    public static enum State // what the next sweep() call will do
+    {
+      RESET_WINDOW_NO_TUPLE, // initial state: the reset tuple has not been created yet
+      RESET_WINDOW_TUPLE, // reset tuple created; waiting for it to be removed
+      BEGIN_WINDOW, // waiting for more than one second to pass before opening the next window
+      END_WINDOW; // a window is open; the next sweep() closes it
+    }
+
+    public TestWindowGenerator()
+    {
+    }
+
+    @Override
+    public Sink<Object> setSink(Sink<Object> sink) // stores the new sink and returns the previously stored one (null the first time)
+    {
+      Sink<Object> tempOldSink = oldSink;
+      oldSink = sink;
+      return tempOldSink;
+    }
+
+    @Override
+    public Tuple sweep() // advances the state machine one step and returns the pending tuple, or null if none
+    {
+      switch(currentState) {
+        case RESET_WINDOW_NO_TUPLE: {
+          currentTuple = new ResetWindowTuple(baseSeconds | 500L); // presumably 500 is the window width in ms - TODO confirm against ResetWindowTuple
+          currentState = State.RESET_WINDOW_TUPLE;
+          break;
+        }
+        case RESET_WINDOW_TUPLE: {
+          if(currentTuple == null) { // remove() has consumed the reset tuple
+            currentState = State.BEGIN_WINDOW; // no new tuple is created on this call, so it still returns null
+          }
+          break;
+        }
+        case BEGIN_WINDOW: {
+          if (System.currentTimeMillis() - lastTime > 1000L) { // open at most one window per second
+            lastTime = System.currentTimeMillis();
+            windowId++;
+            currentTuple = new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId);
+            currentState = State.END_WINDOW;
+          }
+          break; // otherwise currentTuple is returned unchanged (null once the previous tuple was removed)
+        }
+        case END_WINDOW: {
+          currentTuple = new EndWindowTuple(baseSeconds | windowId); // created on the very next sweep(), without checking that BEGIN_WINDOW was removed
+          currentState = State.BEGIN_WINDOW;
+          break;
+        }
+      }
+
+      return currentTuple;
+    }
+
+    @Override
+    public int getCount(boolean reset) // always reports zero; the reset flag is ignored
+    {
+      return 0;
+    }
+
+    @Override
+    public int size() // 1 while a tuple is pending, otherwise 0
+    {
+      if (currentTuple != null) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+
+    @Override
+    public Object remove() // hands out the pending tuple (possibly null) and clears it
+    {
+      Tuple tempTuple = currentTuple;
+      currentTuple = null;
+      return tempTuple;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class); // NOTE(review): not referenced in this class
+  }
+
+  public static class TestInputOperator implements InputOperator, IdleTimeHandler // emits the constant 1L, throttled, from exactly one of its two callbacks
+  {
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    public boolean trueEmitTuplesFalseHandleIdleTime = true; // true: emit from emitTuples(); false: emit from handleIdleTime()
+    private long lastTimestamp; // wall-clock ms of the last emit; starts at 0, so the first call emits
+
+    @Override
+    public void emitTuples()
+    {
+      if (trueEmitTuplesFalseHandleIdleTime) {
+        emit(100L);
+      }
+    }
+
+    @Override
+    public void beginWindow(long windowId) // intentionally empty
+    {
+    }
+
+    @Override
+    public void endWindow() // intentionally empty
+    {
+    }
+
+    @Override
+    public void setup(OperatorContext context) // intentionally empty
+    {
+    }
+
+    @Override
+    public void teardown() // intentionally empty
+    {
+    }
+
+    @Override
+    public void handleIdleTime()
+    {
+      if (!trueEmitTuplesFalseHandleIdleTime) {
+        emit(100L);
+      }
+    }
+
+    private void emit(long delay) // emits 1L on the output port only if more than `delay` ms have passed since the last emit
+    {
+      if (System.currentTimeMillis() - lastTimestamp > delay) {
+        lastTimestamp = System.currentTimeMillis();
+        output.emit(1L);
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(InputNodeTest.class); // NOTE(review): not referenced in this class
+}


[2/2] incubator-apex-core git commit: Merge branch 'APEX-129' of github.com:ilooner/incubator-apex-core into devel-3

Posted by pr...@apache.org.
Merge branch 'APEX-129' of github.com:ilooner/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5b8a4d50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5b8a4d50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5b8a4d50

Branch: refs/heads/devel-3
Commit: 5b8a4d5028ed769318794b59bdece4ae3afe633d
Parents: 93bdf2d 61fd64d
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 16 14:19:23 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 16 14:19:23 2015 -0800

----------------------------------------------------------------------
 .../com/datatorrent/stram/engine/InputNode.java |  15 +-
 .../datatorrent/stram/engine/InputNodeTest.java | 258 +++++++++++++++++++
 2 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------