You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2015/11/17 18:10:44 UTC
[1/2] incubator-apex-core git commit: - APEX-129 #resolve #comment
Fixed bug where tuples can be emitted outside of a streaming window
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 93bdf2d10 -> 5b8a4d502
- APEX-129 #resolve #comment Fixed bug where tuples can be emitted outside of a streaming window
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/61fd64df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/61fd64df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/61fd64df
Branch: refs/heads/devel-3
Commit: 61fd64df294e215e91455eada2e3aa2c81a0528e
Parents: 1873f55
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Thu Nov 12 13:59:04 2015 -0800
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Nov 12 14:38:49 2015 -0800
----------------------------------------------------------------------
.../com/datatorrent/stram/engine/InputNode.java | 15 +-
.../datatorrent/stram/engine/InputNodeTest.java | 258 +++++++++++++++++++
2 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61fd64df/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
index 1f66635..92a61f0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/InputNode.java
@@ -69,14 +69,15 @@ public class InputNode extends Node<InputOperator>
long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
final boolean handleIdleTime = operator instanceof IdleTimeHandler;
- boolean insideWindow = applicationWindowCount != 0;
+ boolean insideApplicationWindow = applicationWindowCount != 0;
boolean doCheckpoint = false;
+ boolean insideStreamingWindow = false;
try {
while (alive) {
Tuple t = controlTuples.sweep();
if (t == null) {
- if (insideWindow) {
+ if (insideStreamingWindow) {
int generatedTuples = 0;
for (Sink<Object> cs : sinks) {
@@ -111,8 +112,9 @@ public class InputNode extends Node<InputOperator>
}
controlTupleCount++;
currentWindowId = t.getWindowId();
+ insideStreamingWindow = true;
if (applicationWindowCount == 0) {
- insideWindow = true;
+ insideApplicationWindow = true;
operator.beginWindow(currentWindowId);
}
operator.emitTuples(); /* give at least one chance to emit the tuples */
@@ -121,8 +123,9 @@ public class InputNode extends Node<InputOperator>
case END_WINDOW:
endWindowEmitTime = System.currentTimeMillis();
+ insideStreamingWindow = false;
if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
- insideWindow = false;
+ insideApplicationWindow = false;
operator.endWindow();
applicationWindowCount = 0;
}
@@ -145,7 +148,7 @@ public class InputNode extends Node<InputOperator>
ContainerStats.OperatorStats stats = new ContainerStats.OperatorStats();
reportStats(stats, currentWindowId);
- if(!insideWindow){
+ if(!insideApplicationWindow){
stats.metrics = collectMetrics();
}
handleRequests(currentWindowId);
@@ -214,7 +217,7 @@ public class InputNode extends Node<InputOperator>
}
}
- if (insideWindow) {
+ if (insideApplicationWindow) {
endWindowEmitTime = System.currentTimeMillis();
operator.endWindow();
if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/61fd64df/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
new file mode 100644
index 0000000..3dedd28
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/engine/InputNodeTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.engine;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.IdleTimeHandler;
+import com.datatorrent.api.Sink;
+import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.stram.tuple.EndWindowTuple;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
+import com.datatorrent.stram.tuple.Tuple;
+
+public class InputNodeTest
+{
+  /**
+   * Runs the shared scenario with the operator producing its data from
+   * {@code emitTuples()} and checks that every data tuple reaches the sink
+   * between a BEGIN_WINDOW and an END_WINDOW control tuple.
+   *
+   * @throws Exception if the shared helper fails or is interrupted
+   */
+  @Test
+  public void testEmitTuplesOutsideStreamingWindow() throws Exception
+  {
+    final boolean emitFromEmitTuples = true;
+    emitTestHelper(emitFromEmitTuples);
+  }
+
+  /**
+   * Runs the shared scenario with the operator producing its data from
+   * {@code handleIdleTime()} and checks that every data tuple reaches the sink
+   * between a BEGIN_WINDOW and an END_WINDOW control tuple.
+   *
+   * @throws Exception if the shared helper fails or is interrupted
+   */
+  @Test
+  public void testHandleIdleTimeOutsideStreamingWindow() throws Exception
+  {
+    final boolean emitFromEmitTuples = false;
+    emitTestHelper(emitFromEmitTuples);
+  }
+
+  /**
+   * Drives an {@link InputNode} wrapping a {@link TestInputOperator} for roughly three
+   * seconds, fed by a {@link TestWindowGenerator}, and then validates the order of
+   * everything the sink collected:
+   * <ul>
+   * <li>RESET_WINDOW and BEGIN_WINDOW must arrive while no window is open,</li>
+   * <li>END_WINDOW must arrive while a window is open,</li>
+   * <li>every non-control (data) tuple must arrive while a window is open.</li>
+   * </ul>
+   *
+   * @param trueEmitTuplesFalseHandleIdleTime {@code true} to make the operator emit from
+   *        {@code emitTuples()}, {@code false} to make it emit from {@code handleIdleTime()}
+   * @throws Exception if the calling thread is interrupted while sleeping or joining
+   */
+  @SuppressWarnings("deprecation")
+  private void emitTestHelper(boolean trueEmitTuplesFalseHandleIdleTime) throws Exception
+  {
+    TestInputOperator tio = new TestInputOperator();
+    tio.trueEmitTuplesFalseHandleIdleTime = trueEmitTuplesFalseHandleIdleTime;
+
+    DefaultAttributeMap dam = new DefaultAttributeMap();
+    dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 10);
+    dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 10);
+
+    final InputNode in = new InputNode(tio, new com.datatorrent.stram.engine.OperatorContext(0, dam, null));
+    in.setId(1);
+
+    TestSink testSink = new TestSink();
+
+    in.connectInputPort(Node.INPUT, new TestWindowGenerator());
+    in.connectOutputPort("output", testSink);
+
+    // Flipped by the node thread as its first action so the test can tell that it really ran.
+    final AtomicBoolean started = new AtomicBoolean(false);
+    Thread t = new Thread("InputNodeTest-node")
+    {
+      @Override
+      public void run()
+      {
+        started.set(true);
+        in.activate();
+        in.run();
+        in.deactivate();
+      }
+
+    };
+    t.start();
+
+    // Let the node process a few streaming windows.
+    Thread.sleep(3000);
+
+    t.stop();
+    // stop() is asynchronous: wait (bounded) for the node thread to finish so that it is no
+    // longer writing to the sink while the collected tuples are inspected below. A completed
+    // join also makes the node thread's writes visible to this thread.
+    t.join(5000L);
+
+    Assert.assertTrue("Node thread never ran", started.get());
+    Assert.assertTrue("Should have emitted some tuples", testSink.collectedTuples.size() > 0);
+
+    boolean insideWindow = false;
+
+    for (Object tuple : testSink.collectedTuples) {
+      if (tuple instanceof Tuple) {
+        Tuple controlTuple = (Tuple)tuple;
+        MessageType tupleType = controlTuple.getType();
+
+        if (tupleType == MessageType.RESET_WINDOW) {
+          Assert.assertFalse("RESET_WINDOW received inside a streaming window", insideWindow);
+        } else if (tupleType == MessageType.BEGIN_WINDOW) {
+          Assert.assertFalse("BEGIN_WINDOW received while a streaming window is already open", insideWindow);
+          insideWindow = true;
+        } else if (tupleType == MessageType.END_WINDOW) {
+          Assert.assertTrue("END_WINDOW received without an open streaming window", insideWindow);
+          insideWindow = false;
+        }
+      } else {
+        // Anything that is not a control tuple is data emitted by the operator.
+        Assert.assertTrue("Data tuple emitted outside of a streaming window", insideWindow);
+      }
+    }
+  }
+
+ public static class TestWindowGenerator implements SweepableReservoir
+ {
+ private final long baseSeconds = (System.currentTimeMillis() / 1000L) << 32;
+ private long windowId = 0L;
+
+ private Tuple currentTuple;
+ private Sink<Object> oldSink = null;
+ private State currentState = State.RESET_WINDOW_NO_TUPLE;
+ private long lastTime;
+
+ public static enum State
+ {
+ RESET_WINDOW_NO_TUPLE,
+ RESET_WINDOW_TUPLE,
+ BEGIN_WINDOW,
+ END_WINDOW;
+ }
+
+ public TestWindowGenerator()
+ {
+ }
+
+ @Override
+ public Sink<Object> setSink(Sink<Object> sink)
+ {
+ Sink<Object> tempOldSink = oldSink;
+ oldSink = sink;
+ return tempOldSink;
+ }
+
+ @Override
+ public Tuple sweep()
+ {
+ switch(currentState) {
+ case RESET_WINDOW_NO_TUPLE: {
+ currentTuple = new ResetWindowTuple(baseSeconds | 500L);
+ currentState = State.RESET_WINDOW_TUPLE;
+ break;
+ }
+ case RESET_WINDOW_TUPLE: {
+ if(currentTuple == null) {
+ currentState = State.BEGIN_WINDOW;
+ }
+ break;
+ }
+ case BEGIN_WINDOW: {
+ if (System.currentTimeMillis() - lastTime > 1000L) {
+ lastTime = System.currentTimeMillis();
+ windowId++;
+ currentTuple = new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId);
+ currentState = State.END_WINDOW;
+ }
+ break;
+ }
+ case END_WINDOW: {
+ currentTuple = new EndWindowTuple(baseSeconds | windowId);
+ currentState = State.BEGIN_WINDOW;
+ break;
+ }
+ }
+
+ return currentTuple;
+ }
+
+ @Override
+ public int getCount(boolean reset)
+ {
+ return 0;
+ }
+
+ @Override
+ public int size()
+ {
+ if (currentTuple != null) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public Object remove()
+ {
+ Tuple tempTuple = currentTuple;
+ currentTuple = null;
+ return tempTuple;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestWindowGenerator.class);
+ }
+
+  /**
+   * Input operator used by the tests. Depending on
+   * {@link #trueEmitTuplesFalseHandleIdleTime} it emits the value {@code 1L} on
+   * {@link #output} either from {@link #emitTuples()} or from {@link #handleIdleTime()},
+   * never from both, and only when more than 100 ms have passed since its last emission.
+   */
+  public static class TestInputOperator implements InputOperator, IdleTimeHandler
+  {
+    public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+    /** {@code true}: emit from {@code emitTuples()}; {@code false}: emit from {@code handleIdleTime()}. */
+    public boolean trueEmitTuplesFalseHandleIdleTime = true;
+    /** Wall-clock millis of the most recent emission. */
+    private long lastTimestamp;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+    }
+
+    @Override
+    public void emitTuples()
+    {
+      if (trueEmitTuplesFalseHandleIdleTime) {
+        emitIfDue(100L);
+      }
+    }
+
+    @Override
+    public void handleIdleTime()
+    {
+      if (!trueEmitTuplesFalseHandleIdleTime) {
+        emitIfDue(100L);
+      }
+    }
+
+    @Override
+    public void endWindow()
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    /**
+     * Emits {@code 1L} unless the previous emission happened within the given gap.
+     *
+     * @param minGapMillis time in milliseconds that must have been exceeded since the last emission
+     */
+    private void emitIfDue(long minGapMillis)
+    {
+      if (System.currentTimeMillis() - lastTimestamp <= minGapMillis) {
+        return;
+      }
+      lastTimestamp = System.currentTimeMillis();
+      output.emit(1L);
+    }
+  }
+
+ // SLF4J logger for InputNodeTest; no code visible in this class references it.
+ private static final Logger LOG = LoggerFactory.getLogger(InputNodeTest.class);
+}
[2/2] incubator-apex-core git commit: Merge branch 'APEX-129' of
github.com:ilooner/incubator-apex-core into devel-3
Posted by pr...@apache.org.
Merge branch 'APEX-129' of github.com:ilooner/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/5b8a4d50
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/5b8a4d50
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/5b8a4d50
Branch: refs/heads/devel-3
Commit: 5b8a4d5028ed769318794b59bdece4ae3afe633d
Parents: 93bdf2d 61fd64d
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 16 14:19:23 2015 -0800
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 16 14:19:23 2015 -0800
----------------------------------------------------------------------
.../com/datatorrent/stram/engine/InputNode.java | 15 +-
.../datatorrent/stram/engine/InputNodeTest.java | 258 +++++++++++++++++++
2 files changed, 267 insertions(+), 6 deletions(-)
----------------------------------------------------------------------