You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/04/13 16:05:18 UTC
[1/2] apex-core git commit: APEXCORE-511 add null and empty checks
for addOperator, addStream and addModule
Repository: apex-core
Updated Branches:
refs/heads/master ca1a375f9 -> f63e01d14
APEXCORE-511 add null and empty checks for addOperator, addStream and addModule
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ce4ae51
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ce4ae51
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ce4ae51
Branch: refs/heads/master
Commit: 2ce4ae515ddfadddace260839b634cf122653a29
Parents: 077009e
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Tue Apr 11 17:27:35 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Wed Apr 12 10:25:17 2017 -0700
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/DAG.java | 18 +++--
.../stram/plan/logical/LogicalPlan.java | 29 ++++---
.../stram/plan/logical/LogicalPlanTest.java | 82 ++++++++++++--------
3 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 96420a3..93936d7 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
+import javax.annotation.Nonnull;
+
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Context.DAGContext;
@@ -216,7 +218,7 @@ public interface DAG extends DAGContext, Serializable
* @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG.
* @return Instance of the operator that has been added to the DAG.
*/
- <T extends Operator> T addOperator(String name, Class<T> clazz);
+ <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz);
/**
* <p>addOperator.</p>
@@ -225,20 +227,20 @@ public interface DAG extends DAGContext, Serializable
* @param operator Instance of the operator that needs to be added to the DAG
* @return Instance of the operator that has been added to the DAG.
*/
- <T extends Operator> T addOperator(String name, T operator);
+ <T extends Operator> T addOperator(@Nonnull String name, T operator);
@InterfaceStability.Evolving
- <T extends Module> T addModule(String name, Class<T> moduleClass);
+ <T extends Module> T addModule(@Nonnull String name, Class<T> moduleClass);
@InterfaceStability.Evolving
- <T extends Module> T addModule(String name, T module);
+ <T extends Module> T addModule(@Nonnull String name, T module);
/**
* <p>addStream.</p>
* @param id Identifier of the stream that will be used to identify stream in DAG
* @return
*/
- StreamMeta addStream(String id);
+ StreamMeta addStream(@Nonnull String id);
/**
* Add identified stream for given source and sinks. Multiple sinks can be
@@ -256,7 +258,7 @@ public interface DAG extends DAGContext, Serializable
* @param sinks
* @return StreamMeta
*/
- <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
+ <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
/**
* Overload varargs version to avoid generic array type safety warnings in calling code.
@@ -269,12 +271,12 @@ public interface DAG extends DAGContext, Serializable
* @param sink1
* @return StreamMeta
*/
- <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
+ <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
/**
* <p>addStream.</p>
*/
- <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
+ <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
/**
* <p>setAttribute.</p>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 62c4fd8..bf4b2cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -51,6 +51,7 @@ import java.util.Stack;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Validation;
@@ -105,6 +106,9 @@ import com.datatorrent.stram.engine.Slider;
import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
/**
* DAG contains the logical declarations of operators and streams.
* <p>
@@ -1233,7 +1237,7 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public <T extends Operator> T addOperator(String name, Class<T> clazz)
+ public <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz)
{
T instance;
try {
@@ -1246,18 +1250,20 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public <T extends Operator> T addOperator(String name, T operator)
+ public <T extends Operator> T addOperator(@Nonnull String name, T operator)
{
+ checkArgument(!isNullOrEmpty(name), "operator name is null or empty");
+
if (operators.containsKey(name)) {
if (operators.get(name).operator == operator) {
return operator;
}
- throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
+ throw new IllegalArgumentException("duplicate operator name: " + operators.get(name));
}
// Avoid name conflict with module.
if (modules.containsKey(name)) {
- throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
+ throw new IllegalArgumentException("duplicate operator name: " + operators.get(name));
}
OperatorMeta decl = new OperatorMeta(name, operator);
rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
@@ -1347,16 +1353,18 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public <T extends Module> T addModule(String name, T module)
+ public <T extends Module> T addModule(@Nonnull String name, T module)
{
+ checkArgument(!isNullOrEmpty(name), "module name is null or empty");
+
if (modules.containsKey(name)) {
if (modules.get(name).module == module) {
return module;
}
- throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+ throw new IllegalArgumentException("duplicate module name: " + modules.get(name));
}
if (operators.containsKey(name)) {
- throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+ throw new IllegalArgumentException("duplicate module name: " + modules.get(name));
}
ModuleMeta meta = new ModuleMeta(name, module);
@@ -1365,7 +1373,7 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public <T extends Module> T addModule(String name, Class<T> clazz)
+ public <T extends Module> T addModule(@Nonnull String name, Class<T> clazz)
{
T instance;
try {
@@ -1399,8 +1407,9 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public StreamMeta addStream(String id)
+ public StreamMeta addStream(@Nonnull String id)
{
+ checkArgument(!isNullOrEmpty(id),"stream id is null or empty");
StreamMeta s = new StreamMeta(id);
StreamMeta o = streams.put(id, s);
if (o == null) {
@@ -1412,7 +1421,7 @@ public class LogicalPlan implements Serializable, DAG
@Override
@SuppressWarnings("unchecked")
- public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
+ public <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
{
StreamMeta s = addStream(id);
s.setSource(source);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index 1507e2d..9f68a4f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -42,6 +42,7 @@ import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -60,6 +61,7 @@ import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Sink;
@@ -85,15 +87,21 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
public class LogicalPlanTest
{
+ private LogicalPlan dag;
+
+ @Before
+ public void setUp()
+ {
+ dag = new LogicalPlan();
+ }
@Test
public void testCycleDetection()
{
- LogicalPlan dag = new LogicalPlan();
-
//NodeConf operator1 = b.getOrAddNode("operator1");
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class);
@@ -145,8 +153,6 @@ public class LogicalPlanTest
@Test
public void testCycleDetectionWithDelay()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
@@ -192,7 +198,6 @@ public class LogicalPlanTest
@Test
public void testLogicalPlanSerialization() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class);
@@ -231,7 +236,6 @@ public class LogicalPlanTest
@Test
public void testDeleteOperator()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -359,7 +363,6 @@ public class LogicalPlanTest
Assert.assertEquals("", "intField1", cv.getPropertyPath().toString());
// ensure DAG validation produces matching results
- LogicalPlan dag = new LogicalPlan();
bean = dag.addOperator("testOperator", bean);
try {
@@ -435,7 +438,6 @@ public class LogicalPlanTest
@Test
public void testValidationForNonInputRootOperator()
{
- LogicalPlan dag = new LogicalPlan();
NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
try {
dag.validate();
@@ -463,7 +465,6 @@ public class LogicalPlanTest
@Test
public void testOperatorAnnotation()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
dag.addStream("Connection", input.outport, operator.input1);
@@ -502,11 +503,49 @@ public class LogicalPlanTest
}
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullOperatorName()
+ {
+ dag.addOperator(null, BaseOperator.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyOperatorName()
+ {
+ dag.addOperator("", BaseOperator.class);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullStreamId()
+ {
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ dag.addStream(null, o1.outport1, o1.inport1, o1.inport2 );
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyStreamId()
+ {
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ dag.addStream("", o1.outport1, o1.inport1 );
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyModuleName()
+ {
+ Module testModule = mock(Module.class);
+ dag.addModule("", testModule);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullModuleName()
+ {
+ Module testModule = mock(Module.class);
+ dag.addModule(null, testModule);
+ }
+
@Test
public void testPortConnectionValidation()
{
- LogicalPlan dag = new LogicalPlan();
-
TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
try {
@@ -534,8 +573,6 @@ public class LogicalPlanTest
@Test
public void testAtMostOnceProcessingModeValidation()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
@@ -560,14 +597,11 @@ public class LogicalPlanTest
OperatorMeta outputOperOm = dag.getMeta(outputOper);
Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE));
-
}
@Test
public void testExactlyOnceProcessingModeValidation()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
@@ -604,8 +638,6 @@ public class LogicalPlanTest
@Test
public void testLocalityValidation()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
@@ -668,7 +700,6 @@ public class LogicalPlanTest
@Test
public void testOutputPortAnnotation()
{
- LogicalPlan dag = new LogicalPlan();
TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
try {
@@ -757,7 +788,6 @@ public class LogicalPlanTest
@Test
public void testJdkSerializableOperator() throws Exception
{
- LogicalPlan dag = new LogicalPlan();
dag.addOperator("o1", new JdkSerializableOperator());
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
@@ -772,7 +802,6 @@ public class LogicalPlanTest
@Test
public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
{
- LogicalPlan dag = new LogicalPlan();
Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
Field nameField = Attribute.class.getDeclaredField("name");
nameField.setAccessible(true);
@@ -892,7 +921,6 @@ public class LogicalPlanTest
/*
@Test
public void testStreamCodec() throws Exception {
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class);
StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1);
@@ -946,7 +974,6 @@ public class LogicalPlanTest
@Test
public void testCheckpointableWithinAppWindowAnnotation()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
dag.addStream("Stream1", input1.outport, x.inport1);
@@ -998,7 +1025,6 @@ public class LogicalPlanTest
@Test
public void testInputPortHiding()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
Operator2 operator2 = dag.addOperator("operator2", new Operator2());
dag.addStream("Stream1", input1.outport, operator2.input);
@@ -1008,7 +1034,6 @@ public class LogicalPlanTest
@Test
public void testInvalidInputPortConnection()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
Operator1 operator1 = dag.addOperator("operator3", new Operator3());
dag.addStream("Stream1", input1.outport, operator1.input);
@@ -1024,7 +1049,6 @@ public class LogicalPlanTest
@Test
public void testAffinityRulesDagValidation()
{
- LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator o1 = dag.addOperator("O1", new TestGeneratorInputOperator());
GenericTestOperator o2 = dag.addOperator("O2", new GenericTestOperator());
GenericTestOperator o3 = dag.addOperator("O3", new GenericTestOperator());
@@ -1183,7 +1207,6 @@ public class LogicalPlanTest
@Test
public void testOutputPortHiding()
{
- LogicalPlan dag = new LogicalPlan();
Operator5 operator5 = dag.addOperator("input", new Operator5());
Operator2 operator2 = dag.addOperator("operator2", new Operator2());
dag.addStream("Stream1", operator5.output, operator2.input);
@@ -1193,7 +1216,6 @@ public class LogicalPlanTest
@Test(expected = ValidationException.class)
public void testInvalidOutputPortConnection()
{
- LogicalPlan dag = new LogicalPlan();
Operator4 operator4 = dag.addOperator("input", new Operator5());
Operator3 operator3 = dag.addOperator("operator3", new Operator3());
dag.addStream("Stream1", operator4.output, operator3.input);
@@ -1245,8 +1267,6 @@ public class LogicalPlanTest
@Test
public void testInvalidInputOperatorDeclaration()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator());
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
@@ -1264,8 +1284,6 @@ public class LogicalPlanTest
@Test
public void testValidInputOperatorDeclaration()
{
- LogicalPlan dag = new LogicalPlan();
-
TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator());
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
[2/2] apex-core git commit: Merge branch
'APEXCORE-511.emptyNameChecksInLogicalPlan' of
http://github.com/oliverwnk/apex-core into APEXCORE-511
Posted by vr...@apache.org.
Merge branch 'APEXCORE-511.emptyNameChecksInLogicalPlan' of http://github.com/oliverwnk/apex-core into APEXCORE-511
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f63e01d1
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f63e01d1
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f63e01d1
Branch: refs/heads/master
Commit: f63e01d142f16dad34e2420dccbcc8addbbd404a
Parents: ca1a375 2ce4ae5
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Thu Apr 13 09:04:48 2017 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Thu Apr 13 09:04:48 2017 -0700
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/DAG.java | 18 +++--
.../stram/plan/logical/LogicalPlan.java | 29 ++++---
.../stram/plan/logical/LogicalPlanTest.java | 82 ++++++++++++--------
3 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------