You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:42 UTC

[11/50] incubator-apex-core git commit: Deprecated name property from BaseOperator

Deprecated name property from BaseOperator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fe5d0356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fe5d0356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fe5d0356

Branch: refs/heads/master
Commit: fe5d03560804a151e4c415939f3ca7edbd686b15
Parents: 19d6658
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 14:16:52 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Aug 6 09:37:59 2015 -0700

----------------------------------------------------------------------
 .../datatorrent/common/util/BaseOperator.java   |  2 +
 engine/pom.xml                                  | 10 ++-
 .../stram/plan/logical/LogicalPlan.java         |  6 +-
 .../com/datatorrent/stram/StreamCodecTest.java  | 45 +++++------
 .../stram/StreamingContainerManagerTest.java    | 12 +--
 .../datatorrent/stram/plan/LogicalPlanTest.java | 16 ++--
 .../stram/plan/physical/PhysicalPlanTest.java   |  9 +--
 .../stram/webapp/OperatorDiscoveryTest.java     | 78 ++++++++++----------
 8 files changed, 91 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
index 0c2f8b3..f653d14 100644
--- a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
@@ -33,6 +33,7 @@ public class BaseOperator implements Operator
   /**
    * @return the name property of the operator.
    */
+  @Deprecated
   public String getName()
   {
     return name;
@@ -43,6 +44,7 @@ public class BaseOperator implements Operator
    *
    * @param name
    */
+  @Deprecated
   public void setName(String name)
   {
     this.name = name;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 1e6a7ed..c91265c 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -229,9 +229,15 @@
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <version>1.8.5</version>
+      <artifactId>mockito-core</artifactId>
+      <version>1.10.19</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.hamcrest</groupId>
+          <artifactId>hamcrest-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>net.lingala.zip4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index fc182cd..9bfc2bd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -854,10 +854,6 @@ public class LogicalPlan implements Serializable, DAG
   @Override
   public <T extends Operator> T addOperator(String name, T operator)
   {
-    // TODO: optional interface to provide contextual information to instance
-    if (operator instanceof BaseOperator) {
-      ((BaseOperator)operator).setName(name);
-    }
     if (operators.containsKey(name)) {
       if (operators.get(name) == (Object)operator) {
         return operator;
@@ -1219,7 +1215,7 @@ public class LogicalPlan implements Serializable, DAG
       }
     }
 
-    // Validate root operators are input operators 
+    // Validate root operators are input operators
     for (OperatorMeta om : this.rootOperators) {
       if (!(om.getOperator() instanceof InputOperator)) {
         throw new ValidationException(String.format("Root operator: %s is not a Input operator",

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 046425f..9726e65 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -73,14 +73,14 @@ public class StreamCodecTest
     LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
     LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
     Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
     Assert.assertTrue("No user set stream codec", n1odi.streamCodecs.containsValue(null));
 
-    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
 
     OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
     id = n2meta.getName() + " " + n2idi.portName;
@@ -93,7 +93,7 @@ public class StreamCodecTest
     checkPresentStreamCodec(n3meta, node3.inport1, n2odi.streamCodecs, id, plan);
 
 
-    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
 
     OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
     id = n3meta.getName() + " " + n3idi.portName;
@@ -139,12 +139,12 @@ public class StreamCodecTest
       StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
     }
 
-    getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
-    getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
-    getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
-    getSingleOperatorDeployInfo(node4, node4.getName(), dnm);
-    getSingleOperatorDeployInfo(node5, node5.getName(), dnm);
-    getSingleOperatorDeployInfo(node6, node6.getName(), dnm);
+    getSingleOperatorDeployInfo(node1, dnm);
+    getSingleOperatorDeployInfo(node2, dnm);
+    getSingleOperatorDeployInfo(node3, dnm);
+    getSingleOperatorDeployInfo(node4, dnm);
+    getSingleOperatorDeployInfo(node5, dnm);
+    getSingleOperatorDeployInfo(node6, dnm);
     Assert.assertEquals("number of stream codec identifiers", 3, plan.getStreamCodecIdentifiers().size());
   }
 
@@ -180,14 +180,14 @@ public class StreamCodecTest
     LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
     LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
     Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
     checkPresentStreamCodec(n2meta, node2.inportWithCodec, n1odi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
 
     OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inportWithCodec));
     id = n2meta.getName() + " " + n2idi.portName;
@@ -199,7 +199,7 @@ public class StreamCodecTest
     Assert.assertEquals("number stream codecs " + id, n2odi.streamCodecs.size(), 1);
     checkPresentStreamCodec(n3meta, node3.inportWithCodec, n2odi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
 
     OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inportWithCodec));
     id = n3meta.getName() + " " + n3idi.portName;
@@ -238,7 +238,7 @@ public class StreamCodecTest
     LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
     LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
@@ -449,21 +449,21 @@ public class StreamCodecTest
     LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
     LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
     Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
     checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
 
     OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
     id = n2meta.getName() + " " + n2idi.portName;
     Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
     checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
 
     OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
     id = n3meta.getName() + " " + n3idi.portName;
@@ -584,7 +584,7 @@ public class StreamCodecTest
     LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
     LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
@@ -592,14 +592,14 @@ public class StreamCodecTest
     checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);
     checkPresentStreamCodec(n3meta, node3.inport1, n1odi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+    OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
 
     OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
     id = n2meta.getName() + " " + n2idi.portName;
     Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
     checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);
 
-    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+    OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
 
     OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
     id = n3meta.getName() + " " + n3idi.portName;
@@ -855,14 +855,14 @@ public class StreamCodecTest
 
     Assert.assertNotNull("non inline operator is null", nonInlineOperator);
 
-    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+    OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
 
     OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
     String id = n1meta.getName() + " " + n1odi.portName;
     Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
     checkPresentStreamCodec(nonInlineMeta, niInputPort, n1odi.streamCodecs, id, plan);
 
-    OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, nonInlineOperator.getName(), dnm);
+    OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, dnm);
 
     OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, nonInlineMeta.getMeta(niInputPort));
     id = nonInlineMeta.getName() + " " + idi.portName;
