You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:03 UTC
[2/3] incubator-apex-core git commit: moved attribute from context to
logical plan
moved attribute from context to logical plan
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6
Branch: refs/heads/devel-3
Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad
Parents: f7e1ccf
Author: Gaurav <ga...@datatorrent.com>
Authored: Wed Dec 16 06:33:54 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:04:27 2016 -0800
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/Context.java | 7 -------
.../main/java/com/datatorrent/stram/engine/GenericNode.java | 3 ++-
.../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++-
.../com/datatorrent/stram/plan/physical/PhysicalPlan.java | 4 ++--
.../com/datatorrent/stram/plan/physical/StreamMapping.java | 2 +-
5 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 58bc552..ceed8a2 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,13 +166,6 @@ public interface Context
*/
Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
- /**
- * Attribute of input port.
- * This is a read-only attribute to query whether the input port is connected to a DelayOperator
- * This is for iterative processing.
- */
- Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
-
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 4777f93..1ccec31 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
@@ -207,7 +208,7 @@ public class GenericNode extends Node<Operator>
if (pcPair == null || pcPair.context == null) {
return false;
}
- return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+ return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3c26118..883ad71 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider;
*/
public class LogicalPlan implements Serializable, DAG
{
+ /**
+ * Attribute of input port.
+ * This is a read-only attribute to query whether the input port is connected to a DelayOperator
+ * This is for iterative processing.
+ */
+ public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
@SuppressWarnings("FieldNameHidesFieldInSuperclass")
private static final long serialVersionUID = -2099729915606048704L;
private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
@@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG
for (InputPortMeta sink: downStream.sinks) {
if (om.getOperator() instanceof Operator.DelayOperator) {
// this is an iteration loop, do not treat it as downstream when detecting cycles
- sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+ sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
continue;
}
OperatorMeta successor = sink.getOperatorWrapper();
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index da96ef3..c696224 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable
PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
StreamMapping.addInput(slidingUnifier, sourceOut, null);
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
}
else {
- input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
}
oper.inputs.add(input);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 91c6eef..f30ceb6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
// link to upstream output(s) for this stream
for (PTOutput upstreamOut : sourceOper.outputs) {
if (upstreamOut.logicalStream == streamMeta) {
- PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+ PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
oper.inputs.add(input);
}
}