You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by tu...@apache.org on 2017/03/03 06:41:48 UTC
apex-core git commit: APEXCORE-580 APEXCORE-581 Support for custom
control tuples
Repository: apex-core
Updated Branches:
refs/heads/master 3b660c9c1 -> 1e4785671
APEXCORE-580 APEXCORE-581 Support for custom control tuples
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/1e478567
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/1e478567
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/1e478567
Branch: refs/heads/master
Commit: 1e47856712dc4fcae40856d27ce8ce2360037a12
Parents: 3b660c9
Author: bhupeshchawda <bh...@apache.org>
Authored: Wed Dec 28 15:48:42 2016 +0530
Committer: bhupeshchawda <bh...@apache.org>
Committed: Fri Mar 3 11:08:25 2017 +0530
----------------------------------------------------------------------
.../api/ControlTupleEnabledSink.java | 56 +++
.../com/datatorrent/api/DefaultInputPort.java | 2 +-
.../com/datatorrent/api/DefaultOutputPort.java | 21 +-
.../apex/api/ControlAwareDefaultInputPort.java | 46 +++
.../apex/api/ControlAwareDefaultOutputPort.java | 60 +++
.../apex/api/UserDefinedControlTuple.java | 46 +++
.../bufferserver/packet/CustomControlTuple.java | 36 ++
.../bufferserver/packet/MessageType.java | 4 +
.../datatorrent/bufferserver/packet/Tuple.java | 3 +
.../datatorrent/stram/engine/GenericNode.java | 126 ++++++-
.../com/datatorrent/stram/engine/OiONode.java | 53 +++
.../com/datatorrent/stram/engine/Stream.java | 3 +-
.../datatorrent/stram/engine/UnifierNode.java | 1 -
.../stram/engine/WindowGenerator.java | 8 +
.../stram/stream/BufferServerPublisher.java | 32 ++
.../stram/stream/BufferServerSubscriber.java | 12 +
.../datatorrent/stram/stream/FastPublisher.java | 15 +
.../datatorrent/stram/stream/InlineStream.java | 10 +
.../com/datatorrent/stram/stream/MuxStream.java | 11 +-
.../com/datatorrent/stram/stream/OiOStream.java | 15 +
.../stram/tuple/CustomControlTuple.java | 62 +++
.../stram/CustomControlTupleTest.java | 376 +++++++++++++++++++
.../stram/engine/GenericNodeTest.java | 192 ++++++++++
23 files changed, 1179 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
new file mode 100644
index 0000000..e27003d
--- /dev/null
+++ b/api/src/main/java/com/datatorrent/api/ControlTupleEnabledSink.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.api;
+
+import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A {@link Sink} which supports adding control tuples
+ */
+@InterfaceStability.Evolving
+public interface ControlTupleEnabledSink<T> extends Sink<T>
+{
+ public static final ControlTupleEnabledSink<Object> BLACKHOLE = new ControlTupleEnabledSink<Object>()
+ {
+ @Override
+ public void put(Object tuple)
+ {
+ }
+
+ @Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ return true;
+ }
+
+ @Override
+ public int getCount(boolean reset)
+ {
+ return 0;
+ }
+ };
+
+ /**
+ * Add a control tuple to the sink
+ *
+ * @param payload the control tuple payload
+ */
+ public boolean putControl(UserDefinedControlTuple payload);
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
index 046a35d..dc8705c 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultInputPort.java
@@ -31,7 +31,7 @@ import com.datatorrent.api.Operator.InputPort;
*/
public abstract class DefaultInputPort<T> implements InputPort<T>, Sink<T>
{
- private int count;
+ protected int count;
protected boolean connected = false;
/**
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
index 71be22c..acd562f 100644
--- a/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
+++ b/api/src/main/java/com/datatorrent/api/DefaultOutputPort.java
@@ -37,7 +37,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
public static final String THREAD_AFFINITY_DISABLE_CHECK = "com.datatorrent.api.DefaultOutputPort.thread.check.disable";
private static final Logger logger = LoggerFactory.getLogger(DefaultOutputPort.class);
- private transient Sink<Object> sink;
+ protected transient Sink<Object> sink;
private transient Thread operatorThread;
/**
@@ -45,7 +45,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
*/
public DefaultOutputPort()
{
- this.sink = Sink.BLACKHOLE;
+ this.sink = ControlTupleEnabledSink.BLACKHOLE;
}
/**
@@ -55,13 +55,18 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
*/
public void emit(T tuple)
{
+ verifyOperatorThread();
+ sink.put(tuple);
+ }
+
+ protected void verifyOperatorThread()
+ {
// operatorThread could be null if setup() never got called.
if (operatorThread != null && Thread.currentThread() != operatorThread) {
// only under certain modes: enforce this
throw new IllegalStateException("Current thread " + Thread.currentThread().getName() +
- " is different from the operator thread " + operatorThread.getName());
+ " is different from the operator thread " + operatorThread.getName());
}
- sink.put(tuple);
}
/**
@@ -70,7 +75,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
* Called by execution engine to inject sink at deployment time.
*/
@Override
- public final void setSink(Sink<Object> s)
+ public void setSink(Sink<Object> s)
{
this.sink = s == null ? Sink.BLACKHOLE : s;
}
@@ -83,7 +88,7 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
*/
public boolean isConnected()
{
- return sink != Sink.BLACKHOLE;
+ return sink != ControlTupleEnabledSink.BLACKHOLE;
}
/**
@@ -113,4 +118,8 @@ public class DefaultOutputPort<T> implements Operator.OutputPort<T>
{
}
+ protected Sink<Object> getSink()
+ {
+ return sink;
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
new file mode 100644
index 0000000..ff2b849
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultInputPort.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.ControlTupleEnabledSink;
+import com.datatorrent.api.DefaultInputPort;
+
+/**
+ * Default abstract implementation for an input port which is capable of processing
+ * @{@link UserDefinedControlTuple}
+ */
+@InterfaceStability.Evolving
+public abstract class ControlAwareDefaultInputPort<T> extends DefaultInputPort<T> implements ControlTupleEnabledSink<T>
+{
+ @Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ count++;
+ return processControl(payload);
+ }
+
+ /**
+ * Process the control tuples
+ *
+ * @param payload the control tuple payload generated by upstream operator(s)
+ */
+ public abstract boolean processControl(UserDefinedControlTuple payload);
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
new file mode 100644
index 0000000..4a83518
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/ControlAwareDefaultOutputPort.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.ControlTupleEnabledSink;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Sink;
+
+/**
+ * Default implementation for an output port which can emit a @{@link UserDefinedControlTuple}.
+ * The {@link #emitControl(UserDefinedControlTuple)} method can be used to emit control tuples onto this output port
+ */
+@InterfaceStability.Evolving
+public class ControlAwareDefaultOutputPort<T> extends DefaultOutputPort<T>
+{
+ public ControlAwareDefaultOutputPort()
+ {
+ sink = ControlTupleEnabledSink.BLACKHOLE;
+ }
+
+ /**
+ * Allows the operator to emit a @{@link UserDefinedControlTuple}
+ * @param {@link UserDefinedControlTuple}
+ */
+ public void emitControl(UserDefinedControlTuple tuple)
+ {
+ verifyOperatorThread();
+ ((ControlTupleEnabledSink)sink).putControl(tuple);
+ }
+
+ public boolean isConnected()
+ {
+ return sink != ControlTupleEnabledSink.BLACKHOLE;
+ }
+
+ @Override
+ public void setSink(Sink<Object> s)
+ {
+ this.sink = (s == null ? ControlTupleEnabledSink.BLACKHOLE : s);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
new file mode 100644
index 0000000..8e62a8f
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/UserDefinedControlTuple.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Any user generated control tuple must implement {@link UserDefinedControlTuple} interface
+ */
+@InterfaceStability.Evolving
+public interface UserDefinedControlTuple
+{
+ /**
+ * A user generated control tuple must specify a @{@link DeliveryType}
+ * @return @{@link DeliveryType} type
+ */
+ DeliveryType getDeliveryType();
+
+ /**
+ * All custom control tuples can be delivered according to the following semantics
+ * 1. IMMEDIATE - The control tuple will be delivered immediately to the next operator
+ * 2. END_WINDOW - The control tuple will be delivered to the next operator just before the
+ * com.datatorrent.api.Operator#endWindow() call.
+ */
+ enum DeliveryType
+ {
+ IMMEDIATE,
+ END_WINDOW
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
new file mode 100644
index 0000000..3aca31d
--- /dev/null
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/CustomControlTuple.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.bufferserver.packet;
+
+/**
+ * Custom Control Tuple class
+ */
+public class CustomControlTuple extends DataTuple
+{
+ public CustomControlTuple(byte[] array, int offset, int index)
+ {
+ super(array, offset, index);
+ }
+
+ @Override
+ public MessageType getType()
+ {
+ return MessageType.CUSTOM_CONTROL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
index 3c0ec2c..efc4ac3 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java
@@ -37,6 +37,7 @@ public enum MessageType
RESET_REQUEST(9),
CHECKPOINT(10),
CODEC_STATE(11),
+ CUSTOM_CONTROL(12),
NO_MESSAGE_ODD(127);
public static final byte NO_MESSAGE_VALUE = 0;
@@ -51,6 +52,7 @@ public enum MessageType
public static final byte RESET_REQUEST_VALUE = 9;
public static final byte CHECKPOINT_VALUE = 10;
public static final byte CODEC_STATE_VALUE = 11;
+ public static final byte CUSTOM_CONTROL_VALUE = 12;
public static final byte NO_MESSAGE_ODD_VALUE = 127;
public final int getNumber()
@@ -85,6 +87,8 @@ public enum MessageType
return CHECKPOINT;
case 11:
return CODEC_STATE;
+ case 12:
+ return CUSTOM_CONTROL;
case 127:
return NO_MESSAGE_ODD;
default:
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index de3cae8..aae7f68 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -73,6 +73,9 @@ public abstract class Tuple
case END_WINDOW:
return new EndWindowTuple(buffer, offset, length);
+ case CUSTOM_CONTROL:
+ return new CustomControlTuple(buffer, offset, length);
+
case END_STREAM:
return new WindowIdTuple(buffer, offset, length);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 41acd43..dae838d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,10 +31,14 @@ import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
import org.apache.commons.lang.UnhandledException;
import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.datatorrent.api.ControlTupleEnabledSink;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.IdleTimeHandler;
import com.datatorrent.api.Operator.InputPort;
@@ -47,6 +52,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt
import com.datatorrent.stram.debug.TappedReservoir;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -67,6 +73,7 @@ public class GenericNode extends Node<Operator>
{
protected final HashMap<String, SweepableReservoir> inputs = new HashMap<>();
protected ArrayList<DeferredInputConnection> deferredInputConnections = new ArrayList<>();
+ protected Map<SweepableReservoir,Sink> reservoirPortMap = Maps.newHashMap();
@Override
@SuppressWarnings("unchecked")
@@ -249,6 +256,9 @@ public class GenericNode extends Node<Operator>
TupleTracker tracker;
LinkedList<TupleTracker> resetTupleTracker = new LinkedList<>();
+ Map<SweepableReservoir, LinkedHashSet<CustomControlTuple>> immediateDeliveryTuples = Maps.newHashMap();
+ Map<SweepableReservoir,LinkedHashSet<CustomControlTuple>> endWindowDeliveryTuples = Maps.newHashMap();
+
try {
do {
Iterator<Map.Entry<String, SweepableReservoir>> buffers = activeQueues.iterator();
@@ -290,8 +300,8 @@ public class GenericNode extends Node<Operator>
for (int s = sinks.length; s-- > 0; ) {
sinks[s].put(resetWindowTuple);
}
- controlTupleCount++;
}
+ controlTupleCount++;
t.setWindowId(windowAhead);
}
for (int s = sinks.length; s-- > 0; ) {
@@ -354,6 +364,36 @@ public class GenericNode extends Node<Operator>
if (delay) {
t.setWindowId(windowAhead);
}
+
+ /* Emit control tuples here */
+ if (reservoirPortMap.isEmpty()) {
+ populateReservoirInputPortMap();
+ }
+
+
+ for (Entry<SweepableReservoir,LinkedHashSet<CustomControlTuple>> portSet: endWindowDeliveryTuples.entrySet()) {
+ Sink activeSink = reservoirPortMap.get(portSet.getKey());
+ // activeSink may not be null
+ if (activeSink instanceof ControlAwareDefaultInputPort) {
+ ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink;
+ for (CustomControlTuple cct : portSet.getValue()) {
+ if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+ // operator cannot handle control tuple; forward to sinks
+ forwardToSinks(delay, cct);
+ }
+ }
+ } else {
+ // Not a ControlAwarePort. Operator cannot handle a custom control tuple.
+ for (CustomControlTuple cct : portSet.getValue()) {
+ forwardToSinks(delay, cct);
+ }
+ }
+ }
+
+ immediateDeliveryTuples.clear();
+ endWindowDeliveryTuples.clear();
+
+ /* Now call endWindow() */
processEndWindow(t);
activeQueues.addAll(inputs.entrySet());
expectingBeginWindow = activeQueues.size();
@@ -362,6 +402,53 @@ public class GenericNode extends Node<Operator>
}
break;
+ case CUSTOM_CONTROL:
+ activePort.remove();
+ /* All custom control tuples are expected to be arriving in the current window only.*/
+ /* Buffer control tuples until end of the window */
+ CustomControlTuple cct = (CustomControlTuple)t;
+ UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject();
+ boolean forward = false;
+
+ // Handle Immediate Delivery Control Tuples
+ if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) {
+ if (!isDuplicate(immediateDeliveryTuples.get(activePort), cct)) {
+ // Forward immediately
+ if (reservoirPortMap.isEmpty()) {
+ populateReservoirInputPortMap();
+ }
+
+ Sink activeSink = reservoirPortMap.get(activePort);
+ // activeSink may not be null
+ if (activeSink instanceof ControlAwareDefaultInputPort) {
+ ControlTupleEnabledSink sink = (ControlTupleEnabledSink)activeSink;
+ if (!sink.putControl((UserDefinedControlTuple)cct.getUserObject())) {
+ forward = true;
+ }
+ } else {
+ forward = true;
+ }
+
+ if (forward) {
+ forwardToSinks(delay, cct);
+ }
+ // Add to set
+ if (!immediateDeliveryTuples.containsKey(activePort)) {
+ immediateDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>());
+ }
+ immediateDeliveryTuples.get(activePort).add(cct);
+ }
+ } else {
+ // Buffer EndWindow Delivery Control Tuples
+ if (!endWindowDeliveryTuples.containsKey(activePort)) {
+ endWindowDeliveryTuples.put(activePort, new LinkedHashSet<CustomControlTuple>());
+ }
+ if (!isDuplicate(endWindowDeliveryTuples.get(activePort), cct)) {
+ endWindowDeliveryTuples.get(activePort).add(cct);
+ }
+ }
+ break;
+
case CHECKPOINT:
activePort.remove();
long checkpointWindow = t.getWindowId();
@@ -656,6 +743,43 @@ public class GenericNode extends Node<Operator>
}
+ protected void forwardToSinks(boolean delay, Object o)
+ {
+ if (!delay) {
+ for (int s = sinks.length; s-- > 0; ) {
+ sinks[s].put(o);
+ }
+ controlTupleCount++;
+ }
+ }
+
+ /**
+ * Populate {@link #reservoirPortMap} with information on which reservoirs are connected to which input ports
+ */
+ protected void populateReservoirInputPortMap()
+ {
+ for (Entry<String,Operators.PortContextPair<InputPort<?>>> entry : descriptor.inputPorts.entrySet()) {
+ if (entry.getValue().component != null && entry.getValue().component instanceof InputPort) {
+ if (inputs.containsKey(entry.getKey())) {
+ reservoirPortMap.put(inputs.get(entry.getKey()), entry.getValue().component.getSink());
+ }
+ }
+ }
+ }
+
+ protected boolean isDuplicate(LinkedHashSet<CustomControlTuple> set, CustomControlTuple t)
+ {
+ if (set == null || set.isEmpty()) {
+ return false;
+ }
+ for (CustomControlTuple cct : set) {
+ if (cct.getUid().equals(t.getUid())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void fabricateFirstWindow(Operator.DelayOperator delayOperator, long windowAhead)
{
Tuple beginWindowTuple = new Tuple(MessageType.BEGIN_WINDOW, windowAhead);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
index f968b4e..e2370ff 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/OiONode.java
@@ -19,16 +19,24 @@
package com.datatorrent.stram.engine;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
import org.apache.commons.lang.UnhandledException;
+import com.google.common.collect.Sets;
+
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Sink;
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.Tuple;
/**
@@ -59,6 +67,9 @@ public class OiONode extends GenericNode
reservoir = sr;
}
+ private LinkedHashSet<CustomControlTuple> immediateDeliveryControlTuples = Sets.newLinkedHashSet();
+ private LinkedHashSet<CustomControlTuple> endWindowControlTuples = Sets.newLinkedHashSet();
+
@Override
public void put(Tuple t)
{
@@ -82,10 +93,52 @@ public class OiONode extends GenericNode
case END_WINDOW:
endWindowDequeueTimes.put(reservoir, System.currentTimeMillis());
if (--expectingEndWindows == 0) {
+
+ /* process custom control tuples here */
+ for (CustomControlTuple cct : endWindowControlTuples) {
+ Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
+ if (sink instanceof ControlAwareDefaultInputPort) {
+ if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) {
+ // Operator will not handle; forward to sinks
+ forwardToSinks(false, cct);
+ }
+ } else {
+ // Port incapable of handling; forward to sinks
+ forwardToSinks(false, cct);
+ }
+ }
+ endWindowControlTuples.clear();
+ immediateDeliveryControlTuples.clear();
+
processEndWindow(t);
}
break;
+ case CUSTOM_CONTROL:
+ CustomControlTuple cct = ((CustomControlTuple)t);
+ UserDefinedControlTuple udct = (UserDefinedControlTuple)cct.getUserObject();
+
+ if (udct.getDeliveryType().equals(UserDefinedControlTuple.DeliveryType.IMMEDIATE)) { // Immediate Delivery
+ if (!isDuplicate(immediateDeliveryControlTuples, cct)) {
+ Sink sink = ((OiOStream.OiOReservoir)reservoir).getSink();
+ if (sink instanceof ControlAwareDefaultInputPort) {
+ if (!((ControlAwareDefaultInputPort)sink).putControl((UserDefinedControlTuple)cct.getUserObject())) {
+ // Operator will not handle; forward to sinks
+ forwardToSinks(false, cct);
+ }
+ } else {
+ forwardToSinks(false, cct);
+ }
+ // store
+ immediateDeliveryControlTuples.add(cct);
+ }
+ } else { // End Window Delivery
+ if (!isDuplicate(endWindowControlTuples, cct)) {
+ endWindowControlTuples.add(cct);
+ }
+ }
+ break;
+
case CHECKPOINT:
dagCheckpointOffsetCount = 0;
if (lastCheckpointWindowId < t.getWindowId() && !doCheckpoint) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
index fc93b38..196134f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Stream.java
@@ -19,6 +19,7 @@
package com.datatorrent.stram.engine;
import com.datatorrent.api.Component;
+import com.datatorrent.api.ControlTupleEnabledSink;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Sink;
@@ -32,7 +33,7 @@ import com.datatorrent.api.Sink;
/*
* Provides basic interface for a stream object. Stram, StramChild work via this interface
*/
-public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, Sink<Object>
+public interface Stream extends Component<StreamContext>, ActivationListener<StreamContext>, ControlTupleEnabledSink<Object>
{
public interface MultiSinkCapableStream extends Stream
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
index e38c94e..57a20b7 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/UnifierNode.java
@@ -27,7 +27,6 @@ import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.Unifier;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
-
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
/**
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 3a8438d..77ce1f0 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.netlet.util.CircularBuffer;
@@ -228,6 +230,12 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
}
@Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
protected Queue getQueue()
{
return queue;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
index 7db4892..fa2d823 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerPublisher.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.client.Publisher;
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
@@ -40,6 +42,7 @@ import com.datatorrent.stram.codec.StatefulStreamCodec;
import com.datatorrent.stram.codec.StatefulStreamCodec.DataStatePair;
import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.Tuple;
import static java.lang.Thread.sleep;
@@ -98,6 +101,28 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
break;
+ case CUSTOM_CONTROL:
+ if (statefulSerde == null) {
+ array = com.datatorrent.bufferserver.packet.CustomControlTuple
+ .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, serde.toByteArray(payload));
+ } else {
+ DataStatePair dsp = statefulSerde.toDataStatePair(payload);
+ if (dsp.state != null) {
+ array = com.datatorrent.bufferserver.packet.CustomControlTuple
+ .getSerializedTuple(MessageType.CODEC_STATE_VALUE, dsp.state);
+ try {
+ while (!write(array)) {
+ sleep(5);
+ }
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ }
+ array = com.datatorrent.bufferserver.packet.CustomControlTuple
+ .getSerializedTuple(MessageType.CUSTOM_CONTROL_VALUE, dsp.data);
+ }
+ break;
+
case END_STREAM:
array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
break;
@@ -145,6 +170,13 @@ public class BufferServerPublisher extends Publisher implements ByteCounterStrea
}
}
+ @Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ put(new CustomControlTuple(payload));
+ return false;
+ }
+
/**
*
* @param context
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
index d5b0997..606add0 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/BufferServerSubscriber.java
@@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.client.Subscriber;
@@ -196,6 +198,12 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
throw new UnsupportedOperationException("Not supported yet.");
}
+ @Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
public SweepableReservoir releaseReservoir(String sinkId)
{
BufferReservoir r = reservoirMap.remove(sinkId);
@@ -343,6 +351,10 @@ public class BufferServerSubscriber extends Subscriber implements ByteCounterStr
o = new EndWindowTuple(baseSeconds | (lastWindowId = data.getWindowId()));
break;
+ case CUSTOM_CONTROL:
+ o = processPayload(data);
+ break;
+
case END_STREAM:
o = new EndStreamTuple(baseSeconds | data.getWindowId());
break;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
index 88f2052..574e61c 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/FastPublisher.java
@@ -28,6 +28,8 @@ import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Output;
@@ -44,6 +46,7 @@ import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.Listener.ClientListener;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.Tuple;
import static java.lang.Thread.sleep;
@@ -228,6 +231,11 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
array = EndWindowTuple.getSerializedTuple((int)t.getWindowId());
break;
+ case CUSTOM_CONTROL:
+ array = null;
+ // TODO implement
+ break;
+
case END_STREAM:
array = EndStreamTuple.getSerializedTuple((int)t.getWindowId());
break;
@@ -477,6 +485,13 @@ public class FastPublisher extends Kryo implements ClientListener, Stream
}
}
+ @Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ put(new CustomControlTuple(payload));
+ return true;
+ }
+
@SuppressWarnings("SleepWhileInLoop")
public void advanceWriteBuffer()
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
index ec9660b..7559a18 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/InlineStream.java
@@ -21,10 +21,13 @@ package com.datatorrent.stram.stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.datatorrent.stram.engine.AbstractReservoir;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.Tuple;
/**
@@ -99,6 +102,13 @@ public class InlineStream implements Stream
}
@Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ put(new CustomControlTuple(payload));
+ return false;
+ }
+
+ @Override
public int getCount(boolean reset)
{
try {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
index 63f5ee4..007a3ac 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/MuxStream.java
@@ -24,10 +24,12 @@ import java.util.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Sink;
+import org.apache.apex.api.UserDefinedControlTuple;
+import com.datatorrent.api.Sink;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
+import com.datatorrent.stram.tuple.CustomControlTuple;
/**
* <p>MuxStream class.</p>
@@ -121,6 +123,13 @@ public class MuxStream implements Stream.MultiSinkCapableStream
}
@Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ put(new CustomControlTuple(payload));
+ return false;
+ }
+
+ @Override
public int getCount(boolean reset)
{
try {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
index 61ed0e6..f4a2b9b 100644
--- a/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
+++ b/engine/src/main/java/com/datatorrent/stram/stream/OiOStream.java
@@ -18,10 +18,13 @@
*/
package com.datatorrent.stram.stream;
+import org.apache.apex.api.UserDefinedControlTuple;
+
import com.datatorrent.api.Sink;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.Tuple;
/**
@@ -77,6 +80,13 @@ public class OiOStream implements Stream
}
@Override
+ public boolean putControl(UserDefinedControlTuple payload)
+ {
+ put(new CustomControlTuple(payload));
+ return false;
+ }
+
+ @Override
public int getCount(boolean reset)
{
try {
@@ -124,6 +134,11 @@ public class OiOStream implements Stream
}
}
+ public Sink<Object> getSink()
+ {
+ return OiOStream.this.sink;
+ }
+
@Override
public Tuple sweep()
{
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
new file mode 100644
index 0000000..810fa57
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/tuple/CustomControlTuple.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.tuple;
+
+import java.util.UUID;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+import com.datatorrent.bufferserver.packet.MessageType;
+
+/**
+ * An implementation for @{@link Tuple} which can be generated by the user
+ * Acts as the wrapper for the user payload
+ */
+public class CustomControlTuple extends Tuple
+{
+ private final Object userObject;
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private final UUID uid;
+
+ protected CustomControlTuple()
+ {
+ // for Kryo
+ super(MessageType.CUSTOM_CONTROL, 0);
+ userObject = null;
+ uid = null;
+ }
+
+ public CustomControlTuple(Object userObject)
+ {
+ super(MessageType.CUSTOM_CONTROL, 0);
+ this.userObject = userObject;
+ uid = UUID.randomUUID();
+ }
+
+ public Object getUserObject()
+ {
+ return userObject;
+ }
+
+ public UUID getUid()
+ {
+ return uid;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
new file mode 100644
index 0000000..86078c2
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/CustomControlTupleTest.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import java.util.concurrent.Callable;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.ControlAwareDefaultInputPort;
+import org.apache.apex.api.ControlAwareDefaultOutputPort;
+import org.apache.apex.api.UserDefinedControlTuple;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.common.util.BaseOperator;
+
+public class CustomControlTupleTest
+{
+ public static final Logger LOG = LoggerFactory.getLogger(CustomControlTupleTest.class);
+ private static long controlIndex = 0;
+ private static int numControlTuples = 0;
+ private static boolean done = false;
+ private static boolean endApp = false;
+ private static long endingWindowId = 0;
+ private static boolean immediate = false;
+
+ @Before
+ public void starting()
+ {
+ controlIndex = 0;
+ numControlTuples = 0;
+ done = false;
+ endApp = false;
+ endingWindowId = 0;
+ }
+
+ public static class Generator extends BaseOperator implements InputOperator
+ {
+ private long currentWindowId;
+ public final transient ControlAwareDefaultOutputPort<Double> out = new ControlAwareDefaultOutputPort<>();
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ if (!done) {
+ currentWindowId = windowId;
+ out.emitControl(new TestControlTuple(controlIndex++, immediate));
+ }
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (!done) {
+ out.emitControl(new TestControlTuple(controlIndex++, immediate));
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ if (!done) {
+ out.emitControl(new TestControlTuple(controlIndex++, immediate));
+ endingWindowId = currentWindowId;
+ done = true;
+ }
+ }
+ }
+
+ public static class DefaultProcessor extends BaseOperator
+ {
+ public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+ {
+ @Override
+ public void process(Double tuple)
+ {
+ output.emit(tuple);
+ }
+ };
+
+ public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+ }
+
+ public static class ControlAwareProcessor extends BaseOperator
+ {
+ public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
+ {
+ @Override
+ public void process(Double tuple)
+ {
+ output.emit(tuple);
+ }
+
+ @Override
+ public boolean processControl(UserDefinedControlTuple tuple)
+ {
+ output.emitControl(tuple);
+ return true;
+ }
+ };
+
+ public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>();
+ }
+
+ public static class ControlAwareReceiver extends BaseOperator
+ {
+ private long currentWindowId;
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ }
+
+ public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
+ {
+ @Override
+ public boolean processControl(UserDefinedControlTuple payload)
+ {
+ numControlTuples++;
+ return false;
+ }
+
+ @Override
+ public void process(Double tuple)
+ {
+ }
+ };
+
+ @Override
+ public void endWindow()
+ {
+ if (done && currentWindowId > endingWindowId) {
+ endApp = true;
+ }
+ }
+ }
+
+ @ApplicationAnnotation(name = "TestDefaultPropagation")
+ public static class Application1 implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+ DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+ ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+ dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+ dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+ }
+ }
+
+ @ApplicationAnnotation(name = "TestExplicitPropagation")
+ public static class Application2 implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+ ControlAwareProcessor processor = dag.addOperator("process", ControlAwareProcessor.class);
+ ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+ dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+ dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+ }
+ }
+
+ @ApplicationAnnotation(name = "TestDuplicateControlTuples")
+ public static class Application3 implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+ DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+ ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+ dag.addStream("genToProcessor", randomGenerator.out, processor.input);
+ dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
+ dag.setOperatorAttribute(processor, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
+ }
+ }
+
+ @ApplicationAnnotation(name = "TestThreadLocal")
+ public static class Application4 implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+ DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+ ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+ dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.THREAD_LOCAL);
+ dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.THREAD_LOCAL);
+ }
+ }
+
+ @ApplicationAnnotation(name = "TestContainerLocal")
+ public static class Application5 implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
+ DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
+ ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
+ dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ }
+ }
+
+ public void testApp(StreamingApplication app) throws Exception
+ {
+ try {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration(false);
+ lma.prepareDAG(app, conf);
+ LocalMode.Controller lc = lma.getController();
+ ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return endApp;
+ }
+ });
+
+ lc.run(200000); // runs for 20 seconds and quits if terminating condition not reached
+
+ LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex);
+ Assert.assertTrue("Incorrect Control Tuples", numControlTuples == controlIndex);
+ } catch (ConstraintViolationException e) {
+ Assert.fail("constraint violations: " + e.getConstraintViolations());
+ }
+ }
+
+ @Test
+ public void testDefaultPropagation() throws Exception
+ {
+ immediate = false;
+ testApp(new Application1());
+ }
+
+ @Test
+ public void testExplicitPropagation() throws Exception
+ {
+ immediate = false;
+ testApp(new Application2());
+ }
+
+ @Test
+ public void testDuplicateControlTuples() throws Exception
+ {
+ immediate = false;
+ testApp(new Application3());
+ }
+
+ @Test
+ public void testThreadLocal() throws Exception
+ {
+ immediate = false;
+ testApp(new Application4());
+ }
+
+ @Test
+ public void testContainerLocal() throws Exception
+ {
+ immediate = false;
+ testApp(new Application5());
+ }
+
+ @Test
+ public void testDefaultPropagationImmediate() throws Exception
+ {
+ immediate = true;
+ testApp(new Application1());
+ }
+
+ @Test
+ public void testExplicitPropagationImmediate() throws Exception
+ {
+ immediate = true;
+ testApp(new Application2());
+ }
+
+ @Test
+ public void testDuplicateControlTuplesImmediate() throws Exception
+ {
+ immediate = true;
+ testApp(new Application3());
+ }
+
+ @Test
+ public void testThreadLocalImmediate() throws Exception
+ {
+ immediate = true;
+ testApp(new Application4());
+ }
+
+ @Test
+ public void testContainerLocalImmediate() throws Exception
+ {
+ immediate = true;
+ testApp(new Application5());
+ }
+
+ public static class TestControlTuple implements UserDefinedControlTuple
+ {
+ public long data;
+ public boolean immediate;
+
+ public TestControlTuple()
+ {
+ data = 0;
+ }
+
+ public TestControlTuple(long data, boolean immediate)
+ {
+ this.data = data;
+ this.immediate = immediate;
+ }
+
+ @Override
+ public boolean equals(Object t)
+ {
+ if (t instanceof TestControlTuple && ((TestControlTuple)t).data == this.data) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return data + "";
+ }
+
+ @Override
+ public DeliveryType getDeliveryType()
+ {
+ if (immediate) {
+ return DeliveryType.IMMEDIATE;
+ } else {
+ return DeliveryType.END_WINDOW;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/1e478567/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index af99e98..99dee8f 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -62,10 +62,13 @@ import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.stram.CustomControlTupleTest;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -609,6 +612,195 @@ public class GenericNodeTest
}
@Test
+ public void testControlTuplesDeliveryGenericNode() throws InterruptedException
+ {
+ long maxSleep = 5000000;
+ long sleeptime = 25L;
+ GenericOperator go = new GenericOperator();
+ final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ new DefaultAttributeMap(), null));
+ gn.setId(1);
+ AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+
+ gn.connectInputPort("ip1", reservoir1);
+ TestSink testSink = new TestSink();
+ gn.connectOutputPort("op", testSink);
+ gn.firstWindowMillis = 0;
+ gn.windowWidthMillis = 100;
+
+ final AtomicBoolean ab = new AtomicBoolean(false);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ ab.set(true);
+ gn.activate();
+ gn.run();
+ gn.deactivate();
+ }
+ };
+ t.start();
+
+ long interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ } while ((ab.get() == false) && (interval < maxSleep));
+
+ int controlTupleCount = gn.controlTupleCount;
+ Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+ reservoir1.add(beginWindow);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+ controlTupleCount = gn.controlTupleCount;
+
+ CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
+ CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
+ CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
+ CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
+ reservoir1.add(t1);
+ reservoir1.add(t2);
+ reservoir1.add(t3);
+ reservoir1.add(t4);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+ Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
+
+ controlTupleCount = gn.controlTupleCount;
+ Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
+ reservoir1.add(endWindow);
+
+ interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ } while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+ gn.shutdown();
+ t.join();
+
+ Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
+
+ long expected = 0;
+ for (Object o: testSink.collectedTuples) {
+ if (o instanceof CustomControlTuple) {
+ expected++;
+ }
+ }
+ Assert.assertTrue("Number of Custom control tuples", expected == 4);
+ }
+
+ @Test
+ public void testControlTuplesDeliveryOiONode() throws InterruptedException
+ {
+ GenericOperator go = new GenericOperator();
+ final OiONode oioNode = new OiONode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ new DefaultAttributeMap(), null));
+ oioNode.setId(1);
+
+ OiOStream stream = new OiOStream();
+ SweepableReservoir reservoir = stream.getReservoir();
+ ((OiOStream.OiOReservoir)reservoir).setControlSink((oioNode).getControlSink(reservoir));
+ oioNode.connectInputPort("ip1", reservoir);
+ Sink controlSink = oioNode.getControlSink(reservoir);
+
+ TestSink testSink = new TestSink();
+ oioNode.connectOutputPort("op", testSink);
+ oioNode.firstWindowMillis = 0;
+ oioNode.windowWidthMillis = 100;
+
+ oioNode.activate();
+
+ Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+ controlSink.put(beginWindow);
+ Assert.assertTrue("Begin window", testSink.getResultCount() == 1);
+
+ CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
+ CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
+ CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
+ CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
+ controlSink.put(t1);
+ controlSink.put(t2);
+ controlSink.put(t3);
+ controlSink.put(t4);
+ Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
+
+ Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
+ controlSink.put(endWindow);
+
+ oioNode.deactivate();
+ oioNode.shutdown();
+
+ Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
+
+ long expected = 0;
+ for (Object o: testSink.collectedTuples) {
+ if (o instanceof CustomControlTuple) {
+ expected++;
+ }
+ }
+ Assert.assertTrue("Number of Custom control tuples", expected == 4);
+ }
+
+ @Test
+ public void testReservoirPortMapping() throws InterruptedException
+ {
+ long maxSleep = 5000;
+ long sleeptime = 25L;
+ GenericOperator go = new GenericOperator();
+ final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
+ new DefaultAttributeMap(), null));
+ gn.setId(1);
+ AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
+ AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
+
+ gn.connectInputPort("ip1", reservoir1);
+ gn.connectInputPort("ip2", reservoir2);
+ gn.connectOutputPort("op", Sink.BLACKHOLE);
+ gn.firstWindowMillis = 0;
+ gn.windowWidthMillis = 100;
+
+ final AtomicBoolean ab = new AtomicBoolean(false);
+ Thread t = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ ab.set(true);
+ gn.activate();
+ gn.run();
+ gn.deactivate();
+ }
+ };
+ t.start();
+
+ long interval = 0;
+ do {
+ Thread.sleep(sleeptime);
+ interval += sleeptime;
+ } while ((ab.get() == false) && (interval < maxSleep));
+
+ gn.populateReservoirInputPortMap();
+
+ gn.shutdown();
+ t.join();
+
+ Assert.assertTrue("Port Mapping Size", gn.reservoirPortMap.size() == 2);
+ Assert.assertTrue("Sink 1 is not a port", gn.reservoirPortMap.get(reservoir1) instanceof Operator.InputPort);
+ Assert.assertTrue("Sink 2 is not a port", gn.reservoirPortMap.get(reservoir2) instanceof Operator.InputPort);
+ }
+
+ @Test
public void testDoubleCheckpointAtleastOnce() throws Exception
{
NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, testMeta.getDir());