@@ -1218,9 +1218,10 @@ public class StreamCodecTest
     Assert.assertEquals("stream codec not same " + id, opStreamCodecInfo, streamCodecInfo);
   }
 
-  private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, String id, StreamingContainerManager scm)
+  private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, StreamingContainerManager scm)
   {
     LogicalPlan dag = scm.getLogicalPlan();
+    String id = dag.getMeta(oper).toString();
     PhysicalPlan plan = scm.getPhysicalPlan();
     List<PTOperator> operators = plan.getOperators(dag.getMeta(oper));
     Assert.assertEquals("number of operators " + id, 1, operators.size());

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index ba15a78..38a54f0 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -188,8 +188,8 @@ public class StreamingContainerManagerTest {
     Assert.assertEquals("number operators assigned to container", 3, c2.size());
     OperatorDeployInfo o2DI = getNodeDeployInfo(c2, dag.getMeta(o2));
     OperatorDeployInfo o3DI = getNodeDeployInfo(c2, dag.getMeta(o3));
-    Assert.assertNotNull(o2.getName() + " assigned to " + sca2.container.getExternalId(), o2DI);
-    Assert.assertNotNull(o3.getName() + " assigned to " + sca2.container.getExternalId(), o3DI);
+    Assert.assertNotNull(dag.getMeta(o2) + " assigned to " + sca2.container.getExternalId(), o2DI);
+    Assert.assertNotNull(dag.getMeta(o3) + " assigned to " + sca2.container.getExternalId(), o3DI);
 
     Assert.assertTrue("The buffer server memory for container 1", 256 == sca1.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));
     Assert.assertTrue("The buffer server memory for container 2", 0 == sca2.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));
