You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/04/13 16:05:18 UTC

[1/2] apex-core git commit: APEXCORE-511 add null and empty checks for addOperator, addStream and addModule

Repository: apex-core
Updated Branches:
  refs/heads/master ca1a375f9 -> f63e01d14


APEXCORE-511 add null and empty checks for addOperator, addStream and addModule


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/2ce4ae51
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/2ce4ae51
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/2ce4ae51

Branch: refs/heads/master
Commit: 2ce4ae515ddfadddace260839b634cf122653a29
Parents: 077009e
Author: Oliver Winke <ol...@datatorrent.com>
Authored: Tue Apr 11 17:27:35 2017 -0700
Committer: Oliver Winke <ol...@datatorrent.com>
Committed: Wed Apr 12 10:25:17 2017 -0700

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  | 18 +++--
 .../stram/plan/logical/LogicalPlan.java         | 29 ++++---
 .../stram/plan/logical/LogicalPlanTest.java     | 82 ++++++++++++--------
 3 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 96420a3..93936d7 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -22,6 +22,8 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 
+import javax.annotation.Nonnull;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Context.DAGContext;
@@ -216,7 +218,7 @@ public interface DAG extends DAGContext, Serializable
    * @param clazz Concrete class with default constructor so that instance of it can be initialized and added to the DAG.
    * @return Instance of the operator that has been added to the DAG.
    */
-  <T extends Operator> T addOperator(String name, Class<T> clazz);
+  <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz);
 
   /**
    * <p>addOperator.</p>
@@ -225,20 +227,20 @@ public interface DAG extends DAGContext, Serializable
    * @param operator Instance of the operator that needs to be added to the DAG
    * @return Instance of the operator that has been added to the DAG.
    */
-  <T extends Operator> T addOperator(String name, T operator);
+  <T extends Operator> T addOperator(@Nonnull String name, T operator);
 
   @InterfaceStability.Evolving
-  <T extends Module> T addModule(String name, Class<T> moduleClass);
+  <T extends Module> T addModule(@Nonnull String name, Class<T> moduleClass);
 
   @InterfaceStability.Evolving
-  <T extends Module> T addModule(String name, T module);
+  <T extends Module> T addModule(@Nonnull String name, T module);
 
   /**
    * <p>addStream.</p>
    * @param id Identifier of the stream that will be used to identify stream in DAG
    * @return
    */
-  StreamMeta addStream(String id);
+  StreamMeta addStream(@Nonnull String id);
 
   /**
    * Add identified stream for given source and sinks. Multiple sinks can be
@@ -256,7 +258,7 @@ public interface DAG extends DAGContext, Serializable
    * @param sinks
    * @return StreamMeta
    */
-  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
+  <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks);
 
   /**
    * Overload varargs version to avoid generic array type safety warnings in calling code.
@@ -269,12 +271,12 @@ public interface DAG extends DAGContext, Serializable
    * @param sink1
    * @return StreamMeta
    */
-  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
+  <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1);
 
   /**
    * <p>addStream.</p>
    */
