You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:02 UTC
[1/3] incubator-apex-core git commit: APEXCORE-60 Iteration support
in Apex Core
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 d0908e4bc -> b3402be5a
APEXCORE-60 Iteration support in Apex Core
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f7e1ccf1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f7e1ccf1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f7e1ccf1
Branch: refs/heads/devel-3
Commit: f7e1ccf14154eca92b24c5f6b5387fe56c516829
Parents: d0908e4
Author: David Yan <da...@datatorrent.com>
Authored: Wed Dec 9 15:52:26 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:03:46 2016 -0800
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Context.java | 7 +
.../main/java/com/datatorrent/api/Operator.java | 19 +
.../common/util/DefaultDelayOperator.java | 75 ++++
.../datatorrent/stram/StramLocalCluster.java | 15 +
.../stram/StreamingContainerManager.java | 56 ++-
.../datatorrent/stram/engine/GenericNode.java | 190 +++++++---
.../java/com/datatorrent/stram/engine/Node.java | 6 +-
.../stram/engine/StreamingContainer.java | 2 +
.../stram/engine/WindowGenerator.java | 14 +-
.../stram/plan/logical/LogicalPlan.java | 53 +++
.../stram/plan/physical/PTOperator.java | 4 +-
.../stram/plan/physical/PhysicalPlan.java | 19 +-
.../stram/plan/physical/StreamMapping.java | 4 +-
.../java/com/datatorrent/stram/tuple/Tuple.java | 5 +
.../stram/debug/TupleRecorderTest.java | 208 +++++-----
.../stram/engine/GenericNodeTest.java | 18 +-
.../stram/engine/GenericTestOperator.java | 3 +
.../stram/plan/logical/DelayOperatorTest.java | 377 +++++++++++++++++++
18 files changed, 888 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index ceed8a2..58bc552 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,6 +166,13 @@ public interface Context
*/
Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
+ /**
+ * Attribute of input port.
+ * This is a read-only attribute to query whether the input port is connected to a DelayOperator.
+ * This is for iterative processing.
+ */
+ Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
+
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index 785c60b..d4a6a90 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -99,6 +99,25 @@ public interface Operator extends Component<OperatorContext>
}
/**
+ * DelayOperator is an operator whose outgoing streaming window id is incremented by *one* by the
+ * engine, thus allowing loops in the "DAG". The output ports of a DelayOperator, if connected, *must*
+ * immediately connect to an upstream operator in the data flow path. Note that at least one output port of
+ * DelayOperator should be connected in order for the DelayOperator to serve its purpose.
+ *
+ * This is meant for iterative algorithms in the topology. A larger window increment can be simulated by an
+ * implementation of this interface.
+ */
+ interface DelayOperator extends Operator
+ {
+ /**
+ * This method gets called at the first window of the execution.
+ * The implementation is expected to emit tuples for initialization and/or
+ * recovery.
+ */
+ void firstWindow();
+ }
+
+ /**
* An operator provides ports as a means to consume and produce data tuples.
* Concrete ports implement derived interfaces.
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
new file mode 100644
index 0000000..ff676d4
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/DefaultDelayOperator.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+
+/**
+ * DefaultDelayOperator. This is an implementation of the DelayOperator that has one input port and one output
+ * port, and does a simple pass-through from the input port to the output port, while recording the tuples in memory
+ * as checkpoint state. Subclasses of this operator can override this behavior by overriding processTuple(T tuple).
+ *
+ * Note that the engine automatically does a +1 on the output window ID since it is a DelayOperator.
+ *
+ * This DelayOperator provides no data loss during recovery, but it incurs a run-time cost per tuple, and all tuples
+ * of the checkpoint window will be part of the checkpoint state.
+ */
+public class DefaultDelayOperator<T> extends BaseOperator implements Operator.DelayOperator
+{
+ public transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+ {
+ @Override
+ public void process(T tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ public transient DefaultOutputPort<T> output = new DefaultOutputPort<T>();
+
+ protected List<T> lastWindowTuples = new ArrayList<>();
+
+ protected void processTuple(T tuple)
+ {
+ lastWindowTuples.add(tuple);
+ output.emit(tuple);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ lastWindowTuples.clear();
+ }
+
+ @Override
+ public void firstWindow()
+ {
+ for (T tuple : lastWindowTuples) {
+ output.emit(tuple);
+ }
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
index 29e8e03..cda2a38 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java
@@ -27,6 +27,7 @@ import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -80,6 +81,7 @@ public class StramLocalCluster implements Runnable, Controller
private boolean appDone = false;
private final Map<String, StreamingContainer> injectShutdown = new ConcurrentHashMap<String, StreamingContainer>();
private boolean heartbeatMonitoringEnabled = true;
+ private Callable<Boolean> exitCondition;
public interface MockComponentFactory
{
@@ -427,6 +429,11 @@ public class StramLocalCluster implements Runnable, Controller
this.perContainerBufferServer = perContainerBufferServer;
}
+ public void setExitCondition(Callable<Boolean> exitCondition)
+ {
+ this.exitCondition = exitCondition;
+ }
+
@Override
public void run()
{
@@ -476,6 +483,14 @@ public class StramLocalCluster implements Runnable, Controller
appDone = true;
}
+ try {
+ if (exitCondition != null && exitCondition.call()) {
+ appDone = true;
+ }
+ } catch (Exception ex) {
+ break;
+ }
+
if (Thread.interrupted()) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 4b79589..6233697 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1008,7 +1008,7 @@ public class StreamingContainerManager implements PlanContext
return operatorStatus.latencyMA.getAvg();
}
for (PTOperator.PTInput input : maxOperator.getInputs()) {
- if (null != input.source.source) {
+ if (null != input.source.source && !input.delay) {
operators.add(input.source.source);
}
}
@@ -1896,6 +1896,19 @@ public class StreamingContainerManager implements PlanContext
}
+ private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
+ {
+ ctx.visited.add(operator);
+ for (PTOperator.PTOutput out : operator.getOutputs()) {
+ for (PTOperator.PTInput sink : out.sinks) {
+ PTOperator sinkOperator = sink.target;
+ if (!ctx.visited.contains(sinkOperator)) {
+ addVisited(sinkOperator, ctx);
+ }
+ }
+ }
+ }
+
/**
* Compute checkpoints required for a given operator instance to be recovered.
* This is done by looking at checkpoints available for downstream dependencies first,
@@ -1913,6 +1926,9 @@ public class StreamingContainerManager implements PlanContext
if (operator.getState() == PTOperator.State.ACTIVE && (ctx.currentTms - operator.stats.lastWindowIdChangeTms) > operator.stats.windowProcessingTimeoutMillis) {
// if the checkpoint is ahead, then it is not blocked but waiting for activation (state-less recovery, at-most-once)
if (ctx.committedWindowId.longValue() >= operator.getRecoveryCheckpoint().windowId) {
+ LOG.debug("Marking operator {} blocked committed window {}, recovery window {}", operator,
+ Codec.getStringWindowId(ctx.committedWindowId.longValue()),
+ Codec.getStringWindowId(operator.getRecoveryCheckpoint().windowId));
ctx.blocked.add(operator);
}
}
@@ -1922,25 +1938,30 @@ public class StreamingContainerManager implements PlanContext
long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
maxCheckpoint = currentWindowId;
}
+ ctx.visited.add(operator);
// DFS downstream operators
- for (PTOperator.PTOutput out : operator.getOutputs()) {
- for (PTOperator.PTInput sink : out.sinks) {
- PTOperator sinkOperator = sink.target;
- if (!ctx.visited.contains(sinkOperator)) {
- // downstream traversal
- updateRecoveryCheckpoints(sinkOperator, ctx);
- }
- // recovery window id cannot move backwards
- // when dynamically adding new operators
- if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
- maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
- }
+ if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+ addVisited(operator, ctx);
+ } else {
+ for (PTOperator.PTOutput out : operator.getOutputs()) {
+ for (PTOperator.PTInput sink : out.sinks) {
+ PTOperator sinkOperator = sink.target;
+ if (!ctx.visited.contains(sinkOperator)) {
+ // downstream traversal
+ updateRecoveryCheckpoints(sinkOperator, ctx);
+ }
+ // recovery window id cannot move backwards
+ // when dynamically adding new operators
+ if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
+ maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+ }
- if (ctx.blocked.contains(sinkOperator)) {
- if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
- // downstream operator is blocked by this operator
- ctx.blocked.remove(sinkOperator);
+ if (ctx.blocked.contains(sinkOperator)) {
+ if (sinkOperator.stats.getCurrentWindowId() == operator.stats.getCurrentWindowId()) {
+ // downstream operator is blocked by this operator
+ ctx.blocked.remove(sinkOperator);
+ }
}
}
}
@@ -1975,7 +1996,6 @@ public class StreamingContainerManager implements PlanContext
LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
}
- ctx.visited.add(operator);
}
public long windowIdToMillis(long windowId)
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 93cee49..4777f93 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -34,11 +34,14 @@ import com.datatorrent.api.Operator.ShutdownException;
import com.datatorrent.api.Sink;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
/**
@@ -198,6 +201,15 @@ public class GenericNode extends Node<Operator>
insideWindow = applicationWindowCount != 0;
}
+ private boolean isInputPortConnectedToDelayOperator(String portName)
+ {
+ Operators.PortContextPair<InputPort<?>> pcPair = descriptor.inputPorts.get(portName);
+ if (pcPair == null || pcPair.context == null) {
+ return false;
+ }
+ return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+ }
+
/**
* Originally this method was defined in an attempt to implement the interface Runnable.
*
@@ -212,30 +224,67 @@ public class GenericNode extends Node<Operator>
long spinMillis = context.getValue(OperatorContext.SPIN_MILLIS);
final boolean handleIdleTime = operator instanceof IdleTimeHandler;
int totalQueues = inputs.size();
+ int regularQueues = totalQueues;
+ // regularQueues is the number of queues that are not connected to a DelayOperator
+ for (String portName : inputs.keySet()) {
+ if (isInputPortConnectedToDelayOperator(portName)) {
+ regularQueues--;
+ }
+ }
- ArrayList<SweepableReservoir> activeQueues = new ArrayList<SweepableReservoir>();
- activeQueues.addAll(inputs.values());
+ ArrayList<Map.Entry<String, SweepableReservoir>> activeQueues = new ArrayList<>();
+ activeQueues.addAll(inputs.entrySet());
int expectingBeginWindow = activeQueues.size();
int receivedEndWindow = 0;
+ long firstWindowId = -1;
TupleTracker tracker;
LinkedList<TupleTracker> resetTupleTracker = new LinkedList<TupleTracker>();
-
try {
do {
- Iterator<SweepableReservoir> buffers = activeQueues.iterator();
+ Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
activequeue:
while (buffers.hasNext()) {
- SweepableReservoir activePort = buffers.next();
+ Map.Entry<String, SweepableReservoir> activePortEntry = buffers.next();
+ SweepableReservoir activePort = activePortEntry.getValue();
Tuple t = activePort.sweep();
if (t != null) {
+ boolean delay = (operator instanceof Operator.DelayOperator);
+ long windowAhead = 0;
+ if (delay) {
+ windowAhead = WindowGenerator.getAheadWindowId(t.getWindowId(), firstWindowMillis, windowWidthMillis, 1);
+ }
switch (t.getType()) {
case BEGIN_WINDOW:
if (expectingBeginWindow == totalQueues) {
+ // This is the first begin window tuple among all ports
+ if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+ // We need to wait for the first BEGIN_WINDOW from a port not connected to DelayOperator before
+ // we can do anything with it, because otherwise if a CHECKPOINT tuple arrives from
+ // upstream after the BEGIN_WINDOW tuple for the next window from the delay operator, it would end
+ // up checkpointing in the middle of the window. This code is assuming we have at least one
+ // input port that is not connected to a DelayOperator, and we might have to change this later.
+ // In the future, this condition will not be needed if we get rid of the CHECKPOINT tuple.
+ continue;
+ }
activePort.remove();
expectingBeginWindow--;
+ receivedEndWindow = 0;
currentWindowId = t.getWindowId();
+ if (delay) {
+ if (WindowGenerator.getBaseSecondsFromWindowId(windowAhead) > t.getBaseSeconds()) {
+ // Buffer server code strips out the base seconds from BEGIN_WINDOW and END_WINDOW tuples for
+ // serialization optimization. That's why we need a reset window here to tell the buffer
+ // server we are having a new baseSeconds now.
+ Tuple resetWindowTuple = new ResetWindowTuple(windowAhead);
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(resetWindowTuple);
+ }
+ controlTupleCount++;
+ }
+ t.setWindowId(windowAhead);
+ }
for (int s = sinks.length; s-- > 0; ) {
sinks[s].put(t);
}
@@ -245,7 +294,6 @@ public class GenericNode extends Node<Operator>
insideWindow = true;
operator.beginWindow(currentWindowId);
}
- receivedEndWindow = 0;
}
else if (t.getWindowId() == currentWindowId) {
activePort.remove();
@@ -253,17 +301,7 @@ public class GenericNode extends Node<Operator>
}
else {
buffers.remove();
-
- /* find the name of the port which got out of sequence tuple */
- String port = null;
- for (Entry<String, SweepableReservoir> e : inputs.entrySet()) {
- if (e.getValue() == activePort) {
- port = e.getKey();
- }
- }
-
- assert (port != null); /* we should always find the port */
-
+ String port = activePortEntry.getKey();
if (PROCESSING_MODE == ProcessingMode.AT_MOST_ONCE) {
if (t.getWindowId() < currentWindowId) {
/*
@@ -279,21 +317,21 @@ public class GenericNode extends Node<Operator>
WindowIdActivatedReservoir wiar = new WindowIdActivatedReservoir(port, activePort, currentWindowId);
wiar.setSink(sink);
inputs.put(port, wiar);
- activeQueues.add(wiar);
+ activeQueues.add(new AbstractMap.SimpleEntry<String, SweepableReservoir>(port, wiar));
break activequeue;
}
else {
expectingBeginWindow--;
if (++receivedEndWindow == totalQueues) {
processEndWindow(null);
- activeQueues.addAll(inputs.values());
+ activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
break activequeue;
}
}
}
else {
- logger.error("Catastrophic Error: Out of sequence tuple {} on port {} while expecting {}", Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
+ logger.error("Catastrophic Error: Out of sequence {} tuple {} on port {} while expecting {}", t.getType(), Codec.getStringWindowId(t.getWindowId()), port, Codec.getStringWindowId(currentWindowId));
System.exit(2);
}
}
@@ -306,8 +344,11 @@ public class GenericNode extends Node<Operator>
endWindowDequeueTimes.put(activePort, System.currentTimeMillis());
if (++receivedEndWindow == totalQueues) {
assert (activeQueues.isEmpty());
+ if (delay) {
+ t.setWindowId(windowAhead);
+ }
processEndWindow(t);
- activeQueues.addAll(inputs.values());
+ activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
break activequeue;
}
@@ -330,11 +371,12 @@ public class GenericNode extends Node<Operator>
doCheckpoint = true;
}
}
-
- for (int s = sinks.length; s-- > 0; ) {
- sinks[s].put(t);
+ if (!delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(t);
+ }
+ controlTupleCount++;
}
- controlTupleCount++;
}
break;
@@ -343,12 +385,14 @@ public class GenericNode extends Node<Operator>
* we will receive tuples which are equal to the number of input streams.
*/
activePort.remove();
- buffers.remove();
+ if (isInputPortConnectedToDelayOperator(activePortEntry.getKey())) {
+ break; // breaking out of the switch/case
+ }
+ buffers.remove();
int baseSeconds = t.getBaseSeconds();
tracker = null;
- Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
- while (trackerIterator.hasNext()) {
+ for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
tracker = trackerIterator.next();
if (tracker.tuple.getBaseSeconds() == baseSeconds) {
break;
@@ -356,7 +400,7 @@ public class GenericNode extends Node<Operator>
}
if (tracker == null) {
- tracker = new TupleTracker(t, totalQueues);
+ tracker = new TupleTracker(t, regularQueues);
resetTupleTracker.add(tracker);
}
int trackerIndex = 0;
@@ -364,29 +408,50 @@ public class GenericNode extends Node<Operator>
if (tracker.ports[trackerIndex] == null) {
tracker.ports[trackerIndex++] = activePort;
break;
- }
- else if (tracker.ports[trackerIndex] == activePort) {
+ } else if (tracker.ports[trackerIndex] == activePort) {
break;
}
trackerIndex++;
}
- if (trackerIndex == totalQueues) {
- trackerIterator = resetTupleTracker.iterator();
+ if (trackerIndex == regularQueues) {
+ Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator();
while (trackerIterator.hasNext()) {
if (trackerIterator.next().tuple.getBaseSeconds() <= baseSeconds) {
trackerIterator.remove();
}
}
- for (int s = sinks.length; s-- > 0; ) {
- sinks[s].put(t);
+ if (!delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(t);
+ }
+ controlTupleCount++;
}
- controlTupleCount++;
-
- assert (activeQueues.isEmpty());
- activeQueues.addAll(inputs.values());
+ if (!activeQueues.isEmpty()) {
+ // make sure they are all queues from DelayOperator
+ for (Map.Entry<String, SweepableReservoir> entry : activeQueues) {
+ if (!isInputPortConnectedToDelayOperator(entry.getKey())) {
+ assert (false);
+ }
+ }
+ activeQueues.clear();
+ }
+ activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
+
+ if (firstWindowId == -1) {
+ if (delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(t);
+ }
+ controlTupleCount++;
+ // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM
+ // (recovery), fabricate the first window
+ fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+ }
+ firstWindowId = t.getWindowId();
+ }
break activequeue;
}
break;
@@ -394,6 +459,15 @@ public class GenericNode extends Node<Operator>
case END_STREAM:
activePort.remove();
buffers.remove();
+ if (firstWindowId == -1) {
+ // this is for recovery from a checkpoint for DelayOperator
+ if (delay) {
+ // if it's a DelayOperator and this is the first RESET_WINDOW (start) or END_STREAM (recovery),
+ // fabricate the first window
+ fabricateFirstWindow((Operator.DelayOperator)operator, windowAhead);
+ }
+ firstWindowId = t.getWindowId();
+ }
for (Iterator<Entry<String, SweepableReservoir>> it = inputs.entrySet().iterator(); it.hasNext(); ) {
Entry<String, SweepableReservoir> e = it.next();
if (e.getValue() == activePort) {
@@ -409,7 +483,7 @@ public class GenericNode extends Node<Operator>
if (e.getKey().equals(dic.portname)) {
connectInputPort(dic.portname, dic.reservoir);
dici.remove();
- activeQueues.add(dic.reservoir);
+ activeQueues.add(new AbstractMap.SimpleEntry<>(dic.portname, dic.reservoir));
break activequeue;
}
}
@@ -427,17 +501,18 @@ public class GenericNode extends Node<Operator>
* Since one of the operators we care about it gone, we should relook at our ports.
* We need to make sure that the END_STREAM comes outside of the window.
*/
+ regularQueues--;
totalQueues--;
boolean break_activequeue = false;
- if (totalQueues == 0) {
+ if (regularQueues == 0) {
alive = false;
break_activequeue = true;
}
else if (activeQueues.isEmpty()) {
assert (!inputs.isEmpty());
processEndWindow(null);
- activeQueues.addAll(inputs.values());
+ activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
break_activequeue = true;
}
@@ -450,22 +525,22 @@ public class GenericNode extends Node<Operator>
* it's the only one which has not, then we consider it delivered and release the reset tuple downstream.
*/
Tuple tuple = null;
- for (trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
+ for (Iterator<TupleTracker> trackerIterator = resetTupleTracker.iterator(); trackerIterator.hasNext(); ) {
tracker = trackerIterator.next();
trackerIndex = 0;
while (trackerIndex < tracker.ports.length) {
if (tracker.ports[trackerIndex] == activePort) {
- SweepableReservoir[] ports = new SweepableReservoir[totalQueues];
+ SweepableReservoir[] ports = new SweepableReservoir[regularQueues];
System.arraycopy(tracker.ports, 0, ports, 0, trackerIndex);
- if (trackerIndex < totalQueues) {
+ if (trackerIndex < regularQueues) {
System.arraycopy(tracker.ports, trackerIndex + 1, ports, trackerIndex, tracker.ports.length - trackerIndex - 1);
}
tracker.ports = ports;
break;
}
else if (tracker.ports[trackerIndex] == null) {
- if (trackerIndex == totalQueues) { /* totalQueues is already adjusted above */
+ if (trackerIndex == regularQueues) { /* regularQueues is already adjusted above */
if (tuple == null || tuple.getBaseSeconds() < tracker.tuple.getBaseSeconds()) {
tuple = tracker.tuple;
}
@@ -475,7 +550,7 @@ public class GenericNode extends Node<Operator>
break;
}
else {
- tracker.ports = Arrays.copyOf(tracker.ports, totalQueues);
+ tracker.ports = Arrays.copyOf(tracker.ports, regularQueues);
}
trackerIndex++;
@@ -485,7 +560,7 @@ public class GenericNode extends Node<Operator>
/*
* Since we were waiting for a reset tuple on this stream, we should not any longer.
*/
- if (tuple != null) {
+ if (tuple != null && !delay) {
for (int s = sinks.length; s-- > 0; ) {
sinks[s].put(tuple);
}
@@ -509,8 +584,8 @@ public class GenericNode extends Node<Operator>
}
else {
boolean need2sleep = true;
- for (SweepableReservoir cb : activeQueues) {
- if (cb.size() > 0) {
+ for (Map.Entry<String, SweepableReservoir> cb : activeQueues) {
+ if (cb.getValue().size() > 0) {
need2sleep = false;
break;
}
@@ -582,6 +657,21 @@ public class GenericNode extends Node<Operator>
}
+ private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead)
+ {
+ Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);
+ Tuple endWindowTuple = new Tuple(MessageType.END_WINDOW, windowAhead);
+ for (Sink<Object> sink : outputs.values()) {
+ sink.put(beginWindowTuple);
+ }
+ controlTupleCount++;
+ delayOperator.firstWindow();
+ for (Sink<Object> sink : outputs.values()) {
+ sink.put(endWindowTuple);
+ }
+ controlTupleCount++;
+ }
+
/**
* End window dequeue times may not have been saved for all the input ports during deactivate,
* so save them for reporting. SPOI-1324.
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 068a325..d4970cd 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -126,6 +126,8 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
private ExecutorService executorService;
private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue;
protected Stats.CheckpointStats checkpointStats;
+ public long firstWindowMillis;
+ public long windowWidthMillis;
public Node(OPERATOR operator, OperatorContext context)
{
@@ -354,7 +356,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
protected void emitEndWindow()
{
- EndWindowTuple ewt = new EndWindowTuple(currentWindowId);
+ long windowId = (operator instanceof Operator.DelayOperator) ?
+ WindowGenerator.getAheadWindowId(currentWindowId, firstWindowMillis, windowWidthMillis, 1) : currentWindowId;
+ EndWindowTuple ewt = new EndWindowTuple(windowId);
for (int s = sinks.length; s-- > 0; ) {
sinks[s].put(ewt);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 14e00a9..79d9037 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -894,6 +894,8 @@ public class StreamingContainer extends YarnContainerMain
Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
node.currentWindowId = ndi.checkpoint.windowId;
node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
+ node.firstWindowMillis = firstWindowMillis;
+ node.windowWidthMillis = windowWidthMillis;
node.setId(ndi.id);
nodes.put(ndi.id, node);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 5610112..ea429af 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -314,13 +314,25 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
long baseMillis = (windowId >> 32) * 1000;
long diff = baseMillis - firstWindowMillis;
long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1);
+ assert (baseChangeInterval > 0);
long multiplier = diff / baseChangeInterval;
if (diff % baseChangeInterval > 0) {
multiplier++;
}
assert (multiplier >= 0);
windowId = windowId & WindowGenerator.WINDOW_MASK;
- return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis;
+ return firstWindowMillis + (multiplier * baseChangeInterval) + (windowId * windowWidthMillis);
+ }
+
+ /**
+ * Utility function to get the base seconds from a window id
+ *
+ * @param windowId
+ * @return the base seconds for the given window id
+ */
+ public static long getBaseSecondsFromWindowId(long windowId)
+ {
+ return windowId >>> 32;
}
private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 867f814..3c26118 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1758,6 +1758,14 @@ public class LogicalPlan implements Serializable, DAG
throw new ValidationException("Loops in graph: " + cycles);
}
+ List<List<String>> invalidDelays = new ArrayList<>();
+ for (OperatorMeta n : rootOperators) {
+ findInvalidDelays(n, invalidDelays);
+ }
+ if (!invalidDelays.isEmpty()) {
+ throw new ValidationException("Invalid delays in graph: " + invalidDelays);
+ }
+
for (StreamMeta s: streams.values()) {
if (s.source == null) {
throw new ValidationException("Stream source not connected: " + s.getName());
@@ -1814,6 +1822,11 @@ public class LogicalPlan implements Serializable, DAG
return;
}
+ if (om.getOperator() instanceof Operator.DelayOperator) {
+ String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, om);
+ throw new ValidationException(msg);
+ }
+
for (StreamMeta sm: om.inputStreams.values()){
// validation fail as each input stream should be OIO
if (sm.locality != Locality.THREAD_LOCAL){
@@ -1822,6 +1835,10 @@ public class LogicalPlan implements Serializable, DAG
throw new ValidationException(msg);
}
+ if (sm.source.operatorMeta.getOperator() instanceof Operator.DelayOperator) {
+ String msg = String.format("Locality %s invalid for delay operator %s", Locality.THREAD_LOCAL, sm.source.operatorMeta);
+ throw new ValidationException(msg);
+ }
// gets oio root for input operator for the stream
Integer oioStreamRoot = getOioRoot(sm.source.operatorMeta);
@@ -1895,6 +1912,11 @@ public class LogicalPlan implements Serializable, DAG
// depth first successors traversal
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink: downStream.sinks) {
+ if (om.getOperator() instanceof Operator.DelayOperator) {
+ // this is an iteration loop, do not treat it as downstream when detecting cycles
+ sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+ continue;
+ }
OperatorMeta successor = sink.getOperatorWrapper();
if (successor == null) {
continue;
@@ -1932,6 +1954,37 @@ public class LogicalPlan implements Serializable, DAG
}
}
+  /**
+   * Depth-first traversal from {@code om} that collects invalid uses of
+   * {@link Operator.DelayOperator}. A delay operator is invalid when its
+   * APPLICATION_WINDOW_COUNT attribute is not 1, or when any of its downstream
+   * sinks is not an operator already on the current traversal path (i.e. the
+   * delay edge does not close an iteration loop).
+   *
+   * @param om            the operator to start (or continue) the traversal from
+   * @param invalidDelays accumulator of violations; each entry lists the name(s)
+   *                      of the offending operator(s)
+   */
+  public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+  {
+    stack.push(om);
+
+    // depth first successors traversal
+    boolean isDelayOperator = om.getOperator() instanceof Operator.DelayOperator;
+    if (isDelayOperator) {
+      if (om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT) != 1) {
+        LOG.debug("detected DelayOperator having APPLICATION_WINDOW_COUNT not equal to 1");
+        invalidDelays.add(Collections.singletonList(om.getName()));
+      }
+    }
+
+    for (StreamMeta downStream: om.outputStreams.values()) {
+      for (InputPortMeta sink : downStream.sinks) {
+        OperatorMeta successor = sink.getOperatorWrapper();
+        if (successor == null) {
+          // unconnected sink: nothing to validate or traverse
+          continue;
+        }
+        if (isDelayOperator) {
+          // A delay operator must feed back only to operators already visited on
+          // this path; anything else is a forward edge, which is invalid.
+          if (!stack.contains(successor)) {
+            LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
+                om.getName(), downStream.getSource().getPortName(), successor.getName(), sink.getPortName());
+            invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
+          }
+        } else {
+          // guard above prevents an NPE here when a sink has no operator attached
+          findInvalidDelays(successor, invalidDelays);
+        }
+      }
+    }
+    stack.pop();
+  }
+
private void validateProcessingMode(OperatorMeta om, Set<OperatorMeta> visited)
{
for (StreamMeta is : om.getInputStreams().values()) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index 6adfd64..ae276d8 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -81,6 +81,7 @@ public class PTOperator implements java.io.Serializable
public final PartitionKeys partitions;
public final PTOutput source;
public final String portName;
+ public final boolean delay;
/**
*
@@ -90,7 +91,7 @@ public class PTOperator implements java.io.Serializable
* @param partitions
* @param source
*/
- protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source)
+ protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source, boolean delay)
{
this.logicalStream = logicalStream;
this.target = target;
@@ -98,6 +99,7 @@ public class PTOperator implements java.io.Serializable
this.source = source;
this.portName = portName;
this.source.sinks.add(this);
+ this.delay = delay;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 829a6fd..da96ef3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -328,8 +328,11 @@ public class PhysicalPlan implements Serializable
boolean upstreamDeployed = true;
- for (StreamMeta s : n.getInputStreams().values()) {
- if (s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
+ for (Map.Entry<InputPortMeta, StreamMeta> entry : n.getInputStreams().entrySet()) {
+ StreamMeta s = entry.getValue();
+ boolean delay = entry.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
+ // skip delay sources since it's going to be handled as downstream
+ if (!delay && s.getSource() != null && !this.logicalToPTOperator.containsKey(s.getSource().getOperatorMeta())) {
pendingNodes.push(n);
pendingNodes.push(s.getSource().getOperatorMeta());
upstreamDeployed = false;
@@ -907,7 +910,10 @@ public class PhysicalPlan implements Serializable
for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) {
PMapping sourceMapping = this.logicalToPTOperator.get(ipm.getValue().getSource().getOperatorMeta());
-
+ if (ipm.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+ // skip if the source is a DelayOperator
+ continue;
+ }
if (ipm.getKey().getValue(PortContext.PARTITION_PARALLEL)) {
if (sourceMapping.partitions.size() < m.partitions.size()) {
throw new AssertionError("Number of partitions don't match in parallel mapping " + sourceMapping.logicalOperator.getName() + " -> " + m.logicalOperator.getName() + ", " + sourceMapping.partitions.size() + " -> " + m.partitions.size());
@@ -942,11 +948,11 @@ public class PhysicalPlan implements Serializable
PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
StreamMapping.addInput(slidingUnifier, sourceOut, null);
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0));
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
}
else {
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut);
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
}
oper.inputs.add(input);
}
@@ -1445,6 +1451,9 @@ public class PhysicalPlan implements Serializable
PMapping upstreamPartitioned = null;
for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> e : om.getInputStreams().entrySet()) {
+ if (e.getValue().getSource().getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+ continue;
+ }
PMapping m = logicalToPTOperator.get(e.getValue().getSource().getOperatorMeta());
if (e.getKey().getValue(PortContext.PARTITION_PARALLEL).equals(true)) {
// operator partitioned with upstream
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index d42c327..91c6eef 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
// link to upstream output(s) for this stream
for (PTOutput upstreamOut : sourceOper.outputs) {
if (upstreamOut.logicalStream == streamMeta) {
- PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut);
+ PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
oper.inputs.add(input);
}
}
@@ -356,7 +356,7 @@ public class StreamMapping implements java.io.Serializable
public static void addInput(PTOperator target, PTOutput upstreamOut, PartitionKeys pks)
{
StreamMeta lStreamMeta = upstreamOut.logicalStream;
- PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut);
+ PTInput input = new PTInput("<merge#" + lStreamMeta.getSource().getPortName() + ">", lStreamMeta, target, pks, upstreamOut, false);
target.inputs.add(input);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
index 23c197b..9191b65 100644
--- a/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
+++ b/engine/src/main/java/com/datatorrent/stram/tuple/Tuple.java
@@ -52,6 +52,11 @@ public class Tuple
return windowId;
}
+  /**
+   * Sets the window id of this tuple.
+   * NOTE(review): appears to exist so the delay-operator machinery can rewrite a
+   * tuple's window id when replaying it into a later window — confirm against callers.
+   *
+   * @param windowId the new window id
+   */
+  public void setWindowId(long windowId)
+  {
+    this.windowId = windowId;
+  }
+
public final int getBaseSeconds()
{
return (int)(windowId >> 32);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
index 3f97b54..1c17d68 100644
--- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java
@@ -76,8 +76,7 @@ public class TupleRecorderTest
public TupleRecorder getTupleRecorder(final StramLocalCluster localCluster, final PTOperator op)
{
TupleRecorderCollection instance = (TupleRecorderCollection)localCluster.getContainer(op).getInstance(classname);
- TupleRecorder tupleRecorder = instance.getTupleRecorder(op.getId(), null);
- return tupleRecorder;
+ return instance.getTupleRecorder(op.getId(), null);
}
public class Tuple
@@ -89,8 +88,7 @@ public class TupleRecorderTest
@Test
public void testRecorder() throws IOException
{
- FileSystem fs = new LocalFileSystem();
- try {
+ try (FileSystem fs = new LocalFileSystem()) {
TupleRecorder recorder = new TupleRecorder(null, "application_test_id_1");
recorder.getStorage().setBytesPerPartFile(4096);
recorder.getStorage().setLocalMode(true);
@@ -132,80 +130,76 @@ public class TupleRecorderTest
fs.initialize((new Path(recorder.getStorage().getBasePath()).toUri()), new Configuration());
Path path;
- FSDataInputStream is;
String line;
- BufferedReader br;
path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.INDEX_FILE);
- is = fs.open(path);
- br = new BufferedReader(new InputStreamReader(is));
-
- line = br.readLine();
- // Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
- Assert.assertTrue("check index", line.matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+ try (FSDataInputStream is = fs.open(path);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+ line = br.readLine();
+ // Assert.assertEquals("check index", "B:1000:T:0:part0.txt", line);
+ Assert.assertTrue("check index", line
+ .matches("F:part0.txt:\\d+-\\d+:4:T:1000-1000:33:\\{\"3\":\"1\",\"1\":\"1\",\"0\":\"1\",\"2\":\"1\"\\}"));
+ }
path = new Path(recorder.getStorage().getBasePath(), FSPartFileCollection.META_FILE);
- is = fs.open(path);
- br = new BufferedReader(new InputStreamReader(is));
-
- ObjectMapper mapper = new ObjectMapper();
- line = br.readLine();
- Assert.assertEquals("check version", "1.2", line);
- br.readLine(); // RecordInfo
- //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
- line = br.readLine();
- PortInfo pi = mapper.readValue(line, PortInfo.class);
- Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
- Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
- line = br.readLine();
- pi = mapper.readValue(line, PortInfo.class);
- Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
- Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
- line = br.readLine();
- pi = mapper.readValue(line, PortInfo.class);
- Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
- Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
- line = br.readLine();
- pi = mapper.readValue(line, PortInfo.class);
- Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
- Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
- Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
- //line = br.readLine();
-
+ try (FSDataInputStream is = fs.open(path);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
+
+ ObjectMapper mapper = new ObjectMapper();
+ line = br.readLine();
+ Assert.assertEquals("check version", "1.2", line);
+ br.readLine(); // RecordInfo
+ //RecordInfo ri = mapper.readValue(line, RecordInfo.class);
+ line = br.readLine();
+ PortInfo pi = mapper.readValue(line, PortInfo.class);
+ Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+ Assert.assertEquals("port1", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+ line = br.readLine();
+ pi = mapper.readValue(line, PortInfo.class);
+ Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+ Assert.assertEquals("port2", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+ line = br.readLine();
+ pi = mapper.readValue(line, PortInfo.class);
+ Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+ Assert.assertEquals("port3", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+ line = br.readLine();
+ pi = mapper.readValue(line, PortInfo.class);
+ Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).id, pi.id);
+ Assert.assertEquals("port4", recorder.getPortInfoMap().get(pi.name).type, pi.type);
+ Assert.assertEquals("port size", 4, recorder.getPortInfoMap().size());
+ //line = br.readLine();
+ }
path = new Path(recorder.getStorage().getBasePath(), "part0.txt");
- is = fs.open(path);
- br = new BufferedReader(new InputStreamReader(is));
+ try (FSDataInputStream is = fs.open(path);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
- line = br.readLine();
- Assert.assertTrue("check part0", line.startsWith("B:"));
- Assert.assertTrue("check part0", line.endsWith(":1000"));
+ line = br.readLine();
+ Assert.assertTrue("check part0", line.startsWith("B:"));
+ Assert.assertTrue("check part0", line.endsWith(":1000"));
- line = br.readLine();
- Assert.assertTrue("check part0 1", line.startsWith("T:"));
- Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
+ line = br.readLine();
+ Assert.assertTrue("check part0 1", line.startsWith("T:"));
+ Assert.assertTrue("check part0 1", line.endsWith(":0:30:{\"key\":\"speed\",\"value\":\"5m/h\"}"));
- line = br.readLine();
- Assert.assertTrue("check part0 2", line.startsWith("T:"));
- Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
+ line = br.readLine();
+ Assert.assertTrue("check part0 2", line.startsWith("T:"));
+ Assert.assertTrue("check part0 2", line.endsWith(":2:30:{\"key\":\"speed\",\"value\":\"4m/h\"}"));
- line = br.readLine();
- Assert.assertTrue("check part0 3", line.startsWith("T:"));
- Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
+ line = br.readLine();
+ Assert.assertTrue("check part0 3", line.startsWith("T:"));
+ Assert.assertTrue("check part0 3", line.endsWith(":1:30:{\"key\":\"speed\",\"value\":\"6m/h\"}"));
- line = br.readLine();
- Assert.assertTrue("check part0 4", line.startsWith("T:"));
- Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
+ line = br.readLine();
+ Assert.assertTrue("check part0 4", line.startsWith("T:"));
+ Assert.assertTrue("check part0 4", line.endsWith(":3:30:{\"key\":\"speed\",\"value\":\"2m/h\"}"));
- line = br.readLine();
- Assert.assertTrue("check part0 5", line.startsWith("E:"));
- Assert.assertTrue("check part0 5", line.endsWith(":1000"));
- }
- catch (IOException ex) {
+ line = br.readLine();
+ Assert.assertTrue("check part0 5", line.startsWith("E:"));
+ Assert.assertTrue("check part0 5", line.endsWith(":1000"));
+ }
+ } catch (IOException ex) {
throw new RuntimeException(ex);
}
- finally {
- fs.close();
- }
}
private static final File testWorkDir = new File("target", TupleRecorderTest.class.getName());
@@ -234,17 +228,17 @@ public class TupleRecorderTest
final PTOperator ptOp2 = localCluster.findByLogicalNode(dag.getMeta(op2));
StramTestSupport.waitForActivation(localCluster, ptOp2);
- testRecordingOnOperator(localCluster, ptOp2, 2);
+ testRecordingOnOperator(localCluster, ptOp2);
final PTOperator ptOp1 = localCluster.findByLogicalNode(dag.getMeta(op1));
StramTestSupport.waitForActivation(localCluster, ptOp1);
- testRecordingOnOperator(localCluster, ptOp1, 1);
+ testRecordingOnOperator(localCluster, ptOp1);
localCluster.shutdown();
}
- private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op, int numPorts) throws Exception
+ private void testRecordingOnOperator(final StramLocalCluster localCluster, final PTOperator op) throws Exception
{
String id = "xyz";
localCluster.getStreamingContainerManager().startRecording(id, op.getId(), null, 0);
@@ -259,25 +253,30 @@ public class TupleRecorderTest
};
Assert.assertTrue("Should get a tuple recorder within 10 seconds", StramTestSupport.awaitCompletion(c, 10000));
- TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
+ final TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
long startTime = tupleRecorder.getStartTime();
- BufferedReader br;
String line;
File dir = new File(testWorkDir, "recordings/" + op.getId() + "/" + id);
File file;
- file = new File(dir, "meta.txt");
+ file = new File(dir, FSPartFileCollection.META_FILE);
Assert.assertTrue("meta file should exist", file.exists());
- br = new BufferedReader(new FileReader(file));
- line = br.readLine();
- Assert.assertEquals("version should be 1.2", "1.2", line);
- line = br.readLine();
- JSONObject json = new JSONObject(line);
- Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
-
- for (int i = 0; i < numPorts; i++) {
+ int numPorts = tupleRecorder.getSinkMap().size();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
line = br.readLine();
- Assert.assertTrue("should contain name, streamName, type and id", line != null && line.contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line.contains("\"id\""));
+ Assert.assertEquals("version should be 1.2", "1.2", line);
+ line = br.readLine();
+ JSONObject json = new JSONObject(line);
+ Assert.assertEquals("Start time verification", startTime, json.getLong("startTime"));
+ Assert.assertTrue(numPorts > 0);
+
+ for (int i = 0; i < numPorts; i++) {
+ line = br.readLine();
+ Assert.assertTrue("should contain name, streamName, type and id", line != null && line
+ .contains("\"name\"") && line.contains("\"streamName\"") && line.contains("\"type\"") && line
+ .contains("\"id\""));
+ }
}
c = new WaitCondition()
@@ -285,7 +284,6 @@ public class TupleRecorderTest
@Override
public boolean isComplete()
{
- TupleRecorder tupleRecorder = getTupleRecorder(localCluster, op);
return (tupleRecorder.getTotalTupleCount() >= testTupleCount);
}
@@ -306,24 +304,23 @@ public class TupleRecorderTest
};
Assert.assertTrue("Tuple recorder shouldn't exist any more after stopping", StramTestSupport.awaitCompletion(c, 5000));
- file = new File(dir, "index.txt");
+ file = new File(dir, FSPartFileCollection.INDEX_FILE);
Assert.assertTrue("index file should exist", file.exists());
- br = new BufferedReader(new FileReader(file));
- ArrayList<String> partFiles = new ArrayList<String>();
+ ArrayList<String> partFiles = new ArrayList<>();
int indexCount = 0;
- while ((line = br.readLine()) != null) {
- String partFile = "part" + indexCount + ".txt";
- if (line.startsWith("F:" + partFile + ":")) {
- partFiles.add(partFile);
- indexCount++;
- }
- else if (line.startsWith("E")) {
- Assert.assertEquals("index file should end after E line", br.readLine(), null);
- break;
- }
- else {
- Assert.fail("index file line is not starting with F or E");
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ while ((line = br.readLine()) != null) {
+ String partFile = "part" + indexCount + ".txt";
+ if (line.startsWith("F:" + partFile + ":")) {
+ partFiles.add(partFile);
+ indexCount++;
+ } else if (line.startsWith("E")) {
+ Assert.assertEquals("index file should end after E line", br.readLine(), null);
+ break;
+ } else {
+ Assert.fail("index file line is not starting with F or E");
+ }
}
}
@@ -337,17 +334,16 @@ public class TupleRecorderTest
Assert.assertTrue(partFile + " should be greater than 1KB", file.length() >= 1024);
}
Assert.assertTrue(partFile + " should exist", file.exists());
- br = new BufferedReader(new FileReader(file));
- while ((line = br.readLine()) != null) {
- if (line.startsWith("B:")) {
- beginWindowExists = true;
- }
- else if (line.startsWith("E:")) {
- endWindowExists = true;
- }
- else if (line.startsWith("T:")) {
- String[] parts = line.split(":");
- tupleCount[Integer.valueOf(parts[2])]++;
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ while ((line = br.readLine()) != null) {
+ if (line.startsWith("B:")) {
+ beginWindowExists = true;
+ } else if (line.startsWith("E:")) {
+ endWindowExists = true;
+ } else if (line.startsWith("T:")) {
+ String[] parts = line.split(":");
+ tupleCount[Integer.valueOf(parts[2])]++;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index c7e8ccc..2577504 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -277,6 +277,8 @@ public class GenericNodeTest
gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", output);
+ gn.firstWindowMillis = 0;
+ gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
@@ -382,6 +384,8 @@ public class GenericNodeTest
gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", Sink.BLACKHOLE);
+ gn.firstWindowMillis = 0;
+ gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
@@ -493,6 +497,8 @@ public class GenericNodeTest
in.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(in.id), 1024));
in.connectOutputPort("output", testSink);
+ in.firstWindowMillis = 0;
+ in.windowWidthMillis = 100;
windowGenerator.activate(null);
@@ -551,9 +557,13 @@ public class GenericNodeTest
final long sleepTime = 25L;
WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
- windowGenerator.setResetWindow(0L);
- windowGenerator.setFirstWindow(1448909287863L);
- windowGenerator.setWindowWidth(100);
+ long resetWindow = 0L;
+ long firstWindowMillis = 1448909287863L;
+ int windowWidth = 100;
+
+ windowGenerator.setResetWindow(resetWindow);
+ windowGenerator.setFirstWindow(firstWindowMillis);
+ windowGenerator.setWindowWidth(windowWidth);
windowGenerator.setCheckpointCount(1, 0);
GenericOperator go = new GenericOperator();
@@ -576,6 +586,8 @@ public class GenericNodeTest
gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024));
gn.connectOutputPort("output", testSink);
+ gn.firstWindowMillis = firstWindowMillis;
+ gn.windowWidthMillis = windowWidth;
windowGenerator.activate(null);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
index 0c8ae62..a3b0c53 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericTestOperator.java
@@ -132,6 +132,9 @@ public class GenericTestOperator extends BaseOperator {
if (outport1.isConnected()) {
outport1.emit(o);
}
+ if (outport2.isConnected()) {
+ outport2.emit(o);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f7e1ccf1/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
new file mode 100644
index 0000000..359da17
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.validation.ValidationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
+import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for topologies with delay operator
+ */
+public class DelayOperatorTest
+{
+ // Serializes execution of the tests in this class: the operators below share
+ // static state (results, failureSimulated), so two tests running concurrently
+ // in the same JVM would corrupt each other's data.
+ private static Lock sequential = new ReentrantLock();
+
+ @Before
+ public void setup()
+ {
+ sequential.lock();
+ }
+
+ @After
+ public void teardown()
+ {
+ sequential.unlock();
+ }
+
+ /**
+ * Verifies that plan validation rejects invalid uses of a delay operator:
+ * (1) a delay whose output feeds an operator downstream of the delay's input
+ * (no backward loop), (2) a delay with a non-default application window
+ * count, and (3) a delay loop placed on THREAD_LOCAL streams.
+ */
+ @Test
+ public void testInvalidDelayDetection()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ // case 1: B -> C -> D, with C -> delay -> D. The delay's output goes to D,
+ // which is downstream of the delay's input (C), so this is not a loop back
+ // to an upstream operator.
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DelayToD", opDelay.output, opD.inport2);
+
+ List<List<String>> invalidDelays = new ArrayList<>();
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+ try {
+ dag.validate();
+ fail("validation should fail");
+ } catch (ValidationException e) {
+ // expected
+ }
+
+ dag = new LogicalPlan();
+
+ // case 2: the delay loops back to C correctly, but APPLICATION_WINDOW_COUNT
+ // on the delay is set to 2 -- rejected, presumably because a delay operator
+ // must operate on single streaming windows (confirm in findInvalidDelays)
+ opB = dag.addOperator("B", GenericTestOperator.class);
+ opC = dag.addOperator("C", GenericTestOperator.class);
+ opD = dag.addOperator("D", GenericTestOperator.class);
+ opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+ dag.setAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DelayToC", opDelay.output, opC.inport2);
+
+ invalidDelays = new ArrayList<>();
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ assertEquals("operator invalid delay", 1, invalidDelays.size());
+
+ try {
+ dag.validate();
+ fail("validation should fail");
+ } catch (ValidationException e) {
+ // expected
+ }
+
+ dag = new LogicalPlan();
+
+ // case 3: topologically valid loop, but both legs of the delay loop are
+ // marked THREAD_LOCAL, which validation rejects
+ opB = dag.addOperator("B", GenericTestOperator.class);
+ opC = dag.addOperator("C", GenericTestOperator.class);
+ opD = dag.addOperator("D", GenericTestOperator.class);
+ opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
+ dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
+
+ try {
+ dag.validate();
+ fail("validation should fail");
+ } catch (ValidationException e) {
+ // expected
+ }
+ }
+
+ /**
+ * A correct delay topology must pass validation: A -> B -> C -> D with the
+ * loop C -> delay -> B, i.e. the delay's output feeds an operator that is
+ * upstream of the delay's input.
+ */
+ @Test
+ public void testValidDelay()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+ dag.addStream("AtoB", opA.outport, opB.inport1);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DelayToB", opDelay.output, opB.inport2);
+ // must not throw
+ dag.validate();
+ }
+
+ // The first 40 Fibonacci terms (starting 1, 1), used as expected values by
+ // the testFibonacci* assertions below.
+ public static final Long[] FIBONACCI_NUMBERS = new Long[]{
+ 1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L,
+ 10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L,
+ 3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
+ };
+
+ /**
+ * Produces the Fibonacci sequence, one term per streaming window, by feeding
+ * its own output back to itself through a delay operator (see testFibonacci):
+ * each endWindow emits currentNumber, and the value received back on 'input'
+ * (the previous term, one window delayed) is added to form the next term.
+ */
+ public static class FibonacciOperator extends BaseOperator
+ {
+ // Terms emitted so far. Static so the enclosing test can poll it as the
+ // cluster exit condition and assert on it after shutdown.
+ public static List<Long> results = new ArrayList<>();
+ public long currentNumber = 1;
+ // Last value received from the delay loop; transient, so it is not part of
+ // checkpointed state -- NOTE(review): confirm this is intended for recovery.
+ private transient long tempNum;
+
+ // Sink for the upstream input operator; tuples are discarded -- the
+ // connection exists only so the operator participates in windowing.
+ public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
+ {
+ @Override
+ public void process(Object tuple)
+ {
+ }
+ };
+ // Receives the previous term back from the delay operator.
+ public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
+ {
+ @Override
+ public void process(Long tuple)
+ {
+ tempNum = tuple;
+ }
+ };
+ public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
+
+ @Override
+ public void endWindow()
+ {
+ output.emit(currentNumber);
+ results.add(currentNumber);
+ currentNumber += tempNum;
+ if (currentNumber <= 0) {
+ // long overflow wrapped negative: restart the sequence
+ currentNumber = 1;
+ }
+ }
+
+ }
+
+ /**
+ * Fibonacci operator that simulates a one-time failure to exercise recovery:
+ * after the configured number of windows (optionally only once committed()
+ * has fired), beginWindow throws a RuntimeException exactly once.
+ */
+ public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
+ {
+ private boolean committed = false;
+ // number of windows to let pass before throwing; 0 disables the simulation
+ private int simulateFailureWindows = 0;
+ // when true, start counting windows only after the first committed() callback
+ private boolean simulateFailureAfterCommit = false;
+ private int windowCount = 0;
+ // static so the failure fires only once even after the operator instance is
+ // redeployed (the local cluster keeps everything in a single JVM)
+ public static boolean failureSimulated = false;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+ !failureSimulated) {
+ if (windowCount++ == simulateFailureWindows) {
+ failureSimulated = true;
+ throw new RuntimeException("simulating failure");
+ }
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ committed = true;
+ }
+
+ // Configures the simulated failure; call before launching the DAG.
+ public void setSimulateFailureWindows(int windows, boolean afterCommit)
+ {
+ this.simulateFailureAfterCommit = afterCommit;
+ this.simulateFailureWindows = windows;
+ }
+ }
+
+ /**
+ * Delay operator that simulates a one-time failure (same mechanism as
+ * {@link FailableFibonacciOperator}) so recovery of the delay operator
+ * itself can be tested. Delegates to super.beginWindow before deciding
+ * whether to throw.
+ */
+ public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
+ {
+ private boolean committed = false;
+ // number of windows to let pass before throwing; 0 disables the simulation
+ private int simulateFailureWindows = 0;
+ private boolean simulateFailureAfterCommit = false;
+ private int windowCount = 0;
+ // NOTE(review): private here but public in FailableFibonacciOperator; both
+ // are accessible to the enclosing test class, but consider aligning them.
+ private static boolean failureSimulated = false;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ super.beginWindow(windowId);
+ if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
+ !failureSimulated) {
+ if (windowCount++ == simulateFailureWindows) {
+ failureSimulated = true;
+ throw new RuntimeException("simulating failure");
+ }
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ committed = true;
+ }
+
+ // Configures the simulated failure; call before launching the DAG.
+ public void setSimulateFailureWindows(int windows, boolean afterCommit)
+ {
+ this.simulateFailureAfterCommit = afterCommit;
+ this.simulateFailureWindows = windows;
+ }
+ }
+
+
+ /**
+ * End-to-end iteration test: DUMMY -> FIB with the loop FIB -> delay -> FIB.
+ * Runs the local cluster until ten terms have been produced, then checks
+ * they match the start of the Fibonacci sequence.
+ */
+ @Test
+ public void testFibonacci() throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+ FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+ DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+ dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+ dag.addStream("operator_to_delay", fib.output, opDelay.input);
+ dag.addStream("delay_to_operator", opDelay.output, fib.input);
+ // reset shared static state left over from other tests
+ FibonacciOperator.results.clear();
+ final StramLocalCluster localCluster = new StramLocalCluster(dag);
+ localCluster.setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return FibonacciOperator.results.size() >= 10;
+ }
+ });
+ // run with a 10s cap in case the exit condition is never met
+ localCluster.run(10000);
+ Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
+ FibonacciOperator.results.subList(0, 10).toArray());
+ }
+
+ /**
+ * Same Fibonacci loop, but the FIB operator throws once after commit to
+ * force a recovery. Frequent checkpoints (every 2 windows) and short 300ms
+ * windows keep the test fast. Recovery may replay windows and append
+ * duplicate terms to results, so the assertion compares the first 20
+ * distinct values (TreeSet sorts and de-duplicates both sides).
+ */
+ @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
+ @Test
+ public void testFibonacciRecovery1() throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+ FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
+ DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+
+ // fail once, 3 windows after the first committed() callback
+ fib.setSimulateFailureWindows(3, true);
+
+ dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+ dag.addStream("operator_to_delay", fib.output, opDelay.input);
+ dag.addStream("delay_to_operator", opDelay.output, fib.input);
+ dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+ dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+ // reset shared static state left over from other tests
+ FailableFibonacciOperator.results.clear();
+ FailableFibonacciOperator.failureSimulated = false;
+ final StramLocalCluster localCluster = new StramLocalCluster(dag);
+ localCluster.setPerContainerBufferServer(true);
+ localCluster.setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return FailableFibonacciOperator.results.size() >= 30;
+ }
+ });
+ localCluster.run(60000);
+ Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
+ Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+ Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+ }
+
+ /**
+ * Mirror of testFibonacciRecovery1, except the simulated failure is injected
+ * into the delay operator itself rather than the Fibonacci operator, to
+ * verify the delay operator recovers correctly inside the loop.
+ */
+ @Ignore // Out of sequence BEGIN_WINDOW tuple on Travis. Will tackle in the next version
+ @Test
+ public void testFibonacciRecovery2() throws Exception
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
+ FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
+ FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
+
+ // fail once, 5 windows after the first committed() callback
+ opDelay.setSimulateFailureWindows(5, true);
+
+ dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
+ dag.addStream("operator_to_delay", fib.output, opDelay.input);
+ dag.addStream("delay_to_operator", opDelay.output, fib.input);
+ dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
+ dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
+ // reset shared static state left over from other tests
+ FibonacciOperator.results.clear();
+ FailableDelayOperator.failureSimulated = false;
+ final StramLocalCluster localCluster = new StramLocalCluster(dag);
+ localCluster.setPerContainerBufferServer(true);
+ localCluster.setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return FibonacciOperator.results.size() >= 30;
+ }
+ });
+ localCluster.run(60000);
+
+ Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
+ Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
+ Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
+ }
+
+
+}
[3/3] incubator-apex-core git commit: APEXCORE-306 Update checkpoints
for strongly connected operators as group.
Posted by th...@apache.org.
APEXCORE-306 Update checkpoints for strongly connected operators as group.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3402be5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3402be5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3402be5
Branch: refs/heads/devel-3
Commit: b3402be5a45728515f4a8328fec5a76ddede0350
Parents: 4d5828c
Author: Thomas Weise <th...@datatorrent.com>
Authored: Thu Jan 21 16:39:55 2016 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:24:31 2016 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 161 +++++++++++++------
.../com/datatorrent/stram/api/Checkpoint.java | 11 ++
.../stram/plan/logical/LogicalPlan.java | 91 +++++++----
.../com/datatorrent/stram/CheckpointTest.java | 3 +-
.../stram/plan/logical/DelayOperatorTest.java | 88 +++++++++-
.../stram/plan/logical/LogicalPlanTest.java | 131 ++++++++++-----
6 files changed, 358 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6233697..a687a37 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -35,6 +35,7 @@ import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -159,6 +160,7 @@ public class StreamingContainerManager implements PlanContext
private long lastResourceRequest = 0;
private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+ private Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
private CriticalPathInfo criticalPathInfo;
private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
@@ -812,6 +814,7 @@ public class StreamingContainerManager implements PlanContext
Collection<OperatorMeta> logicalOperators = getLogicalPlan().getAllOperators();
//for backward compatibility
for (OperatorMeta operatorMeta : logicalOperators) {
+ @SuppressWarnings("deprecation")
Context.CountersAggregator aggregator = operatorMeta.getValue(OperatorContext.COUNTERS_AGGREGATOR);
if (aggregator == null) {
continue;
@@ -825,6 +828,7 @@ public class StreamingContainerManager implements PlanContext
}
}
if (counters.size() > 0) {
+ @SuppressWarnings("deprecation")
Object aggregate = aggregator.aggregate(counters);
latestLogicalCounters.put(operatorMeta.getName(), aggregate);
}
@@ -857,6 +861,8 @@ public class StreamingContainerManager implements PlanContext
if (windowMetrics == null) {
windowMetrics = new LinkedBlockingQueue<Pair<Long, Map<String, Object>>>(METRIC_QUEUE_SIZE)
{
+ private static final long serialVersionUID = 1L;
+
@Override
public boolean add(Pair<Long, Map<String, Object>> longMapPair)
{
@@ -1134,7 +1140,7 @@ public class StreamingContainerManager implements PlanContext
cs.container.setAllocatedVCores(0);
// resolve dependencies
- UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock);
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, getCheckpointGroups());
for (PTOperator oper : cs.container.getOperators()) {
updateRecoveryCheckpoints(oper, ctx);
}
@@ -1881,31 +1887,18 @@ public class StreamingContainerManager implements PlanContext
public final Set<PTOperator> blocked = new LinkedHashSet<PTOperator>();
public final long currentTms;
public final boolean recovery;
+ public final Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups;
public UpdateCheckpointsContext(Clock clock)
{
- this.currentTms = clock.getTime();
- this.recovery = false;
+ this(clock, false, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap());
}
- public UpdateCheckpointsContext(Clock clock, boolean recovery)
+ public UpdateCheckpointsContext(Clock clock, boolean recovery, Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups)
{
this.currentTms = clock.getTime();
this.recovery = recovery;
- }
-
- }
-
- private void addVisited(PTOperator operator, UpdateCheckpointsContext ctx)
- {
- ctx.visited.add(operator);
- for (PTOperator.PTOutput out : operator.getOutputs()) {
- for (PTOperator.PTInput sink : out.sinks) {
- PTOperator sinkOperator = sink.target;
- if (!ctx.visited.contains(sinkOperator)) {
- addVisited(sinkOperator, ctx);
- }
- }
+ this.checkpointGroups = checkpointGroups;
}
}
@@ -1933,20 +1926,55 @@ public class StreamingContainerManager implements PlanContext
}
}
- long maxCheckpoint = operator.getRecentCheckpoint().windowId;
- if (ctx.recovery && maxCheckpoint == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
- long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
- maxCheckpoint = currentWindowId;
+ // the most recent checkpoint eligible for recovery based on downstream state
+ Checkpoint maxCheckpoint = Checkpoint.INITIAL_CHECKPOINT;
+
+ Set<OperatorMeta> checkpointGroup = ctx.checkpointGroups.get(operator.getOperatorMeta());
+ if (checkpointGroup == null) {
+ checkpointGroup = Collections.singleton(operator.getOperatorMeta());
+ }
+ // find intersection of checkpoints that group can collectively move to
+ TreeSet<Checkpoint> commonCheckpoints = new TreeSet<>(new Checkpoint.CheckpointComparator());
+ synchronized (operator.checkpoints) {
+ commonCheckpoints.addAll(operator.checkpoints);
+ }
+ Set<PTOperator> groupOpers = new HashSet<>(checkpointGroup.size());
+ if (checkpointGroup.size() > 1) {
+ for (OperatorMeta om : checkpointGroup) {
+ Collection<PTOperator> operators = plan.getAllOperators(om);
+ for (PTOperator groupOper : operators) {
+ synchronized (groupOper.checkpoints) {
+ commonCheckpoints.retainAll(groupOper.checkpoints);
+ }
+ // visit all downstream operators of the group
+ ctx.visited.add(groupOper);
+ groupOpers.add(groupOper);
+ }
+ }
+ // highest common checkpoint
+ if (!commonCheckpoints.isEmpty()) {
+ maxCheckpoint = commonCheckpoints.last();
+ }
+ } else {
+ // without logical grouping, treat partitions as independent
+ // this is especially important for parallel partitioning
+ ctx.visited.add(operator);
+ groupOpers.add(operator);
+ maxCheckpoint = operator.getRecentCheckpoint();
+ if (ctx.recovery && maxCheckpoint.windowId == Stateless.WINDOW_ID && operator.isOperatorStateLess()) {
+ long currentWindowId = WindowGenerator.getWindowId(ctx.currentTms, this.vars.windowStartMillis, this.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS));
+ maxCheckpoint = new Checkpoint(currentWindowId, 0, 0);
+ }
}
- ctx.visited.add(operator);
// DFS downstream operators
- if (operator.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
- addVisited(operator, ctx);
- } else {
- for (PTOperator.PTOutput out : operator.getOutputs()) {
+ for (PTOperator groupOper : groupOpers) {
+ for (PTOperator.PTOutput out : groupOper.getOutputs()) {
for (PTOperator.PTInput sink : out.sinks) {
PTOperator sinkOperator = sink.target;
+ if (groupOpers.contains(sinkOperator)) {
+ continue; // downstream operator within group
+ }
if (!ctx.visited.contains(sinkOperator)) {
// downstream traversal
updateRecoveryCheckpoints(sinkOperator, ctx);
@@ -1954,7 +1982,7 @@ public class StreamingContainerManager implements PlanContext
// recovery window id cannot move backwards
// when dynamically adding new operators
if (sinkOperator.getRecoveryCheckpoint().windowId >= operator.getRecoveryCheckpoint().windowId) {
- maxCheckpoint = Math.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint().windowId);
+ maxCheckpoint = Checkpoint.min(maxCheckpoint, sinkOperator.getRecoveryCheckpoint());
}
if (ctx.blocked.contains(sinkOperator)) {
@@ -1967,33 +1995,43 @@ public class StreamingContainerManager implements PlanContext
}
}
- // checkpoint frozen during deployment
- if (ctx.recovery || operator.getState() != PTOperator.State.PENDING_DEPLOY) {
- // remove previous checkpoints
- Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
- synchronized (operator.checkpoints) {
- if (!operator.checkpoints.isEmpty() && (operator.checkpoints.getFirst()).windowId <= maxCheckpoint) {
- c1 = operator.checkpoints.getFirst();
- Checkpoint c2;
- while (operator.checkpoints.size() > 1 && ((c2 = operator.checkpoints.get(1)).windowId) <= maxCheckpoint) {
- operator.checkpoints.removeFirst();
- //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
- this.purgeCheckpoints.add(new Pair<PTOperator, Long>(operator, c1.windowId));
- c1 = c2;
+ // find the common checkpoint that is <= downstream recovery checkpoint
+ if (!commonCheckpoints.contains(maxCheckpoint)) {
+ if (!commonCheckpoints.isEmpty()) {
+ maxCheckpoint = Objects.firstNonNull(commonCheckpoints.floor(maxCheckpoint), maxCheckpoint);
+ }
+ }
+
+ for (PTOperator groupOper : groupOpers) {
+ // checkpoint frozen during deployment
+ if (ctx.recovery || groupOper.getState() != PTOperator.State.PENDING_DEPLOY) {
+ // remove previous checkpoints
+ Checkpoint c1 = Checkpoint.INITIAL_CHECKPOINT;
+ LinkedList<Checkpoint> checkpoints = groupOper.checkpoints;
+ synchronized (checkpoints) {
+ if (!checkpoints.isEmpty() && (checkpoints.getFirst()).windowId <= maxCheckpoint.windowId) {
+ c1 = checkpoints.getFirst();
+ Checkpoint c2;
+ while (checkpoints.size() > 1 && ((c2 = checkpoints.get(1)).windowId) <= maxCheckpoint.windowId) {
+ checkpoints.removeFirst();
+ //LOG.debug("Checkpoint to delete: operator={} windowId={}", operator.getName(), c1);
+ this.purgeCheckpoints.add(new Pair<PTOperator, Long>(groupOper, c1.windowId));
+ c1 = c2;
+ }
}
- }
- else {
- if (ctx.recovery && operator.checkpoints.isEmpty() && operator.isOperatorStateLess()) {
- LOG.debug("Adding checkpoint for stateless operator {} {}", operator, Codec.getStringWindowId(maxCheckpoint));
- c1 = operator.addCheckpoint(maxCheckpoint, this.vars.windowStartMillis);
+ else {
+ if (ctx.recovery && checkpoints.isEmpty() && groupOper.isOperatorStateLess()) {
+ LOG.debug("Adding checkpoint for stateless operator {} {}", groupOper, Codec.getStringWindowId(maxCheckpoint.windowId));
+ c1 = groupOper.addCheckpoint(maxCheckpoint.windowId, this.vars.windowStartMillis);
+ }
}
}
+ //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
+ groupOper.setRecoveryCheckpoint(c1);
+ }
+ else {
+ LOG.debug("Skipping checkpoint update {} during {}", groupOper, groupOper.getState());
}
- //LOG.debug("Operator {} checkpoints: commit {} recent {}", new Object[] {operator.getName(), c1, operator.checkpoints});
- operator.setRecoveryCheckpoint(c1);
- }
- else {
- LOG.debug("Skipping checkpoint update {} during {}", operator, operator.getState());
}
}
@@ -2009,13 +2047,32 @@ public class StreamingContainerManager implements PlanContext
return this.vars.windowStartMillis;
}
+ private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+ {
+ if (this.checkpointGroups == null) {
+ this.checkpointGroups = new HashMap<>();
+ LogicalPlan dag = this.plan.getLogicalPlan();
+ dag.resetNIndex();
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ for (OperatorMeta om : dag.getRootOperators()) {
+ this.plan.getLogicalPlan().findStronglyConnected(om, vc);
+ }
+ for (Set<OperatorMeta> checkpointGroup : vc.stronglyConnected) {
+ for (OperatorMeta om : checkpointGroup) {
+ this.checkpointGroups.put(om, checkpointGroup);
+ }
+ }
+ }
+ return checkpointGroups;
+ }
+
/**
* Visit all operators to update current checkpoint based on updated downstream state.
* Purge older checkpoints that are no longer needed.
*/
private long updateCheckpoints(boolean recovery)
{
- UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery);
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, recovery, getCheckpointGroups());
for (OperatorMeta logicalOperator : plan.getLogicalPlan().getRootOperators()) {
//LOG.debug("Updating checkpoints for operator {}", logicalOperator.getName());
List<PTOperator> operators = plan.getOperators(logicalOperator);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
index 5ec4a7e..d24b17a 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/Checkpoint.java
@@ -18,6 +18,8 @@
*/
package com.datatorrent.stram.api;
+import java.util.Comparator;
+
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.bufferserver.util.Codec;
@@ -102,6 +104,15 @@ public class Checkpoint implements com.datatorrent.api.Stats.Checkpoint
return windowId;
}
+ public static class CheckpointComparator implements Comparator<Checkpoint>
+ {
+ @Override
+ public int compare(Checkpoint o1, Checkpoint o2)
+ {
+ return Long.compare(o1.windowId, o2.windowId);
+ }
+ }
+
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
public static final Checkpoint INITIAL_CHECKPOINT = new Checkpoint(Stateless.WINDOW_ID, 0, 0);
private static final long serialVersionUID = 201402152116L;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 883ad71..6d7ebe1 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -159,8 +159,6 @@ public class LogicalPlan implements Serializable, DAG
public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
- private transient int nodeIndex = 0; // used for cycle validation
- private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation
private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
@Override
@@ -1540,6 +1538,7 @@ public class LogicalPlan implements Serializable, DAG
return this.operators.get(operatorName);
}
+ @Override
public ModuleMeta getModuleMeta(String moduleName)
{
return this.modules.get(moduleName);
@@ -1557,6 +1556,7 @@ public class LogicalPlan implements Serializable, DAG
throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
}
+ @Override
public ModuleMeta getMeta(Module module)
{
for (ModuleMeta m : getAllModules()) {
@@ -1626,6 +1626,24 @@ public class LogicalPlan implements Serializable, DAG
return classNames;
}
+ public static class ValidationContext
+ {
+ public int nodeIndex = 0;
+ public Stack<OperatorMeta> stack = new Stack<OperatorMeta>();
+ public Stack<OperatorMeta> path = new Stack<OperatorMeta>();
+ public List<Set<OperatorMeta>> stronglyConnected = new ArrayList<>();
+ public OperatorMeta invalidLoopAt;
+ public List<Set<OperatorMeta>> invalidCycles = new ArrayList<>();
+ }
+
+ public void resetNIndex()
+ {
+ for (OperatorMeta om : getAllOperators()) {
+ om.lowlink = null;
+ om.nindex = null;
+ }
+ }
+
/**
* Validate the plan. Includes checks that required ports are connected,
* required configuration parameters specified, graph free of cycles etc.
@@ -1752,21 +1770,20 @@ public class LogicalPlan implements Serializable, DAG
throw new ValidationException("At least one output port must be connected: " + n.name);
}
}
- stack = new Stack<OperatorMeta>();
- List<List<String>> cycles = new ArrayList<List<String>>();
+ ValidationContext validationContext = new ValidationContext();
for (OperatorMeta n: operators.values()) {
if (n.nindex == null) {
- findStronglyConnected(n, cycles);
+ findStronglyConnected(n, validationContext);
}
}
- if (!cycles.isEmpty()) {
- throw new ValidationException("Loops in graph: " + cycles);
+ if (!validationContext.invalidCycles.isEmpty()) {
+ throw new ValidationException("Loops in graph: " + validationContext.invalidCycles);
}
List<List<String>> invalidDelays = new ArrayList<>();
for (OperatorMeta n : rootOperators) {
- findInvalidDelays(n, invalidDelays);
+ findInvalidDelays(n, invalidDelays, new Stack<OperatorMeta>());
}
if (!invalidDelays.isEmpty()) {
throw new ValidationException("Invalid delays in graph: " + invalidDelays);
@@ -1908,59 +1925,72 @@ public class LogicalPlan implements Serializable, DAG
* @param om
* @param cycles
*/
- public void findStronglyConnected(OperatorMeta om, List<List<String>> cycles)
+ public void findStronglyConnected(OperatorMeta om, ValidationContext ctx)
{
- om.nindex = nodeIndex;
- om.lowlink = nodeIndex;
- nodeIndex++;
- stack.push(om);
+ om.nindex = ctx.nodeIndex;
+ om.lowlink = ctx.nodeIndex;
+ ctx.nodeIndex++;
+ ctx.stack.push(om);
+ ctx.path.push(om);
// depth first successors traversal
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink: downStream.sinks) {
- if (om.getOperator() instanceof Operator.DelayOperator) {
- // this is an iteration loop, do not treat it as downstream when detecting cycles
- sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
- continue;
- }
OperatorMeta successor = sink.getOperatorWrapper();
if (successor == null) {
continue;
}
// check for self referencing node
if (om == successor) {
- cycles.add(Collections.singletonList(om.name));
+ ctx.invalidCycles.add(Collections.singleton(om));
}
if (successor.nindex == null) {
// not visited yet
- findStronglyConnected(successor, cycles);
+ findStronglyConnected(successor, ctx);
om.lowlink = Math.min(om.lowlink, successor.lowlink);
}
- else if (stack.contains(successor)) {
+ else if (ctx.stack.contains(successor)) {
om.lowlink = Math.min(om.lowlink, successor.nindex);
+ boolean isDelayLoop = false;
+ for (int i=ctx.path.size(); i>0; i--) {
+ OperatorMeta om2 = ctx.path.get(i-1);
+ if (om2.getOperator() instanceof Operator.DelayOperator) {
+ isDelayLoop = true;
+ }
+ if (om2 == successor) {
+ break;
+ }
+ }
+ if (!isDelayLoop) {
+ ctx.invalidLoopAt = successor;
+ }
}
}
}
// pop stack for all root operators
if (om.lowlink.equals(om.nindex)) {
- List<String> connectedIds = new ArrayList<String>();
- while (!stack.isEmpty()) {
- OperatorMeta n2 = stack.pop();
- connectedIds.add(n2.name);
+ Set<OperatorMeta> connectedSet = new LinkedHashSet<>(ctx.stack.size());
+ while (!ctx.stack.isEmpty()) {
+ OperatorMeta n2 = ctx.stack.pop();
+ connectedSet.add(n2);
if (n2 == om) {
break; // collected all connected operators
}
}
// strongly connected (cycle) if more than one node in stack
- if (connectedIds.size() > 1) {
- LOG.debug("detected cycle from node {}: {}", om.name, connectedIds);
- cycles.add(connectedIds);
+ if (connectedSet.size() > 1) {
+ ctx.stronglyConnected.add(connectedSet);
+ if (connectedSet.contains(ctx.invalidLoopAt)) {
+ ctx.invalidCycles.add(connectedSet);
+ }
}
}
+ ctx.path.pop();
+
}
- public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays)
+ public void findInvalidDelays(OperatorMeta om, List<List<String>> invalidDelays, Stack<OperatorMeta> stack)
{
stack.push(om);
@@ -1977,6 +2007,7 @@ public class LogicalPlan implements Serializable, DAG
for (InputPortMeta sink : downStream.sinks) {
OperatorMeta successor = sink.getOperatorWrapper();
if (isDelayOperator) {
+ sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
// Check whether all downstream operators are already visited in the path
if (successor != null && !stack.contains(successor)) {
LOG.debug("detected DelayOperator does not immediately output to a visited operator {}.{}->{}.{}",
@@ -1984,7 +2015,7 @@ public class LogicalPlan implements Serializable, DAG
invalidDelays.add(Arrays.asList(om.getName(), successor.getName()));
}
} else {
- findInvalidDelays(successor, invalidDelays);
+ findInvalidDelays(successor, invalidDelays, stack);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index ee3cbc3..5675b53 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -56,6 +56,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHea
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
@@ -314,7 +315,7 @@ public class CheckpointTest
o4p1.checkpoints.add(leafCheckpoint);
UpdateCheckpointsContext ctx;
- dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true));
+ dnm.updateRecoveryCheckpoints(o1p1, ctx = new UpdateCheckpointsContext(clock, true, Collections.<OperatorMeta, Set<OperatorMeta>>emptyMap()));
Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o2SLp1, leafCheckpoint, o2SLp1.getRecoveryCheckpoint());
Assert.assertEquals("initial checkpoint " + o3SLp1, new Checkpoint(clock.getTime(), 0, 0), o3SLp1.getRecoveryCheckpoint());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
index 359da17..06f184f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DelayOperatorTest.java
@@ -20,7 +20,11 @@ package com.datatorrent.stram.plan.logical;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
@@ -32,8 +36,14 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
@@ -42,8 +52,17 @@ import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.StramLocalCluster;
+import com.datatorrent.stram.StreamingContainerManager;
+import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
+import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
+import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -75,7 +94,7 @@ public class DelayOperatorTest
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
- DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
@@ -83,7 +102,7 @@ public class DelayOperatorTest
dag.addStream("DelayToD", opDelay.output, opD.inport2);
List<List<String>> invalidDelays = new ArrayList<>();
- dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
@@ -106,7 +125,7 @@ public class DelayOperatorTest
dag.addStream("DelayToC", opDelay.output, opC.inport2);
invalidDelays = new ArrayList<>();
- dag.findInvalidDelays(dag.getMeta(opB), invalidDelays);
+ dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
@@ -373,5 +392,68 @@ public class DelayOperatorTest
Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
}
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testCheckpointUpdate()
+ {
+ LogicalPlan dag = StramTestSupport.createDAG(testMeta);
+
+ TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+
+ dag.addStream("AtoB", opA.outport, opB.inport1);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DelayToB", opDelay.output, opB.inport2);
+ dag.validate();
+
+ dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+ StreamingContainerManager scm = new StreamingContainerManager(dag);
+ PhysicalPlan plan = scm.getPhysicalPlan();
+ // set all operators as active to enable recovery window id update
+ for (PTOperator oper : plan.getAllOperators().values()) {
+ oper.setState(PTOperator.State.ACTIVE);
+ }
+
+ Clock clock = new SystemClock();
+
+ PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
+ PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
+ PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
+ PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
+ PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);
+
+ Checkpoint cp3 = new Checkpoint(3L, 0, 0);
+ Checkpoint cp5 = new Checkpoint(5L, 0, 0);
+ Checkpoint cp4 = new Checkpoint(4L, 0, 0);
+
+ opB1.checkpoints.add(cp3);
+ opC1.checkpoints.add(cp3);
+ opC1.checkpoints.add(cp4);
+ opDelay1.checkpoints.add(cp3);
+ opDelay1.checkpoints.add(cp5);
+ opD1.checkpoints.add(cp5);
+ // construct grouping that would be supplied through LogicalPlan
+ Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
+ Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
+ for (OperatorMeta om : stronglyConnected) {
+ groups.put(om, stronglyConnected);
+ }
+
+ UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
+ scm.updateRecoveryCheckpoints(opB1, ctx);
+
+ Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opB1, cp3, opB1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
+ Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b3402be5/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index a4ac488..9383f12 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,6 +19,7 @@
package com.datatorrent.stram.plan.logical;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -35,6 +36,7 @@ import javax.validation.constraints.Pattern;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
@@ -43,7 +45,6 @@ import static org.junit.Assert.*;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.api.*;
-import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
@@ -61,10 +62,12 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
-public class LogicalPlanTest {
+public class LogicalPlanTest
+{
@Test
- public void testCycleDetection() {
+ public void testCycleDetection()
+ {
LogicalPlan dag = new LogicalPlan();
//NodeConf operator1 = b.getOrAddNode("operator1");
@@ -91,20 +94,20 @@ public class LogicalPlanTest {
// expected, stream can have single input/output only
}
- List<List<String>> cycles = new ArrayList<List<String>>();
- dag.findStronglyConnected(dag.getMeta(operator7), cycles);
- assertEquals("operator self reference", 1, cycles.size());
- assertEquals("operator self reference", 1, cycles.get(0).size());
- assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(operator7), vc);
+ assertEquals("operator self reference", 1, vc.invalidCycles.size());
+ assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size());
+ assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next());
// 3 operator cycle
- cycles.clear();
- dag.findStronglyConnected(dag.getMeta(operator4), cycles);
- assertEquals("3 operator cycle", 1, cycles.size());
- assertEquals("3 operator cycle", 3, cycles.get(0).size());
- assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
- assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
- assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
+ vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(operator4), vc);
+ assertEquals("3 operator cycle", 1, vc.invalidCycles.size());
+ assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size());
+ assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2)));
+ assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3)));
+ assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4)));
try {
dag.validate();
@@ -115,13 +118,44 @@ public class LogicalPlanTest {
}
- public static class ValidationOperator extends BaseOperator {
+ @Test
+ public void testCycleDetectionWithDelay()
+ {
+ LogicalPlan dag = new LogicalPlan();
+
+ TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
+ GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
+ GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
+ GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
+ DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
+ DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>());
+
+ dag.addStream("AtoB", opA.outport, opB.inport1);
+ dag.addStream("BtoC", opB.outport1, opC.inport1);
+ dag.addStream("CtoD", opC.outport1, opD.inport1);
+ dag.addStream("CtoDelay", opC.outport2, opDelay.input);
+ dag.addStream("DtoDelay", opD.outport1, opDelay2.input);
+ dag.addStream("DelayToB", opDelay.output, opB.inport2);
+ dag.addStream("Delay2ToC", opDelay2.output, opC.inport2);
+
+ LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
+ dag.findStronglyConnected(dag.getMeta(opA), vc);
+
+ Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles);
+ Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD));
+ Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0));
+ }
+
+
+ public static class ValidationOperator extends BaseOperator
+ {
public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<Object>();
public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<Object>();
}
- public static class CounterOperator extends BaseOperator {
+ public static class CounterOperator extends BaseOperator
+ {
final public transient InputPort<Object> countInputPort = new DefaultInputPort<Object>() {
@Override
final public void process(Object payload) {
@@ -130,8 +164,8 @@ public class LogicalPlanTest {
}
@Test
- public void testLogicalPlanSerialization() throws Exception {
-
+ public void testLogicalPlanSerialization() throws Exception
+ {
LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
@@ -188,7 +222,8 @@ public class LogicalPlanTest {
Assert.assertEquals("", 2, dag.getAllOperators().size());
}
- public static class ValidationTestOperator extends BaseOperator implements InputOperator {
+ public static class ValidationTestOperator extends BaseOperator implements InputOperator
+ {
@NotNull
@Pattern(regexp=".*malhar.*", message="Value has to contain 'malhar'!")
private String stringField1;
@@ -271,8 +306,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOperatorValidation() {
-
+ public void testOperatorValidation()
+ {
ValidationTestOperator bean = new ValidationTestOperator();
bean.stringField1 = "malhar1";
bean.intField1 = 1;
@@ -348,7 +383,8 @@ public class LogicalPlanTest {
}
@OperatorAnnotation(partitionable = false)
- public static class TestOperatorAnnotationOperator extends BaseOperator {
+ public static class TestOperatorAnnotationOperator extends BaseOperator
+ {
@InputPortFieldAnnotation( optional = true)
final public transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>() {
@@ -358,11 +394,13 @@ public class LogicalPlanTest {
};
}
- class NoInputPortOperator extends BaseOperator {
+ class NoInputPortOperator extends BaseOperator
+ {
}
@Test
- public void testValidationForNonInputRootOperator() {
+ public void testValidationForNonInputRootOperator()
+ {
LogicalPlan dag = new LogicalPlan();
NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
try {
@@ -374,8 +412,8 @@ public class LogicalPlanTest {
}
@OperatorAnnotation(partitionable = false)
- public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2> {
-
+ public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2>
+ {
@Override
public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
{
@@ -389,7 +427,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOperatorAnnotation() {
+ public void testOperatorAnnotation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
@@ -430,8 +469,8 @@ public class LogicalPlanTest {
}
@Test
- public void testPortConnectionValidation() {
-
+ public void testPortConnectionValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
@@ -459,7 +498,8 @@ public class LogicalPlanTest {
}
@Test
- public void testAtMostOnceProcessingModeValidation() {
+ public void testAtMostOnceProcessingModeValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -489,8 +529,9 @@ public class LogicalPlanTest {
}
- @Test
- public void testExactlyOnceProcessingModeValidation() {
+ @Test
+ public void testExactlyOnceProcessingModeValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -527,7 +568,8 @@ public class LogicalPlanTest {
}
@Test
- public void testLocalityValidation() {
+ public void testLocalityValidation()
+ {
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
@@ -549,7 +591,8 @@ public class LogicalPlanTest {
dag.validate();
}
- private class TestAnnotationsOperator extends BaseOperator implements InputOperator {
+ private class TestAnnotationsOperator extends BaseOperator implements InputOperator
+ {
//final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@OutputPortFieldAnnotation( optional=false)
@@ -562,7 +605,8 @@ public class LogicalPlanTest {
}
}
- private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator{
+ private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
+ {
// multiple ports w/o annotation, one of them must be connected
final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -573,7 +617,8 @@ public class LogicalPlanTest {
}
}
- private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator{
+ private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
+ {
// multiple ports w/o annotation, one of them must be connected
@OutputPortFieldAnnotation( optional=true)
final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@@ -587,7 +632,8 @@ public class LogicalPlanTest {
}
@Test
- public void testOutputPortAnnotation() {
+ public void testOutputPortAnnotation()
+ {
LogicalPlan dag = new LogicalPlan();
TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
@@ -623,7 +669,8 @@ public class LogicalPlanTest {
* Operator that can be used with default Java serialization instead of Kryo
*/
@DefaultSerializer(JavaSerializer.class)
- public static class JdkSerializableOperator extends BaseOperator implements Serializable {
+ public static class JdkSerializableOperator extends BaseOperator implements Serializable
+ {
private static final long serialVersionUID = -4024202339520027097L;
public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable {
@@ -673,7 +720,8 @@ public class LogicalPlanTest {
}
@Test
- public void testJdkSerializableOperator() throws Exception {
+ public void testJdkSerializableOperator() throws Exception
+ {
LogicalPlan dag = new LogicalPlan();
dag.addOperator("o1", new JdkSerializableOperator());
@@ -785,7 +833,8 @@ public class LogicalPlanTest {
}
}
- public static class TestPortCodecOperator extends BaseOperator {
+ public static class TestPortCodecOperator extends BaseOperator
+ {
public transient final DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
{
@Override
[2/3] incubator-apex-core git commit: moved attribute from context to
logical plan
Posted by th...@apache.org.
moved attribute from context to logical plan
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6
Branch: refs/heads/devel-3
Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad
Parents: f7e1ccf
Author: Gaurav <ga...@datatorrent.com>
Authored: Wed Dec 16 06:33:54 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:04:27 2016 -0800
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/Context.java | 7 -------
.../main/java/com/datatorrent/stram/engine/GenericNode.java | 3 ++-
.../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++-
.../com/datatorrent/stram/plan/physical/PhysicalPlan.java | 4 ++--
.../com/datatorrent/stram/plan/physical/StreamMapping.java | 2 +-
5 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 58bc552..ceed8a2 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,13 +166,6 @@ public interface Context
*/
Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
- /**
- * Attribute of input port.
- * This is a read-only attribute to query whether the input port is connected to a DelayOperator
- * This is for iterative processing.
- */
- Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
-
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 4777f93..1ccec31 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -207,7 +208,7 @@ public class GenericNode extends Node<Operator>
if (pcPair == null || pcPair.context == null) {
return false;
}
- return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+ return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3c26118..883ad71 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider;
*/
public class LogicalPlan implements Serializable, DAG
{
+ /**
+ * Attribute of input port.
+ * This is a read-only attribute to query whether the input port is connected to a DelayOperator
+ * This is for iterative processing.
+ */
+ public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
private static final long serialVersionUID = -2099729915606048704L;
private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
@@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG
for (InputPortMeta sink: downStream.sinks) {
if (om.getOperator() instanceof Operator.DelayOperator) {
// this is an iteration loop, do not treat it as downstream when detecting cycles
- sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+ sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
continue;
}
OperatorMeta successor = sink.getOperatorWrapper();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index da96ef3..c696224 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable
PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
StreamMapping.addInput(slidingUnifier, sourceOut, null);
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
}
else {
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
}
oper.inputs.add(input);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 91c6eef..f30ceb6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
// link to upstream output(s) for this stream
for (PTOutput upstreamOut : sourceOper.outputs) {
if (upstreamOut.logicalStream == streamMeta) {
- PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
oper.inputs.add(input);
}
}