@@ -218,7 +218,7 @@ public class StreamingContainerManagerTest {
 
     // THREAD_LOCAL o4.inport1
     OperatorDeployInfo o4DI = getNodeDeployInfo(c2, dag.getMeta(o4));
-    Assert.assertNotNull(o4.getName() + " assigned to " + sca2.container.getExternalId(), o4DI);
+    Assert.assertNotNull(dag.getMeta(o4) + " assigned to " + sca2.container.getExternalId(), o4DI);
     InputDeployInfo c2o4i1 = getInputDeployInfo(o4DI, "o3.outport1");
     Assert.assertNotNull("input from o3.outport1", c2o4i1);
     Assert.assertEquals("portName " + c2o4i1, GenericTestOperator.IPORT1, c2o4i1.portName);
@@ -271,7 +271,7 @@ public class StreamingContainerManagerTest {
     StreamingContainerAgent sca1 = dnm.getContainerAgent(c.getExternalId());
     List<OperatorDeployInfo> c1 = getDeployInfo(sca1);
     Assert.assertEquals("number operators assigned to container", 1, c1.size());
-    Assert.assertTrue(node2.getName() + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1)));
+    Assert.assertTrue(dag.getMeta(node2) + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1)));
 
     List<PTOperator> o2Partitions = plan.getOperators(dag.getMeta(node2));
     Assert.assertEquals("number partitions", TestStaticPartitioningSerDe.partitions.length, o2Partitions.size());
@@ -280,7 +280,7 @@ public class StreamingContainerManagerTest {
       String containerId = o2Partitions.get(i).getContainer().getExternalId();
       List<OperatorDeployInfo> cc = getDeployInfo(dnm.getContainerAgent(containerId));
       Assert.assertEquals("number operators assigned to container", 1, cc.size());
-      Assert.assertTrue(node2.getName() + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2)));
+      Assert.assertTrue(dag.getMeta(node2) + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2)));
 
       // n1n2 in, mergeStream out
       OperatorDeployInfo ndi = cc.get(0);
@@ -338,7 +338,7 @@ public class StreamingContainerManagerTest {
     Assert.assertEquals("number operators " + cmerge, 1, cmerge.size());
 
     OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge,  dag.getMeta(node3));
-    Assert.assertNotNull(node3.getName() + " assigned", node3DI);
+    Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI);
     Assert.assertEquals("inputs " + node3DI, 1, node3DI.inputs.size());
     InputDeployInfo node3In = node3DI.inputs.get(0);
     Assert.assertEquals("streamName " + node3In, n2n3.getName(), node3In.declaredStreamId);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
index 24a9031..5bda8ee 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
@@ -89,16 +89,16 @@ public class LogicalPlanTest {
      dag.findStronglyConnected(dag.getMeta(operator7), cycles);
      assertEquals("operator self reference", 1, cycles.size());
      assertEquals("operator self reference", 1, cycles.get(0).size());
-     assertEquals("operator self reference", operator7.getName(), cycles.get(0).get(0));
+     assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
 
      // 3 operator cycle
      cycles.clear();
      dag.findStronglyConnected(dag.getMeta(operator4), cycles);
      assertEquals("3 operator cycle", 1, cycles.size());
      assertEquals("3 operator cycle", 3, cycles.get(0).size());
-     assertTrue("operator2", cycles.get(0).contains(operator2.getName()));
-     assertTrue("operator3", cycles.get(0).contains(operator3.getName()));
-     assertTrue("operator4", cycles.get(0).contains(operator4.getName()));
+     assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
+     assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
+     assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
 
      try {
        dag.validate();
@@ -294,7 +294,7 @@ public class LogicalPlanTest {
       Assert.fail("should throw ConstraintViolationException");
     } catch (ConstraintViolationException e) {
       Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations());
-      String expRegex = ".*ValidationTestOperator\\{name=testOperator}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
+      String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
       Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex));
     }
 
