You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/23 04:53:03 UTC

[2/3] incubator-apex-core git commit: moved attribute from context to logical plan

Moved the IS_CONNECTED_TO_DELAY_OPERATOR attribute from Context.PortContext (api) to LogicalPlan (engine).


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4d5828c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4d5828c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4d5828c6

Branch: refs/heads/devel-3
Commit: 4d5828c6ca48f5d28cd8c77c5706c6f72c7cd1ad
Parents: f7e1ccf
Author: Gaurav <ga...@datatorrent.com>
Authored: Wed Dec 16 06:33:54 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Jan 22 19:04:27 2016 -0800

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/Context.java           | 7 -------
 .../main/java/com/datatorrent/stram/engine/GenericNode.java  | 3 ++-
 .../java/com/datatorrent/stram/plan/logical/LogicalPlan.java | 8 +++++++-
 .../com/datatorrent/stram/plan/physical/PhysicalPlan.java    | 4 ++--
 .../com/datatorrent/stram/plan/physical/StreamMapping.java   | 2 +-
 5 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 58bc552..ceed8a2 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -166,13 +166,6 @@ public interface Context
      */
     Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
 
-    /**
-     * Attribute of input port.
-     * This is a read-only attribute to query whether the input port is connected to a DelayOperator
-     * This is for iterative processing.
-     */
-    Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
-
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 4777f93..1ccec31 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -40,6 +40,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.netlet.util.CircularBuffer;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.debug.TappedReservoir;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.Operators;
 import com.datatorrent.stram.tuple.ResetWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -207,7 +208,7 @@ public class GenericNode extends Node<Operator>
     if (pcPair == null || pcPair.context == null) {
       return false;
     }
-    return pcPair.context.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR);
+    return pcPair.context.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 3c26118..883ad71 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -76,6 +76,12 @@ import com.datatorrent.stram.engine.Slider;
  */
 public class LogicalPlan implements Serializable, DAG
 {
+  /**
+   * Attribute of input port.
+   * This is a read-only attribute to query whether the input port is connected to a DelayOperator.
+   * It is used for iterative processing.
+   */
+  public static final Attribute<Boolean> IS_CONNECTED_TO_DELAY_OPERATOR = new Attribute<>(false);
   @SuppressWarnings("FieldNameHidesFieldInSuperclass")
   private static final long serialVersionUID = -2099729915606048704L;
   private static final Logger LOG = LoggerFactory.getLogger(LogicalPlan.class);
@@ -1914,7 +1920,7 @@ public class LogicalPlan implements Serializable, DAG
       for (InputPortMeta sink: downStream.sinks) {
         if (om.getOperator() instanceof Operator.DelayOperator) {
           // this is an iteration loop, do not treat it as downstream when detecting cycles
-          sink.attributes.put(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR, true);
+          sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
           continue;
         }
         OperatorMeta successor = sink.getOperatorWrapper();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index da96ef3..c696224 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -948,11 +948,11 @@ public class PhysicalPlan implements Serializable
                 PTOperator slidingUnifier = StreamMapping.createSlidingUnifier(sourceOut.logicalStream, this,
                   sourceOM.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT), slidingWindowCount);
                 StreamMapping.addInput(slidingUnifier, sourceOut, null);
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, slidingUnifier.outputs.get(0), ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
                 sourceMapping.outputStreams.get(ipm.getValue().getSource()).slidingUnifiers.add(slidingUnifier);
               }
               else {
-                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+                input = new PTInput(ipm.getKey().getPortName(), ipm.getValue(), oper, null, sourceOut, ipm.getKey().getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
               }
               oper.inputs.add(input);
             }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4d5828c6/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 91c6eef..f30ceb6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -347,7 +347,7 @@ public class StreamMapping implements java.io.Serializable
     // link to upstream output(s) for this stream
     for (PTOutput upstreamOut : sourceOper.outputs) {
       if (upstreamOut.logicalStream == streamMeta) {
-        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(PortContext.IS_CONNECTED_TO_DELAY_OPERATOR));
+        PTInput input = new PTInput(ipm.getPortName(), streamMeta, oper, pks, upstreamOut, ipm.getValue(LogicalPlan.IS_CONNECTED_TO_DELAY_OPERATOR));
         oper.inputs.add(input);
       }
     }