You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/08/26 18:37:45 UTC

incubator-apex-core git commit: ability to hide ports

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 f28d8d742 -> 71a915aa3


ability to hide ports


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/71a915aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/71a915aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/71a915aa

Branch: refs/heads/devel-3
Commit: 71a915aa3fd2169b0c0b9157dd11d22b110ec0b1
Parents: f28d8d7
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Mon Aug 10 22:21:17 2015 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Tue Aug 25 17:41:14 2015 -0700

----------------------------------------------------------------------
 .../stram/plan/logical/LogicalPlan.java         |  47 ++++++--
 .../datatorrent/stram/plan/LogicalPlanTest.java | 116 ++++++++++++++++---
 2 files changed, 135 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/71a915aa/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index b9c7c19..6741d37 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -189,6 +189,8 @@ public class LogicalPlan implements Serializable, DAG
     private InputPortFieldAnnotation portAnnotation;
     private AppData.QueryPort adqAnnotation;
     private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
+    //This is null when port is not hidden
+    private Class<?> classDeclaringHiddenPort;
 
     public OperatorMeta getOperatorWrapper()
     {
@@ -267,6 +269,8 @@ public class LogicalPlan implements Serializable, DAG
     private OutputPortFieldAnnotation portAnnotation;
     private AppData.ResultPort adrAnnotation;
     private final DefaultAttributeMap attributes;
+    //This is null when port is not hidden
+    private Class<?> classDeclaringHiddenPort;
 
     public OutputPortMeta()
     {
@@ -897,7 +901,7 @@ public class LogicalPlan implements Serializable, DAG
             if (pm.operatorMeta == OperatorMeta.this && pm.fieldName.equals(field.getName())) {
               //LOG.debug("Found existing port meta for: " + field);
               inPortMap.put(portObject, pm);
-              checkDuplicateName(pm.getPortName(), pm);
+              markInputPortIfHidden(pm.getPortName(), pm, field.getDeclaringClass());
               return;
             }
           }
@@ -908,7 +912,7 @@ public class LogicalPlan implements Serializable, DAG
         metaPort.portAnnotation = portAnnotation;
         metaPort.adqAnnotation = adqAnnotation;
         inPortMap.put(portObject, metaPort);
-        checkDuplicateName(metaPort.getPortName(), metaPort);
+        markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
       }
 
       @Override
@@ -920,7 +924,7 @@ public class LogicalPlan implements Serializable, DAG
             if (pm.operatorMeta == OperatorMeta.this && pm.fieldName.equals(field.getName())) {
               //LOG.debug("Found existing port meta for: " + field);
               outPortMap.put(portObject, pm);
-              checkDuplicateName(pm.getPortName(), pm);
+              markOutputPortIfHidden(pm.getPortName(), pm, field.getDeclaringClass());
               return;
             }
           }
@@ -931,14 +935,27 @@ public class LogicalPlan implements Serializable, DAG
         metaPort.portAnnotation = portAnnotation;
         metaPort.adrAnnotation = adrAnnotation;
         outPortMap.put(portObject, metaPort);
-        checkDuplicateName(metaPort.getPortName(), metaPort);
+        markOutputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
       }
 