@@ -396,7 +396,7 @@ public class LogicalPlanTest {
       dag.validate();
       Assert.fail("should raise operator is not partitionable for operator1");
     } catch (ValidationException e) {
-      Assert.assertEquals("", "Operator " + operator.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+      Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
     }
 
     dag.setAttribute(operator, OperatorContext.PARTITIONER, null);
@@ -406,7 +406,7 @@ public class LogicalPlanTest {
       dag.validate();
       Assert.fail("should raise operator is not partitionable for operator1");
     } catch (ValidationException e) {
-      Assert.assertEquals("", "Operator " + operator.getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
+      Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
     }
 
     dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false);
@@ -419,7 +419,7 @@ public class LogicalPlanTest {
       dag.validate();
       Assert.fail("should raise operator is not partitionable for operator2");
     } catch (ValidationException e) {
-      Assert.assertEquals("Operator " + operator2.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+      Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 9382a4b..ccf930f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -180,7 +180,6 @@ public class PhysicalPlanTest
         p.getPartitionKeys().put(this.inport1, lpks);
         p.getPartitionKeys().put(this.inportWithCodec, lpks);
         p.getPartitionedInstance().pks = p.getPartitionKeys().values().toString();
-        p.getPartitionedInstance().setName(p.getPartitionKeys().values().toString());
         newPartitions.add(p);
       }
 
@@ -252,7 +251,7 @@ public class PhysicalPlanTest
     dag.addStream("node1.outport1", node1.outport1, node2.inport2, node2.inport1);
 
     int initialPartitionCount = 5;
-    OperatorMeta node2Decl = dag.getOperatorMeta(node2.getName());
+    OperatorMeta node2Decl = dag.getMeta(node2);
     node2Decl.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(initialPartitionCount));
 
     PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
@@ -350,7 +349,7 @@ public class PhysicalPlanTest
 
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
 
-    OperatorMeta o2Meta = dag.getOperatorMeta(o2.getName());
+    OperatorMeta o2Meta = dag.getMeta(o2);
     o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
                                Lists.newArrayList((StatsListener) new PartitionLoadWatch(0, 5)));
     o2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(1));
@@ -441,7 +440,7 @@ public class PhysicalPlanTest
   public void testInputOperatorPartitioning() {
     LogicalPlan dag = new LogicalPlan();
     TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<Object>());
-    OperatorMeta o1Meta = dag.getOperatorMeta(o1.getName());
+    OperatorMeta o1Meta = dag.getMeta(o1);
     dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
     dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestInputOperator<Object>>(2));
 
@@ -509,7 +508,7 @@ public class PhysicalPlanTest
 
     dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
 