-  <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
+  <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1, Operator.InputPort<? super T> sink2);
 
   /**
    * <p>setAttribute.</p>

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 62c4fd8..bf4b2cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -51,6 +51,7 @@ import java.util.Stack;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import javax.annotation.Nonnull;
 import javax.validation.ConstraintViolation;
 import javax.validation.ConstraintViolationException;
 import javax.validation.Validation;
@@ -105,6 +106,9 @@ import com.datatorrent.stram.engine.Slider;
 
 import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+
 /**
  * DAG contains the logical declarations of operators and streams.
  * <p>
@@ -1233,7 +1237,7 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public <T extends Operator> T addOperator(String name, Class<T> clazz)
+  public <T extends Operator> T addOperator(@Nonnull String name, Class<T> clazz)
   {
     T instance;
     try {
@@ -1246,18 +1250,20 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public <T extends Operator> T addOperator(String name, T operator)
+  public <T extends Operator> T addOperator(@Nonnull String name, T operator)
   {
+    checkArgument(!isNullOrEmpty(name), "operator name is null or empty");
+
     if (operators.containsKey(name)) {
       if (operators.get(name).operator == operator) {
         return operator;
       }
-      throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
+      throw new IllegalArgumentException("duplicate operator name: " + operators.get(name));
     }
 
     // Avoid name conflict with module.
     if (modules.containsKey(name)) {
-      throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
+      throw new IllegalArgumentException("duplicate operator name: " + operators.get(name));
     }
     OperatorMeta decl = new OperatorMeta(name, operator);
     rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
@@ -1347,16 +1353,18 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public <T extends Module> T addModule(String name, T module)
+  public <T extends Module> T addModule(@Nonnull String name, T module)
   {
+    checkArgument(!isNullOrEmpty(name), "module name is null or empty");
+
     if (modules.containsKey(name)) {
       if (modules.get(name).module == module) {
         return module;
       }
-      throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+      throw new IllegalArgumentException("duplicate module name: " + modules.get(name));
     }
     if (operators.containsKey(name)) {
-      throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+      throw new IllegalArgumentException("duplicate module name: " + modules.get(name));
     }
 
     ModuleMeta meta = new ModuleMeta(name, module);
@@ -1365,7 +1373,7 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public <T extends Module> T addModule(String name, Class<T> clazz)
+  public <T extends Module> T addModule(@Nonnull String name, Class<T> clazz)
   {
     T instance;
     try {
@@ -1399,8 +1407,9 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public StreamMeta addStream(String id)
+  public StreamMeta addStream(@Nonnull String id)
   {
+    checkArgument(!isNullOrEmpty(id),"stream id is null or empty");
     StreamMeta s = new StreamMeta(id);
     StreamMeta o = streams.put(id, s);
     if (o == null) {
@@ -1412,7 +1421,7 @@ public class LogicalPlan implements Serializable, DAG
 
   @Override
   @SuppressWarnings("unchecked")
-  public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
+  public <T> StreamMeta addStream(@Nonnull String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
   {
     StreamMeta s = addStream(id);
     s.setSource(source);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/2ce4ae51/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index 1507e2d..9f68a4f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -42,6 +42,7 @@ import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Pattern;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -60,6 +61,7 @@ import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.Sink;
@@ -85,15 +87,21 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 public class LogicalPlanTest
 {
+  private LogicalPlan dag;
+
+  @Before
+  public void setUp()
+  {
+    dag = new LogicalPlan();
+  }
 
   @Test
   public void testCycleDetection()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     //NodeConf operator1 = b.getOrAddNode("operator1");
     GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
     GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class);
@@ -145,8 +153,6 @@ public class LogicalPlanTest
   @Test
   public void testCycleDetectionWithDelay()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
     GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
     GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
@@ -192,7 +198,6 @@ public class LogicalPlanTest
   @Test
   public void testLogicalPlanSerialization() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
 
     ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class);
@@ -231,7 +236,6 @@ public class LogicalPlanTest
   @Test
   public void testDeleteOperator()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
@@ -359,7 +363,6 @@ public class LogicalPlanTest
     Assert.assertEquals("", "intField1", cv.getPropertyPath().toString());
 
     // ensure DAG validation produces matching results
-    LogicalPlan dag = new LogicalPlan();
     bean = dag.addOperator("testOperator", bean);
 
     try {
@@ -435,7 +438,6 @@ public class LogicalPlanTest
   @Test
   public void testValidationForNonInputRootOperator()
   {
-    LogicalPlan dag = new LogicalPlan();
     NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
     try {
       dag.validate();
@@ -463,7 +465,6 @@ public class LogicalPlanTest
   @Test
   public void testOperatorAnnotation()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
     TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
     dag.addStream("Connection", input.outport, operator.input1);
@@ -502,11 +503,49 @@ public class LogicalPlanTest
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testNullOperatorName()
+  {
+    dag.addOperator(null, BaseOperator.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyOperatorName()
+  {
+    dag.addOperator("", BaseOperator.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNullStreamId()
+  {
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    dag.addStream(null, o1.outport1, o1.inport1, o1.inport2 );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyStreamId()
+  {
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    dag.addStream("", o1.outport1, o1.inport1 );
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyModuleName()
+  {
+    Module testModule = mock(Module.class);
+    dag.addModule("", testModule);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNullModuleName()
+  {
+    Module testModule = mock(Module.class);
+    dag.addModule(null, testModule);
+  }
+
   @Test
   public void testPortConnectionValidation()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
 
     try {
@@ -534,8 +573,6 @@ public class LogicalPlanTest
   @Test
   public void testAtMostOnceProcessingModeValidation()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
 
@@ -560,14 +597,11 @@ public class LogicalPlanTest
 
     OperatorMeta outputOperOm = dag.getMeta(outputOper);
     Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE));
-
   }
 
   @Test
   public void testExactlyOnceProcessingModeValidation()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
 
@@ -604,8 +638,6 @@ public class LogicalPlanTest
   @Test
   public void testLocalityValidation()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
     StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
@@ -668,7 +700,6 @@ public class LogicalPlanTest
   @Test
   public void testOutputPortAnnotation()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
 
     try {
@@ -757,7 +788,6 @@ public class LogicalPlanTest
   @Test
   public void testJdkSerializableOperator() throws Exception
   {
-    LogicalPlan dag = new LogicalPlan();
     dag.addOperator("o1", new JdkSerializableOperator());
 
     ByteArrayOutputStream outStream = new ByteArrayOutputStream();
@@ -772,7 +802,6 @@ public class LogicalPlanTest
   @Test
   public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
   {
-    LogicalPlan dag = new LogicalPlan();
     Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
     Field nameField = Attribute.class.getDeclaredField("name");
     nameField.setAccessible(true);
@@ -892,7 +921,6 @@ public class LogicalPlanTest
   /*
   @Test
   public void testStreamCodec() throws Exception {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
     GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class);
     StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1);
@@ -946,7 +974,6 @@ public class LogicalPlanTest
   @Test
   public void testCheckpointableWithinAppWindowAnnotation()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
     dag.addStream("Stream1", input1.outport, x.inport1);
@@ -998,7 +1025,6 @@ public class LogicalPlanTest
   @Test
   public void testInputPortHiding()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     Operator2 operator2 = dag.addOperator("operator2", new Operator2());
     dag.addStream("Stream1", input1.outport, operator2.input);
@@ -1008,7 +1034,6 @@ public class LogicalPlanTest
   @Test
   public void testInvalidInputPortConnection()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
     Operator1 operator1 = dag.addOperator("operator3", new Operator3());
     dag.addStream("Stream1", input1.outport, operator1.input);
@@ -1024,7 +1049,6 @@ public class LogicalPlanTest
   @Test
   public void testAffinityRulesDagValidation()
   {
-    LogicalPlan dag = new LogicalPlan();
     TestGeneratorInputOperator o1 = dag.addOperator("O1", new TestGeneratorInputOperator());
     GenericTestOperator o2 = dag.addOperator("O2", new GenericTestOperator());
     GenericTestOperator o3 = dag.addOperator("O3", new GenericTestOperator());
@@ -1183,7 +1207,6 @@ public class LogicalPlanTest
   @Test
   public void testOutputPortHiding()
   {
-    LogicalPlan dag = new LogicalPlan();
     Operator5 operator5 = dag.addOperator("input", new Operator5());
     Operator2 operator2 = dag.addOperator("operator2", new Operator2());
     dag.addStream("Stream1", operator5.output, operator2.input);
@@ -1193,7 +1216,6 @@ public class LogicalPlanTest
   @Test(expected = ValidationException.class)
   public void testInvalidOutputPortConnection()
   {
-    LogicalPlan dag = new LogicalPlan();
     Operator4 operator4 = dag.addOperator("input", new Operator5());
     Operator3 operator3 = dag.addOperator("operator3", new Operator3());
     dag.addStream("Stream1", operator4.output, operator3.input);
@@ -1245,8 +1267,6 @@ public class LogicalPlanTest
   @Test
   public void testInvalidInputOperatorDeclaration()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator());
     GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
 
@@ -1264,8 +1284,6 @@ public class LogicalPlanTest
   @Test
   public void testValidInputOperatorDeclaration()
   {
-    LogicalPlan dag = new LogicalPlan();
-
     TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator());
     GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
 


[2/2] apex-core git commit: Merge branch 'APEXCORE-511.emptyNameChecksInLogicalPlan' of http://github.com/oliverwnk/apex-core into APEXCORE-511

Posted by vr...@apache.org.
Merge branch 'APEXCORE-511.emptyNameChecksInLogicalPlan' of http://github.com/oliverwnk/apex-core into APEXCORE-511


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/f63e01d1
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/f63e01d1
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/f63e01d1

Branch: refs/heads/master
Commit: f63e01d142f16dad34e2420dccbcc8addbbd404a
Parents: ca1a375 2ce4ae5
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Thu Apr 13 09:04:48 2017 -0700
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Thu Apr 13 09:04:48 2017 -0700

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  | 18 +++--
 .../stram/plan/logical/LogicalPlan.java         | 29 ++++---
 .../stram/plan/logical/LogicalPlanTest.java     | 82 ++++++++++++--------
 3 files changed, 79 insertions(+), 50 deletions(-)
----------------------------------------------------------------------