You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:12 UTC

[11/50] [abbrv] incubator-apex-core git commit: APEX-157 #comment #resolve Added changes for attribute serializable check in dag.validate

APEX-157 #comment #resolve Added changes for attribute serializable check in dag.validate


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b799bd2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b799bd2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b799bd2d

Branch: refs/heads/master
Commit: b799bd2d3edd7af3bdf4631129601e68ce349bf4
Parents: 4adac06
Author: ishark <is...@datatorrent.com>
Authored: Thu Sep 24 17:05:13 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Sep 28 16:43:08 2015 -0700

----------------------------------------------------------------------
 .../stram/plan/logical/LogicalPlan.java         | 23 ++++++
 .../logical/LogicalPlanConfigurationTest.java   |  6 ++
 .../stram/plan/logical/LogicalPlanTest.java     | 83 +++++++++++++++++++-
 engine/src/test/resources/testTopology.json     |  2 +-
 4 files changed, 112 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index f068884..6405644 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
@@ -1312,6 +1313,8 @@ public class LogicalPlan implements Serializable, DAG
             Validation.buildDefaultValidatorFactory();
     Validator validator = factory.getValidator();
 
+    checkAttributeValueSerializable(this.getAttributes(), DAG.class.getName());
+
     // clear oioRoot values in all operators
     for (OperatorMeta n: operators.values()) {
       n.oioRoot = null;
@@ -1336,6 +1339,8 @@ public class LogicalPlan implements Serializable, DAG
 
       OperatorMeta.PortMapping portMapping = n.getPortMapping();
 
+      checkAttributeValueSerializable(n.getAttributes(), n.getName());
+
       // Check operator annotation
       if (n.operatorAnnotation != null) {
         // Check if partition property of the operator is being honored
@@ -1368,6 +1373,7 @@ public class LogicalPlan implements Serializable, DAG
 
       // check that non-optional ports are connected
       for (InputPortMeta pm: portMapping.inPortMap.values()) {
+        checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
         StreamMeta sm = n.inputStreams.get(pm);
         if (sm == null) {
           if ((pm.portAnnotation == null || !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
@@ -1397,6 +1403,7 @@ public class LogicalPlan implements Serializable, DAG
 
       boolean allPortsOptional = true;
       for (OutputPortMeta pm: portMapping.outPortMap.values()) {
+        checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
         if (!n.outputStreams.containsKey(pm)) {
           if ((pm.portAnnotation != null && !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
             throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
@@ -1458,6 +1465,22 @@ public class LogicalPlan implements Serializable, DAG
 
   }
 
+  private void checkAttributeValueSerializable(AttributeMap attributes, String context)
+  {
+    StringBuilder sb = new StringBuilder();
+    String delim = "";
+    // Check that every non-null attribute value in this map is serializable
+    for (Entry<Attribute<?>, Object> entry : attributes.entrySet()) {
+      if (entry.getValue() != null && !(entry.getValue() instanceof Serializable)) {
+        sb.append(delim).append(entry.getKey().getSimpleName());
+        delim = ", ";
+      }
+    }
+    if (sb.length() > 0) {
+      throw new ValidationException("Attribute value(s) for " + sb.toString() + " in " + context + " are not serializable");
+    }
+  }
+
   /*
    * Validates OIO constraints for operators with more than one input streams
    * For a node to be OIO,

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index c4ad724..1d95afe 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -20,6 +20,7 @@ package com.datatorrent.stram.plan.logical;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.io.StringWriter;
 import java.lang.reflect.Field;
 
@@ -84,6 +85,11 @@ public class LogicalPlanConfigurationTest {
       return n;
   }
 
+  public static class TestStreamCodec<T> extends JsonStreamCodec<T> implements Serializable
+  {
+    private static final long serialVersionUID = 1L;
+  }
+
   /**
    * Test read from dt-site.xml in Hadoop configuration format.
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index 52c5f7d..a4ac488 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,9 +19,11 @@
 package com.datatorrent.stram.plan.logical;
 
 import com.datatorrent.common.util.BaseOperator;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
+import java.lang.reflect.Field;
 import java.util.*;
 
 import javax.validation.*;
@@ -41,6 +43,7 @@ import static org.junit.Assert.*;
 
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.api.*;
+import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.Locality;
@@ -683,7 +686,85 @@ public class LogicalPlanTest {
     Assert.assertNotNull("port object null", o1Clone.inport1);
   }
 
-  private static class TestStreamCodec implements StreamCodec<Object> {
+  @Test
+  public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
+  {
+    LogicalPlan dag = new LogicalPlan();
+    Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String());
+    Field nameField = Attribute.class.getDeclaredField("name");
+    nameField.setAccessible(true);
+    nameField.set(attr, "Test_Attribute");
+    nameField.setAccessible(false);
+
+    assertNotNull(attr);
+    // Dag attribute not serializable test
+    dag.setAttribute(attr, new TestAttributeValue());
+    try {
+      dag.validate();
+      Assert.fail("Setting not serializable attribute should throw exception");
+    } catch (ValidationException e) {
+      assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in com.datatorrent.api.DAG are not serializable", e.getMessage());
+    }
+
+    // Operator attribute not serializable test
+    dag = new LogicalPlan();
+    TestGeneratorInputOperator operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
+    dag.setAttribute(operator, attr, new TestAttributeValue());
+    try {
+      dag.validate();
+      Assert.fail("Setting not serializable attribute should throw exception");
+    } catch (ValidationException e) {
+      assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator are not serializable", e.getMessage());
+    }
+
+    // Output Port attribute not serializable test
+    dag = new LogicalPlan();
+    operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
+    dag.setOutputPortAttribute(operator.outport, attr, new TestAttributeValue());
+    try {
+      dag.validate();
+      Assert.fail("Setting not serializable attribute should throw exception");
+    } catch (ValidationException e) {
+      assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.outport are not serializable", e.getMessage());
+    }
+
+    // Input Port attribute not serializable test
+    dag = new LogicalPlan();
+    GenericTestOperator operator1 = dag.addOperator("TestOperator", GenericTestOperator.class);
+    dag.setInputPortAttribute(operator1.inport1, attr, new TestAttributeValue());
+    try {
+      dag.validate();
+      Assert.fail("Setting non serializable attribute should throw exception");
+    } catch (ValidationException e) {
+      assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.inport1 are not serializable", e.getMessage());
+    }
+  }
+
+  private static class Object2String implements StringCodec<Object>
+  {
+
+    @Override
+    public Object fromString(String string)
+    {
+      // Stub method for testing - do nothing
+      return null;
+    }
+
+    @Override
+    public String toString(Object pojo)
+    {
+      // Stub method for testing - do nothing
+      return null;
+    }
+
+  }
+
+  private static class TestAttributeValue
+  {
+  }
+
+  private static class TestStreamCodec implements StreamCodec<Object>
+  {
     @Override
     public Object fromByteArray(Slice fragment)
     {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/resources/testTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testTopology.json b/engine/src/test/resources/testTopology.json
index 62c5262..45e1f0e 100644
--- a/engine/src/test/resources/testTopology.json
+++ b/engine/src/test/resources/testTopology.json
@@ -25,7 +25,7 @@
           "attributes": {
             "UNIFIER_LIMIT": 8,
             "STREAM_CODEC" : {
-              "com.datatorrent.common.codec.JsonStreamCodec" : {}
+              "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestStreamCodec" : {}
             }
           }
         }