You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 02:46:05 UTC
[1/3] incubator-apex-core git commit: Skipping endWindow and operator
is shutdown prematurely. APEX-58 #resolve
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 b1af72403 -> f35522b5a
Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/18d43731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/18d43731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/18d43731
Branch: refs/heads/devel-3
Commit: 18d437315689aeea32c391a9670f7fc17554fed2
Parents: 8d9aa4b
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Oct 27 18:22:32 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 2 10:41:47 2015 -0800
----------------------------------------------------------------------
.../datatorrent/stram/engine/GenericNode.java | 6 +-
.../java/com/datatorrent/stram/engine/Node.java | 33 ++++++--
.../stram/engine/GenericNodeTest.java | 85 ++++++++++++++++++++
3 files changed, 118 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 3902f37..26ba98a 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -553,7 +553,11 @@ public class GenericNode extends Node<Operator>
}
}
- if (insideWindow) {
+ /**
+ * TODO: If shutdown is requested while inside a window, provide an alternate way of notifying the operator,
+ * TODO: such as using a listener callback
+ */
+ if (insideWindow && !shutdown) {
endWindowEmitTime = System.currentTimeMillis();
operator.endWindow();
if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index b073dcd..c66df12 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -28,25 +28,44 @@ import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.math.IntMath;
-import com.datatorrent.api.*;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.Operator.Unifier;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StatsListener.OperatorRequest;
-
+import com.datatorrent.api.StorageAgent;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.Pair;
@@ -297,6 +316,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
logger.warn("Shutdown requested when context is not available!");
}
else {
+ /*
+ * Since alive is non-volatile this code explicitly unsets it in the operator lifecycle thread thereby notifying
+ * it even when the thread is reading it from the cache
+ */
context.request(new OperatorRequest()
{
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 9e62ac5..d5ceae6 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -211,4 +211,89 @@ public class GenericNodeTest
Assert.assertEquals(Thread.State.TERMINATED, t.getState());
}
+ @Test
+ public void testPrematureTermination() throws InterruptedException
+ {
+ long maxSleep = 5000;
+ long sleeptime = 25L;
+ GenericOperator go = new GenericOperator();
+ final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+ gn.setId(1);
+ DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
+ DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+
+ gn.connectInputPort("ip1", reservoir1);
+ gn.connectInputPort("ip2", reservoir2);
+ gn.connectOutputPort("op", Sink.BLACKHOLE);
+
+ final AtomicBoolean ab = new AtomicBoolean(false);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ ab.set(true);
+ gn.activate();
+ gn.run();
+ gn.deactivate();
+ }
+
+ };
+ t.start();
+
+ long interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ }
+ while ((ab.get() == false) && (interval < maxSleep));
+
+
+ int controlTupleCount = gn.controlTupleCount;
+ Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+
+ reservoir1.add(beginWindow1);
+ reservoir2.add(beginWindow1);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ }
+ while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+ Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
+ controlTupleCount = gn.controlTupleCount;
+
+ Tuple endWindow1 = new EndWindowTuple(0x1L);
+
+ reservoir1.add(endWindow1);
+ reservoir2.add(endWindow1);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ }
+ while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+ Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);
+ controlTupleCount = gn.controlTupleCount;
+
+ Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L);
+
+ reservoir1.add(beginWindow2);
+ reservoir2.add(beginWindow2);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ }
+ while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+ gn.shutdown();
+ t.join();
+
+ Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
+ }
+
}
[3/3] incubator-apex-core git commit: Adjust checkstyle
maxAllowedViolations.
Posted by th...@apache.org.
Adjust checkstyle maxAllowedViolations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f35522b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f35522b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f35522b5
Branch: refs/heads/devel-3
Commit: f35522b5ac2426967f916c627fe57ae823f03a3b
Parents: b6d80ca
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 17:45:51 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 17:45:51 2015 -0800
----------------------------------------------------------------------
engine/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f35522b5/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index a915ff2..1513728 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
- <maxAllowedViolations>2159</maxAllowedViolations>
+ <maxAllowedViolations>2072</maxAllowedViolations>
</configuration>
</plugin>
</plugins>
[2/3] incubator-apex-core git commit: Merge branch 'APEX-58' of
https://github.com/PramodSSImmaneni/incubator-apex-core into devel-3
Posted by th...@apache.org.
Merge branch 'APEX-58' of https://github.com/PramodSSImmaneni/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b6d80ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b6d80ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b6d80ca5
Branch: refs/heads/devel-3
Commit: b6d80ca52a94ae1ae10f06f79c55a126e5915fb5
Parents: b1af724 18d4373
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 17:32:53 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 17:32:53 2015 -0800
----------------------------------------------------------------------
.../datatorrent/stram/engine/GenericNode.java | 6 +-
.../java/com/datatorrent/stram/engine/Node.java | 33 ++++++--
.../stram/engine/GenericNodeTest.java | 85 ++++++++++++++++++++
3 files changed, 118 insertions(+), 6 deletions(-)
----------------------------------------------------------------------