You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:12:42 UTC
[11/50] incubator-apex-core git commit: Deprecated name property from BaseOperator
Deprecated name property from BaseOperator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/fe5d0356
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/fe5d0356
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/fe5d0356
Branch: refs/heads/master
Commit: fe5d03560804a151e4c415939f3ca7edbd686b15
Parents: 19d6658
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Aug 3 14:16:52 2015 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Thu Aug 6 09:37:59 2015 -0700
----------------------------------------------------------------------
.../datatorrent/common/util/BaseOperator.java | 2 +
engine/pom.xml | 10 ++-
.../stram/plan/logical/LogicalPlan.java | 6 +-
.../com/datatorrent/stram/StreamCodecTest.java | 45 +++++------
.../stram/StreamingContainerManagerTest.java | 12 +--
.../datatorrent/stram/plan/LogicalPlanTest.java | 16 ++--
.../stram/plan/physical/PhysicalPlanTest.java | 9 +--
.../stram/webapp/OperatorDiscoveryTest.java | 78 ++++++++++----------
8 files changed, 91 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
index 0c2f8b3..f653d14 100644
--- a/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
+++ b/common/src/main/java/com/datatorrent/common/util/BaseOperator.java
@@ -33,6 +33,7 @@ public class BaseOperator implements Operator
/**
* @return the name property of the operator.
*/
+ @Deprecated
public String getName()
{
return name;
@@ -43,6 +44,7 @@ public class BaseOperator implements Operator
*
* @param name
*/
+ @Deprecated
public void setName(String name)
{
this.name = name;
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 1e6a7ed..c91265c 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -229,9 +229,15 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.8.5</version>
+ <artifactId>mockito-core</artifactId>
+ <version>1.10.19</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>net.lingala.zip4j</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index fc182cd..9bfc2bd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -854,10 +854,6 @@ public class LogicalPlan implements Serializable, DAG
@Override
public <T extends Operator> T addOperator(String name, T operator)
{
- // TODO: optional interface to provide contextual information to instance
- if (operator instanceof BaseOperator) {
- ((BaseOperator)operator).setName(name);
- }
if (operators.containsKey(name)) {
if (operators.get(name) == (Object)operator) {
return operator;
@@ -1219,7 +1215,7 @@ public class LogicalPlan implements Serializable, DAG
}
}
- // Validate root operators are input operators
+ // Validate root operators are input operators
for (OperatorMeta om : this.rootOperators) {
if (!(om.getOperator() instanceof InputOperator)) {
throw new ValidationException(String.format("Root operator: %s is not a Input operator",
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 046425f..9726e65 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -73,14 +73,14 @@ public class StreamCodecTest
LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
Assert.assertTrue("No user set stream codec", n1odi.streamCodecs.containsValue(null));
- OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+ OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
id = n2meta.getName() + " " + n2idi.portName;
@@ -93,7 +93,7 @@ public class StreamCodecTest
checkPresentStreamCodec(n3meta, node3.inport1, n2odi.streamCodecs, id, plan);
- OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+ OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
id = n3meta.getName() + " " + n3idi.portName;
@@ -139,12 +139,12 @@ public class StreamCodecTest
StreamingContainerManagerTest.assignContainer(dnm, "container" + (i + 1));
}
- getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
- getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
- getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
- getSingleOperatorDeployInfo(node4, node4.getName(), dnm);
- getSingleOperatorDeployInfo(node5, node5.getName(), dnm);
- getSingleOperatorDeployInfo(node6, node6.getName(), dnm);
+ getSingleOperatorDeployInfo(node1, dnm);
+ getSingleOperatorDeployInfo(node2, dnm);
+ getSingleOperatorDeployInfo(node3, dnm);
+ getSingleOperatorDeployInfo(node4, dnm);
+ getSingleOperatorDeployInfo(node5, dnm);
+ getSingleOperatorDeployInfo(node6, dnm);
Assert.assertEquals("number of stream codec identifiers", 3, plan.getStreamCodecIdentifiers().size());
}
@@ -180,14 +180,14 @@ public class StreamCodecTest
LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
checkPresentStreamCodec(n2meta, node2.inportWithCodec, n1odi.streamCodecs, id, plan);
- OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+ OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inportWithCodec));
id = n2meta.getName() + " " + n2idi.portName;
@@ -199,7 +199,7 @@ public class StreamCodecTest
Assert.assertEquals("number stream codecs " + id, n2odi.streamCodecs.size(), 1);
checkPresentStreamCodec(n3meta, node3.inportWithCodec, n2odi.streamCodecs, id, plan);
- OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+ OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inportWithCodec));
id = n3meta.getName() + " " + n3idi.portName;
@@ -238,7 +238,7 @@ public class StreamCodecTest
LogicalPlan.OperatorMeta n1meta = dag.getMeta(node1);
LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
@@ -449,21 +449,21 @@ public class StreamCodecTest
LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);
- OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+ OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
id = n2meta.getName() + " " + n2idi.portName;
Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);
- OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+ OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
id = n3meta.getName() + " " + n3idi.portName;
@@ -584,7 +584,7 @@ public class StreamCodecTest
LogicalPlan.OperatorMeta n2meta = dag.getMeta(node2);
LogicalPlan.OperatorMeta n3meta = dag.getMeta(node3);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
@@ -592,14 +592,14 @@ public class StreamCodecTest
checkPresentStreamCodec(n2meta, node2.inport1, n1odi.streamCodecs, id, plan);
checkPresentStreamCodec(n3meta, node3.inport1, n1odi.streamCodecs, id, plan);
- OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, node2.getName(), dnm);
+ OperatorDeployInfo n2di = getSingleOperatorDeployInfo(node2, dnm);
OperatorDeployInfo.InputDeployInfo n2idi = getInputDeployInfo(n2di, n2meta.getMeta(node2.inport1));
id = n2meta.getName() + " " + n2idi.portName;
Assert.assertEquals("number stream codecs " + id, n2idi.streamCodecs.size(), 1);
checkPresentStreamCodec(n2meta, node2.inport1, n2idi.streamCodecs, id, plan);
- OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, node3.getName(), dnm);
+ OperatorDeployInfo n3di = getSingleOperatorDeployInfo(node3, dnm);
OperatorDeployInfo.InputDeployInfo n3idi = getInputDeployInfo(n3di, n3meta.getMeta(node3.inport1));
id = n3meta.getName() + " " + n3idi.portName;
@@ -855,14 +855,14 @@ public class StreamCodecTest
Assert.assertNotNull("non inline operator is null", nonInlineOperator);
- OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, node1.getName(), dnm);
+ OperatorDeployInfo n1di = getSingleOperatorDeployInfo(node1, dnm);
OperatorDeployInfo.OutputDeployInfo n1odi = getOutputDeployInfo(n1di, n1meta.getMeta(node1.outport1));
String id = n1meta.getName() + " " + n1odi.portName;
Assert.assertEquals("number stream codecs " + id, n1odi.streamCodecs.size(), 1);
checkPresentStreamCodec(nonInlineMeta, niInputPort, n1odi.streamCodecs, id, plan);
- OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, nonInlineOperator.getName(), dnm);
+ OperatorDeployInfo odi = getSingleOperatorDeployInfo(nonInlineOperator, dnm);
OperatorDeployInfo.InputDeployInfo idi = getInputDeployInfo(odi, nonInlineMeta.getMeta(niInputPort));
id = nonInlineMeta.getName() + " " + idi.portName;
@@ -1218,9 +1218,10 @@ public class StreamCodecTest
Assert.assertEquals("stream codec not same " + id, opStreamCodecInfo, streamCodecInfo);
}
- private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, String id, StreamingContainerManager scm)
+ private OperatorDeployInfo getSingleOperatorDeployInfo(Operator oper, StreamingContainerManager scm)
{
LogicalPlan dag = scm.getLogicalPlan();
+ String id = dag.getMeta(oper).toString();
PhysicalPlan plan = scm.getPhysicalPlan();
List<PTOperator> operators = plan.getOperators(dag.getMeta(oper));
Assert.assertEquals("number of operators " + id, 1, operators.size());
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index ba15a78..38a54f0 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -188,8 +188,8 @@ public class StreamingContainerManagerTest {
Assert.assertEquals("number operators assigned to container", 3, c2.size());
OperatorDeployInfo o2DI = getNodeDeployInfo(c2, dag.getMeta(o2));
OperatorDeployInfo o3DI = getNodeDeployInfo(c2, dag.getMeta(o3));
- Assert.assertNotNull(o2.getName() + " assigned to " + sca2.container.getExternalId(), o2DI);
- Assert.assertNotNull(o3.getName() + " assigned to " + sca2.container.getExternalId(), o3DI);
+ Assert.assertNotNull(dag.getMeta(o2) + " assigned to " + sca2.container.getExternalId(), o2DI);
+ Assert.assertNotNull(dag.getMeta(o3) + " assigned to " + sca2.container.getExternalId(), o3DI);
Assert.assertTrue("The buffer server memory for container 1", 256 == sca1.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));
Assert.assertTrue("The buffer server memory for container 2", 0 == sca2.getInitContext().getValue(ContainerContext.BUFFER_SERVER_MB));
@@ -218,7 +218,7 @@ public class StreamingContainerManagerTest {
// THREAD_LOCAL o4.inport1
OperatorDeployInfo o4DI = getNodeDeployInfo(c2, dag.getMeta(o4));
- Assert.assertNotNull(o4.getName() + " assigned to " + sca2.container.getExternalId(), o4DI);
+ Assert.assertNotNull(dag.getMeta(o4) + " assigned to " + sca2.container.getExternalId(), o4DI);
InputDeployInfo c2o4i1 = getInputDeployInfo(o4DI, "o3.outport1");
Assert.assertNotNull("input from o3.outport1", c2o4i1);
Assert.assertEquals("portName " + c2o4i1, GenericTestOperator.IPORT1, c2o4i1.portName);
@@ -271,7 +271,7 @@ public class StreamingContainerManagerTest {
StreamingContainerAgent sca1 = dnm.getContainerAgent(c.getExternalId());
List<OperatorDeployInfo> c1 = getDeployInfo(sca1);
Assert.assertEquals("number operators assigned to container", 1, c1.size());
- Assert.assertTrue(node2.getName() + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1)));
+ Assert.assertTrue(dag.getMeta(node2) + " assigned to " + sca1.container.getExternalId(), containsNodeContext(c1, dag.getMeta(node1)));
List<PTOperator> o2Partitions = plan.getOperators(dag.getMeta(node2));
Assert.assertEquals("number partitions", TestStaticPartitioningSerDe.partitions.length, o2Partitions.size());
@@ -280,7 +280,7 @@ public class StreamingContainerManagerTest {
String containerId = o2Partitions.get(i).getContainer().getExternalId();
List<OperatorDeployInfo> cc = getDeployInfo(dnm.getContainerAgent(containerId));
Assert.assertEquals("number operators assigned to container", 1, cc.size());
- Assert.assertTrue(node2.getName() + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2)));
+ Assert.assertTrue(dag.getMeta(node2) + " assigned to " + containerId, containsNodeContext(cc, dag.getMeta(node2)));
// n1n2 in, mergeStream out
OperatorDeployInfo ndi = cc.get(0);
@@ -338,7 +338,7 @@ public class StreamingContainerManagerTest {
Assert.assertEquals("number operators " + cmerge, 1, cmerge.size());
OperatorDeployInfo node3DI = getNodeDeployInfo(cmerge, dag.getMeta(node3));
- Assert.assertNotNull(node3.getName() + " assigned", node3DI);
+ Assert.assertNotNull(dag.getMeta(node3) + " assigned", node3DI);
Assert.assertEquals("inputs " + node3DI, 1, node3DI.inputs.size());
InputDeployInfo node3In = node3DI.inputs.get(0);
Assert.assertEquals("streamName " + node3In, n2n3.getName(), node3In.declaredStreamId);
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
index 24a9031..5bda8ee 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanTest.java
@@ -89,16 +89,16 @@ public class LogicalPlanTest {
dag.findStronglyConnected(dag.getMeta(operator7), cycles);
assertEquals("operator self reference", 1, cycles.size());
assertEquals("operator self reference", 1, cycles.get(0).size());
- assertEquals("operator self reference", operator7.getName(), cycles.get(0).get(0));
+ assertEquals("operator self reference", dag.getMeta(operator7).getName(), cycles.get(0).get(0));
// 3 operator cycle
cycles.clear();
dag.findStronglyConnected(dag.getMeta(operator4), cycles);
assertEquals("3 operator cycle", 1, cycles.size());
assertEquals("3 operator cycle", 3, cycles.get(0).size());
- assertTrue("operator2", cycles.get(0).contains(operator2.getName()));
- assertTrue("operator3", cycles.get(0).contains(operator3.getName()));
- assertTrue("operator4", cycles.get(0).contains(operator4.getName()));
+ assertTrue("operator2", cycles.get(0).contains(dag.getMeta(operator2).getName()));
+ assertTrue("operator3", cycles.get(0).contains(dag.getMeta(operator3).getName()));
+ assertTrue("operator4", cycles.get(0).contains(dag.getMeta(operator4).getName()));
try {
dag.validate();
@@ -294,7 +294,7 @@ public class LogicalPlanTest {
Assert.fail("should throw ConstraintViolationException");
} catch (ConstraintViolationException e) {
Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations());
- String expRegex = ".*ValidationTestOperator\\{name=testOperator}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
+ String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex));
}
@@ -396,7 +396,7 @@ public class LogicalPlanTest {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator1");
} catch (ValidationException e) {
- Assert.assertEquals("", "Operator " + operator.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+ Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
}
dag.setAttribute(operator, OperatorContext.PARTITIONER, null);
@@ -406,7 +406,7 @@ public class LogicalPlanTest {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator1");
} catch (ValidationException e) {
- Assert.assertEquals("", "Operator " + operator.getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
+ Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
}
dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false);
@@ -419,7 +419,7 @@ public class LogicalPlanTest {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator2");
} catch (ValidationException e) {
- Assert.assertEquals("Operator " + operator2.getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
+ Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
index 9382a4b..ccf930f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/physical/PhysicalPlanTest.java
@@ -180,7 +180,6 @@ public class PhysicalPlanTest
p.getPartitionKeys().put(this.inport1, lpks);
p.getPartitionKeys().put(this.inportWithCodec, lpks);
p.getPartitionedInstance().pks = p.getPartitionKeys().values().toString();
- p.getPartitionedInstance().setName(p.getPartitionKeys().values().toString());
newPartitions.add(p);
}
@@ -252,7 +251,7 @@ public class PhysicalPlanTest
dag.addStream("node1.outport1", node1.outport1, node2.inport2, node2.inport1);
int initialPartitionCount = 5;
- OperatorMeta node2Decl = dag.getOperatorMeta(node2.getName());
+ OperatorMeta node2Decl = dag.getMeta(node2);
node2Decl.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(initialPartitionCount));
PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
@@ -350,7 +349,7 @@ public class PhysicalPlanTest
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
- OperatorMeta o2Meta = dag.getOperatorMeta(o2.getName());
+ OperatorMeta o2Meta = dag.getMeta(o2);
o2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
Lists.newArrayList((StatsListener) new PartitionLoadWatch(0, 5)));
o2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(1));
@@ -441,7 +440,7 @@ public class PhysicalPlanTest
public void testInputOperatorPartitioning() {
LogicalPlan dag = new LogicalPlan();
TestInputOperator<Object> o1 = dag.addOperator("o1", new TestInputOperator<Object>());
- OperatorMeta o1Meta = dag.getOperatorMeta(o1.getName());
+ OperatorMeta o1Meta = dag.getMeta(o1);
dag.setAttribute(o1, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitioningTest.PartitionLoadWatch()}));
dag.setAttribute(o1, OperatorContext.PARTITIONER, new StatelessPartitioner<TestInputOperator<Object>>(2));
@@ -509,7 +508,7 @@ public class PhysicalPlanTest
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2);
- OperatorMeta node2Meta = dag.getOperatorMeta(o2.getName());
+ OperatorMeta node2Meta = dag.getMeta(o2);
node2Meta.getAttributes().put(OperatorContext.STATS_LISTENERS,
Lists.newArrayList((StatsListener) new PartitionLoadWatch(3, 5)));
node2Meta.getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(8));
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/fe5d0356/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index ad915c8..8baa08a 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -57,7 +57,7 @@ import com.google.common.collect.Lists;
public class OperatorDiscoveryTest
{
// private static final Logger LOG = LoggerFactory.getLogger(OperatorDiscoveryTest.class);
-
+
public static class GenericClassBase<T> extends BaseOperator
{
private int A;
@@ -271,7 +271,7 @@ public class OperatorDiscoveryTest
@Test
public void testPropertyDiscovery() throws Exception
{
-
+
String[] classFilePath = getClassFileInClasspath();
OperatorDiscoverer od = new OperatorDiscoverer(classFilePath);
od.buildTypeGraph();
@@ -343,8 +343,8 @@ public class OperatorDiscoveryTest
props = desc.getJSONArray("properties");
genericArray = getJSONProperty(props, "genericArray");
Assert.assertEquals(debug + "type " + genericArray, String[].class.getName(), genericArray.get("type"));
-
-
+
+
// Test complicated Type Variable override in Hierarchy
desc = od.describeClassByASM(SubSubClass.class.getName());
props = desc.getJSONArray("properties");
@@ -554,9 +554,9 @@ public class OperatorDiscoveryTest
return false;
return true;
}
-
-
-
+
+
+
}
@@ -573,7 +573,7 @@ public class OperatorDiscoveryTest
private long longProp;
private double doubleProp;
private boolean booleanProp;
-
+
private Integer integerProp;
private List<String> stringList;
private List<Structured> nestedList;
@@ -612,39 +612,39 @@ public class OperatorDiscoveryTest
{
return mProp;
}
-
+
public String getAlias()
{
return realName;
}
-
+
public void setAlias(String alias)
{
realName = alias;
}
-
+
public String getGetterOnly()
{
return getterOnly;
}
-
-
+
+
public URI getUri()
{
return uri;
}
-
+
public void setUri(URI uri)
{
this.uri = uri;
}
-
-
+
+
public void setIntegerProp(Integer integerProp)
{
this.integerProp = integerProp;
}
-
+
public Integer getIntegerProp()
{
return integerProp;
@@ -734,7 +734,7 @@ public class OperatorDiscoveryTest
{
return stringArray;
}
-
+
public void setStringArray(String[] stringArray)
{
this.stringArray = stringArray;
@@ -858,15 +858,15 @@ public class OperatorDiscoveryTest
static class ExtendedOperator extends TestOperator<String, Map<String, Number>>
{
}
-
+
public static class BaseClass<A, B, C>
{
private A a;
-
+
private B b;
private C c;
-
+
public void setA(A a)
{
this.a = a;
@@ -875,12 +875,12 @@ public class OperatorDiscoveryTest
{
this.b = b;
}
-
+
public A getA()
{
return a;
}
-
+
public B getB()
{
return b;
@@ -890,7 +890,7 @@ public class OperatorDiscoveryTest
{
this.c = c;
}
-
+
public C getC()
{
return c;
@@ -900,28 +900,28 @@ public class OperatorDiscoveryTest
public static class SubClass<D, A extends Number> extends BaseClass<Number, A, D>
{
private D d;
-
+
public void setD(D d)
{
this.d = d;
}
-
+
public D getD()
{
return d;
}
-
+
}
public static class SubSubClass<E extends Runnable> extends SubClass<List<String>, Long>
{
private E e;
-
+
public void setE(E e)
{
this.e = e;
}
-
+
public E getE()
{
return e;
@@ -975,7 +975,7 @@ public class OperatorDiscoveryTest
Assert.assertArrayEquals(ah.intArray, clone.intArray);
}
-
+
@Test
public void testLogicalPlanConfiguration() throws Exception
{
@@ -995,13 +995,13 @@ public class OperatorDiscoveryTest
ObjectMapper mapper = ObjectMapperFactory.getOperatorValueSerializer();
String s = mapper.writeValueAsString(bean);
// LOG.debug(new JSONObject(s).toString(2));
- //
+ //
Assert.assertTrue("Shouldn't contain field 'realName' !", !s.contains("realName"));
Assert.assertTrue("Should contain property 'alias' !", s.contains("alias"));
Assert.assertTrue("Shouldn't contain property 'getterOnly' !", !s.contains("getterOnly"));
JSONObject jsonObj = new JSONObject(s);
-
- // create the json dag representation
+
+ // create the json dag representation
JSONObject jsonPlan = new JSONObject();
jsonPlan.put("streams", new JSONArray());
JSONObject jsonOper = new JSONObject();
@@ -1009,17 +1009,17 @@ public class OperatorDiscoveryTest
jsonOper.put("class", TestOperator.class.getName());
jsonOper.put("properties", jsonObj);
jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper)));
-
-
+
+
Configuration conf = new Configuration(false);
LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
- // create logical plan from the json
+ // create logical plan from the json
LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest");
OperatorMeta om = lp.getOperatorMeta("Test Operator");
Assert.assertTrue(om.getOperator() instanceof TestOperator);
@SuppressWarnings("rawtypes")
TestOperator beanBack = (TestOperator) om.getOperator();
-
+
// The operator deserialized back from json should be same as original operator
Assert.assertEquals(bean.map, beanBack.map);
Assert.assertArrayEquals(bean.stringArray, beanBack.stringArray);
@@ -1031,8 +1031,8 @@ public class OperatorDiscoveryTest
Assert.assertEquals(bean.booleanProp, beanBack.booleanProp);
Assert.assertEquals(bean.realName, beanBack.realName);
Assert.assertEquals(bean.getterOnly, beanBack.getterOnly);
-
-
+
+
}
public static class SchemaRequiredOperator extends BaseOperator implements InputOperator