-    OperatorMeta node2Meta = dag.getOperatorMeta(o2.getName());
+    OperatorMeta node2Meta = dag.getMeta(o2);
     node2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
                                   Lists.newArrayList((StatsListener) new PartitionLoadWatch(3, 5)));
     node2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index ad915c8..8baa08a 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -57,7 +57,7 @@ import com.google.common.collect.Lists;
 public class OperatorDiscoveryTest
 {
 //  private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoveryTest.class);
-  
+
   public static class GenericClassBase<T> extends BaseOperator
   {
     private int A;
@@ -271,7 +271,7 @@ public class OperatorDiscoveryTest
   @Test
   public void testPropertyDiscovery() throws Exception
   {
-    
+
     String[] classFilePath = getClassFileInClasspath();
     OperatorDiscoverer od = new OperatorDiscoverer(classFilePath);
     od.buildTypeGraph();
@@ -343,8 +343,8 @@ public class OperatorDiscoveryTest
     props = desc.getJSONArray("properties");
     genericArray = getJSONProperty(props, "genericArray");
     Assert.assertEquals(debug + "type " + genericArray, String[].class.getName(), genericArray.get("type"));
-    
-    
+
+
     // Test complicated Type Variable override in Hierarchy
     desc = od.describeClassByASM(SubSubClass.class.getName());
     props = desc.getJSONArray("properties");
@@ -554,9 +554,9 @@ public class OperatorDiscoveryTest
         return false;
       return true;
     }
-    
-    
-    
+
+
+
 
   }
 
@@ -573,7 +573,7 @@ public class OperatorDiscoveryTest
     private long longProp;
     private double doubleProp;
     private boolean booleanProp;
-    
+
     private Integer integerProp;
     private List<String> stringList;
     private List<Structured> nestedList;
@@ -612,39 +612,39 @@ public class OperatorDiscoveryTest
     {
       return mProp;
     }
-    
+
     public String getAlias()
     {
       return realName;
     }
-    
+
     public void setAlias(String alias)
     {
       realName = alias;
     }
-    
+
     public String getGetterOnly()
     {
       return getterOnly;
     }
-    
-    
+
+
     public URI getUri()
     {
       return uri;
     }
-    
+
     public void setUri(URI uri)
     {
       this.uri = uri;
     }
-    
-    
+
+
     public void setIntegerProp(Integer integerProp)
     {
       this.integerProp = integerProp;
     }
-    
+
     public Integer getIntegerProp()
     {
       return integerProp;
@@ -734,7 +734,7 @@ public class OperatorDiscoveryTest
     {
       return stringArray;
     }
-    
+
     public void setStringArray(String[] stringArray)
     {
       this.stringArray = stringArray;
@@ -858,15 +858,15 @@ public class OperatorDiscoveryTest
   static class ExtendedOperator extends TestOperator<String, Map<String, Number>>
   {
   }
-  
+
   public static class BaseClass<A, B, C>
   {
     private A a;
-    
+
     private B b;
 
     private C c;
-    
+
     public void setA(A a)
     {
       this.a = a;
@@ -875,12 +875,12 @@ public class OperatorDiscoveryTest
     {
       this.b = b;
     }
-    
+
     public A getA()
     {
       return a;
     }
-    
+
     public B getB()
     {
       return b;
@@ -890,7 +890,7 @@ public class OperatorDiscoveryTest
     {
       this.c = c;
     }
-    
+
     public C getC()
     {
       return c;
@@ -900,28 +900,28 @@ public class OperatorDiscoveryTest
   public static class SubClass<D, A extends Number> extends BaseClass<Number, A, D>
   {
     private D d;
-    
+
     public void setD(D d)
     {
       this.d = d;
     }
-    
+
     public D getD()
     {
       return d;
     }
-    
+
   }
 
   public static class SubSubClass<E extends Runnable> extends SubClass<List<String>, Long>
   {
     private E e;
-    
+
     public void setE(E e)
     {
       this.e = e;
     }
-    
+
     public E getE()
     {
       return e;
@@ -975,7 +975,7 @@ public class OperatorDiscoveryTest
     Assert.assertArrayEquals(ah.intArray, clone.intArray);
 
   }
-  
+
   @Test
   public void testLogicalPlanConfiguration() throws Exception
   {
@@ -995,13 +995,13 @@ public class OperatorDiscoveryTest
     ObjectMapper mapper = ObjectMapperFactory.getOperatorValueSerializer();
     String s = mapper.writeValueAsString(bean);
 //    LOG.debug(new JSONObject(s).toString(2));
-    // 
+    //
     Assert.assertTrue("Shouldn't contain field 'realName' !", !s.contains("realName"));
     Assert.assertTrue("Should contain property 'alias' !", s.contains("alias"));
     Assert.assertTrue("Shouldn't contain property 'getterOnly' !", !s.contains("getterOnly"));
     JSONObject jsonObj = new JSONObject(s);
-    
-    // create the json dag representation 
+
+    // create the json dag representation
     JSONObject jsonPlan = new JSONObject();
     jsonPlan.put("streams", new JSONArray());
     JSONObject jsonOper = new JSONObject();
@@ -1009,17 +1009,17 @@ public class OperatorDiscoveryTest
     jsonOper.put("class", TestOperator.class.getName());
     jsonOper.put("properties", jsonObj);
     jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper)));
-    
-    
+
+
     Configuration conf = new Configuration(false);
     LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
-    // create logical plan from the json 
+    // create logical plan from the json
     LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest");
     OperatorMeta om = lp.getOperatorMeta("Test Operator");
     Assert.assertTrue(om.getOperator() instanceof TestOperator);
     @SuppressWarnings("rawtypes")
     TestOperator beanBack = (TestOperator) om.getOperator();
-    
+
     // The operator deserialized back from json should be same as original operator
     Assert.assertEquals(bean.map, beanBack.map);
     Assert.assertArrayEquals(bean.stringArray, beanBack.stringArray);
@@ -1031,8 +1031,8 @@ public class OperatorDiscoveryTest
     Assert.assertEquals(bean.booleanProp, beanBack.booleanProp);
     Assert.assertEquals(bean.realName, beanBack.realName);
     Assert.assertEquals(bean.getterOnly, beanBack.getterOnly);
-    
-    
+
+
   }
 
   public static class SchemaRequiredOperator extends BaseOperator implements InputOperator