You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/03 03:27:12 UTC
[11/50] [abbrv] incubator-apex-core git commit: APEX-157 #comment
#resolve Added changes for attribute serializable check in dag.validate
APEX-157 #comment #resolve Added changes for attribute serializable check in dag.validate
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b799bd2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b799bd2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b799bd2d
Branch: refs/heads/master
Commit: b799bd2d3edd7af3bdf4631129601e68ce349bf4
Parents: 4adac06
Author: ishark <is...@datatorrent.com>
Authored: Thu Sep 24 17:05:13 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Sep 28 16:43:08 2015 -0700
----------------------------------------------------------------------
.../stram/plan/logical/LogicalPlan.java | 23 ++++++
.../logical/LogicalPlanConfigurationTest.java | 6 ++
.../stram/plan/logical/LogicalPlanTest.java | 83 +++++++++++++++++++-
engine/src/test/resources/testTopology.json | 2 +-
4 files changed, 112 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index f068884..6405644 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
@@ -1312,6 +1313,8 @@ public class LogicalPlan implements Serializable, DAG
Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();
+ checkAttributeValueSerializable(this.getAttributes(), DAG.class.getName());
+
// clear oioRoot values in all operators
for (OperatorMeta n: operators.values()) {
n.oioRoot = null;
@@ -1336,6 +1339,8 @@ public class LogicalPlan implements Serializable, DAG
OperatorMeta.PortMapping portMapping = n.getPortMapping();
+ checkAttributeValueSerializable(n.getAttributes(), n.getName());
+
// Check operator annotation
if (n.operatorAnnotation != null) {
// Check if partition property of the operator is being honored
@@ -1368,6 +1373,7 @@ public class LogicalPlan implements Serializable, DAG
// check that non-optional ports are connected
for (InputPortMeta pm: portMapping.inPortMap.values()) {
+ checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
StreamMeta sm = n.inputStreams.get(pm);
if (sm == null) {
if ((pm.portAnnotation == null || !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
@@ -1397,6 +1403,7 @@ public class LogicalPlan implements Serializable, DAG
boolean allPortsOptional = true;
for (OutputPortMeta pm: portMapping.outPortMap.values()) {
+ checkAttributeValueSerializable(pm.getAttributes(), n.getName() + "." + pm.getPortName());
if (!n.outputStreams.containsKey(pm)) {
if ((pm.portAnnotation != null && !pm.portAnnotation.optional()) && pm.classDeclaringHiddenPort == null) {
throw new ValidationException("Output port connection required: " + n.name + "." + pm.getPortName());
@@ -1458,6 +1465,22 @@ public class LogicalPlan implements Serializable, DAG
}
+ private void checkAttributeValueSerializable(AttributeMap attributes, String context)
+ {
+ StringBuilder sb = new StringBuilder();
+ String delim = "";
+ // Collect the names of all attributes in this map whose non-null value is not Serializable
+ for (Entry<Attribute<?>, Object> entry : attributes.entrySet()) {
+ if (entry.getValue() != null && !(entry.getValue() instanceof Serializable)) {
+ sb.append(delim).append(entry.getKey().getSimpleName());
+ delim = ", "; // every name after the first is preceded by a comma
+ }
+ }
+ if (sb.length() > 0) { // at least one offending attribute: report them all in a single exception
+ throw new ValidationException("Attribute value(s) for " + sb.toString() + " in " + context + " are not serializable");
+ }
+ }
+
/*
* Validates OIO constraints for operators with more than one input streams
* For a node to be OIO,
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index c4ad724..1d95afe 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -20,6 +20,7 @@ package com.datatorrent.stram.plan.logical;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Field;
@@ -84,6 +85,11 @@ public class LogicalPlanConfigurationTest {
return n;
}
+ public static class TestStreamCodec<T> extends JsonStreamCodec<T> implements Serializable // JsonStreamCodec subclass that is additionally Serializable; referenced as STREAM_CODEC in testTopology.json
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
/**
* Test read from dt-site.xml in Hadoop configuration format.
*/
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
index 52c5f7d..a4ac488 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanTest.java
@@ -19,9 +19,11 @@
package com.datatorrent.stram.plan.logical;
import com.datatorrent.common.util.BaseOperator;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.util.*;
import javax.validation.*;
@@ -41,6 +43,7 @@ import static org.junit.Assert.*;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.api.*;
+import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
@@ -683,7 +686,85 @@ public class LogicalPlanTest {
Assert.assertNotNull("port object null", o1Clone.inport1);
}
- private static class TestStreamCodec implements StreamCodec<Object> {
+ @Test
+ public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
+ {
+ LogicalPlan dag = new LogicalPlan();
+ Attribute<Object> attr = new Attribute<Object>(new TestAttributeValue(), new Object2String());
+ Field nameField = Attribute.class.getDeclaredField("name"); // set the private name reflectively so the expected messages below can reference it
+ nameField.setAccessible(true);
+ nameField.set(attr, "Test_Attribute");
+ nameField.setAccessible(false);
+
+ assertNotNull(attr);
+ // Case 1: a non-serializable value set as a DAG-level attribute must fail validation
+ dag.setAttribute(attr, new TestAttributeValue());
+ try {
+ dag.validate();
+ Assert.fail("Setting not serializable attribute should throw exception");
+ } catch (ValidationException e) {
+ assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in com.datatorrent.api.DAG are not serializable", e.getMessage());
+ }
+
+ // Case 2: a non-serializable value set as an operator attribute; the message names the operator
+ dag = new LogicalPlan();
+ TestGeneratorInputOperator operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
+ dag.setAttribute(operator, attr, new TestAttributeValue());
+ try {
+ dag.validate();
+ Assert.fail("Setting not serializable attribute should throw exception");
+ } catch (ValidationException e) {
+ assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator are not serializable", e.getMessage());
+ }
+
+ // Case 3: a non-serializable value set as an output port attribute; the message names operator.port
+ dag = new LogicalPlan();
+ operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
+ dag.setOutputPortAttribute(operator.outport, attr, new TestAttributeValue());
+ try {
+ dag.validate();
+ Assert.fail("Setting not serializable attribute should throw exception");
+ } catch (ValidationException e) {
+ assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.outport are not serializable", e.getMessage());
+ }
+
+ // Case 4: a non-serializable value set as an input port attribute; the message names operator.port
+ dag = new LogicalPlan();
+ GenericTestOperator operator1 = dag.addOperator("TestOperator", GenericTestOperator.class);
+ dag.setInputPortAttribute(operator1.inport1, attr, new TestAttributeValue());
+ try {
+ dag.validate();
+ Assert.fail("Setting non serializable attribute should throw exception");
+ } catch (ValidationException e) {
+ assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.inport1 are not serializable", e.getMessage());
+ }
+ }
+
+ private static class Object2String implements StringCodec<Object> // placeholder codec passed to the Attribute constructor in the test above
+ {
+
+ @Override
+ public Object fromString(String string)
+ {
+ // Test stub: performs no conversion and always returns null
+ return null;
+ }
+
+ @Override
+ public String toString(Object pojo)
+ {
+ // Test stub: performs no conversion and always returns null
+ return null;
+ }
+
+ }
+
+ private static class TestAttributeValue // does not implement Serializable; used as the offending attribute value in the test above
+ {
+ }
+
+ private static class TestStreamCodec implements StreamCodec<Object>
+ {
@Override
public Object fromByteArray(Slice fragment)
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b799bd2d/engine/src/test/resources/testTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testTopology.json b/engine/src/test/resources/testTopology.json
index 62c5262..45e1f0e 100644
--- a/engine/src/test/resources/testTopology.json
+++ b/engine/src/test/resources/testTopology.json
@@ -25,7 +25,7 @@
"attributes": {
"UNIFIER_LIMIT": 8,
"STREAM_CODEC" : {
- "com.datatorrent.common.codec.JsonStreamCodec" : {}
+ "com.datatorrent.stram.plan.logical.LogicalPlanConfigurationTest$TestStreamCodec" : {}
}
}
}