-      private void checkDuplicateName(String portName, Object portMeta) {
-        Object existingValue = portNameMap.put(portName, portMeta);
-        if (existingValue != null) {
-          String msg = String.format("Port name %s of %s duplicates %s", portName, portMeta, existingValue);
-          throw new IllegalArgumentException(msg);
+      private void markOutputPortIfHidden(String portName, OutputPortMeta portMeta, Class<?> declaringClass)
+      {
+        if (!portNameMap.containsKey(portName)) {
+          portNameMap.put(portName, portMeta);
+        } else {
+          // make the port optional
+          portMeta.classDeclaringHiddenPort = declaringClass;
+        }
+
+      }
+
+      private void markInputPortIfHidden(String portName, InputPortMeta portMeta, Class<?> declaringClass)
+      {
+        if (!portNameMap.containsKey(portName)) {
+          portNameMap.put(portName, portMeta);
+        } else {
+          // make the port optional
+          portMeta.classDeclaringHiddenPort = declaringClass;
         }
       }
     }
@@ -1350,10 +1367,14 @@ public class LogicalPlan implements Serializable, DAG
       for (InputPortMeta pm: portMapping.inPortMap.values()) {
         StreamMeta sm = n.inputStreams.get(pm);
         if (sm == null) {
-          if (pm.portAnnotation == null || !pm.portAnnotation.optional()) {
+          if ((pm.portAnnotation == null || !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
             throw new ValidationException("Input port connection required: " + n.name + "." + pm.getPortName());
           }
         } else {
+          if (pm.classDeclaringHiddenPort != null) {
+            throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(),
+              pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
+          }
           // check locality constraints
           DAG.Locality locality = sm.getLocality();
           if (locality == DAG.Locality.THREAD_LOCAL) {
@@ -1374,11 +1395,15 @@ public class LogicalPlan implements Serializable, DAG
       boolean allPortsOptional = true;
       for (OutputPortMeta pm: portMapping.outPortMap.values()) {
         if (!n.outputStreams.containsKey(pm)) {
-          if (pm.portAnnotation != null && !pm.portAnnotation.optional()) {
+          if ((pm.portAnnotation != null && !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
             throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
           }
         } else {
           //port is connected
+          if (pm.classDeclaringHiddenPort != null) {
+            throw new ValidationException(String.format("Invalid port connected: %s.%s is hidden by %s.%s", pm.classDeclaringHiddenPort.getName(),
+              pm.getPortName(), pm.operatorMeta.getOperator().getClass().getName(), pm.getPortName()));
+          }
           if (pm.portAnnotation != null && pm.portAnnotation.schemaRequired()) {
             //since schema is required, the port attribute TUPLE_CLASS should be present
             if (pm.attributes.get(PortContext.TUPLE_CLASS) == null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/71a915aa/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
index 5bda8ee..ac05bed 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
@@ -613,23 +613,6 @@ public class LogicalPlanTest {
 
   }
 
-  public class DuplicatePortOperator extends GenericTestOperator {
-    @SuppressWarnings("FieldNameHidesFieldInSuperclass")
-    final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
-  }
-
-  @Test
-  public void testDuplicatePort() {
-    LogicalPlan dag = new LogicalPlan();
-    DuplicatePortOperator o1 = dag.addOperator("o1", new DuplicatePortOperator());
-    try {
-      dag.setOutputPortAttribute(o1.outport1, PortContext.QUEUE_CAPACITY, 0);
-      Assert.fail("Should detect duplicate port");
-    } catch (IllegalArgumentException e) {
-      // expected
-    }
-  }
-
   /**
    * Operator that can be used with default Java serialization instead of Kryo
    */
@@ -846,6 +829,105 @@ public class LogicalPlanTest {
   {
   }
 
+  @Test
+  public void testInputPortHiding()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+    Operator2 operator2 = dag.addOperator("operator2", new Operator2());
+    dag.addStream("Stream1", input1.outport, operator2.input);
+    dag.validate();
+  }
+
+  @Test
+  public void testInvalidInputPortConnection()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
+    Operator1 operator1 = dag.addOperator("operator3", new Operator3());
+    dag.addStream("Stream1", input1.outport, operator1.input);
+    try {
+      dag.validate();
+    } catch (ValidationException ex) {
+      Assert.assertTrue("validation message", ex.getMessage().startsWith("Invalid port connected"));
+      return;
+    }
+    Assert.fail();
+  }
+
+  class Operator1 extends BaseOperator
+  {
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+
+      }
+    };
+  }
+
+  class Operator2 extends Operator1
+  {
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+
+      }
+    };
+  }
+
+  class Operator3 extends Operator1
+  {
+    @InputPortFieldAnnotation(optional = true)
+    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
+    {
+      @Override
+      public void process(Object tuple)
+      {
+
+      }
+    };
+  }
+
+  @Test
+  public void testOutputPortHiding()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    Operator5 operator5 = dag.addOperator("input", new Operator5());
+    Operator2 operator2 = dag.addOperator("operator2", new Operator2());
+    dag.addStream("Stream1", operator5.output, operator2.input);
+    dag.validate();
+  }
+
+  @Test(expected = ValidationException.class)
+  public void testInvalidOutputPortConnection()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    Operator4 operator4 = dag.addOperator("input", new Operator5());
+    Operator3 operator3 = dag.addOperator("operator3", new Operator3());
+    dag.addStream("Stream1", operator4.output, operator3.input);
+    dag.validate();
+  }
+
+  class Operator4 extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+
+    @Override
+    public void emitTuples()
+    {
+
+    }
+  }
+
+  class Operator5 extends Operator4
+  {
+    public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
+  }
+
   /*
   These were tests for operator semantics that verified if an operator class implements InputOperator then the same class should not declare input ports.
   This would be done later when we are able to verify user code at compile-time.