You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:00 UTC

[29/50] incubator-apex-core git commit: APEX-43: assigning Class2String codec to TUPLE_CLASS attr in PortContext

APEX-43: assigning Class2String codec to TUPLE_CLASS attr in PortContext


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/93b8c661
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/93b8c661
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/93b8c661

Branch: refs/heads/master
Commit: 93b8c661942a13449975123904d24ab71c69de7a
Parents: d813963
Author: Chandni Singh <ch...@datatorrent.com>
Authored: Tue Aug 11 13:33:04 2015 -0700
Committer: Chandni Singh <ch...@datatorrent.com>
Committed: Tue Aug 11 14:59:56 2015 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  |  2 +-
 .../plan/LogicalPlanConfigurationTest.java      | 32 +++++++++++++++-----
 2 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93b8c661/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 249cecd..cd10398 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -156,7 +156,7 @@ public interface Context
      * Provides the tuple class which the port receives or emits. While this attribute is null by default,
      * whether it is needed or not is controlled through the port annotation.
      */
-    Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Object2String<Class<?>>());
+    Attribute<Class<?>> TUPLE_CLASS = new Attribute<>(new Class2String<>());
 
     @SuppressWarnings("FieldNameHidesFieldInSuperclass")
     long serialVersionUID = AttributeMap.AttributeInitializer.initialize(PortContext.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/93b8c661/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
index af12575..3b6bdd1 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/LogicalPlanConfigurationTest.java
@@ -713,7 +713,7 @@ public class LogicalPlanConfigurationTest {
     }
   }
 
-  @Test
+  @Test(expected = ValidationException.class)
   public void testTupleClassAttrValidation() throws Exception
   {
     String resourcePath = "/schemaTestTopology.json";
@@ -733,12 +733,30 @@ public class LogicalPlanConfigurationTest {
     LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
     LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
 
-    try {
-      dag.validate();
-      Assert.fail();
-    } catch (ValidationException ve) {
-      //test pass as validation exception was thrown.
-    }
+    dag.validate();
+  }
+
+  @Test
+  public void testTestTupleClassAttrSetFromConfig()
+  {
+    Configuration conf = new Configuration(false);
+    conf.set(StreamingApplication.DT_PREFIX + "operator.o2.port.schemaRequiredPort.attr.TUPLE_CLASS",
+      "com.datatorrent.stram.plan.LogicalPlanConfigurationTest$TestSchema");
+
+    StreamingApplication streamingApplication = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        TestGeneratorInputOperator o1 = dag.addOperator("o1", new TestGeneratorInputOperator());
+        SchemaTestOperator o2 = dag.addOperator("o2", new SchemaTestOperator());
+        dag.addStream("stream", o1.outport, o2.schemaRequiredPort);
+      }
+    };
+    LogicalPlan dag = new LogicalPlan();
+    LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+    lpc.prepareDAG(dag, streamingApplication, "app");
+    dag.validate();
   }
 
   private static final Logger logger = LoggerFactory.getLogger(LogicalPlanConfigurationTest.class);