You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/21 20:15:37 UTC

[1/2] incubator-apex-core git commit: APEX-42: Added support for configuring unifier attributes through configuration file

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 282c43b2e -> de1d0032a


APEX-42: Added support for configuring unifier attributes through configuration file


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b78c67b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b78c67b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b78c67b

Branch: refs/heads/devel-3
Commit: 9b78c67b68dd42b8216457e886c3cf221b9275b2
Parents: 282c43b
Author: Chaitanya <ch...@datatorrent.com>
Authored: Mon Sep 21 17:30:00 2015 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Mon Sep 21 18:13:17 2015 +0530

----------------------------------------------------------------------
 .../plan/logical/LogicalPlanConfiguration.java  | 37 ++++++++++++++++---
 .../logical/LogicalPlanConfigurationTest.java   | 39 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 7a53cd7..6b141bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -120,7 +120,7 @@ public class LogicalPlanConfiguration {
    */
   protected enum StramElement {
     APPLICATION("application"), GATEWAY("gateway"), TEMPLATE("template"), OPERATOR("operator"),STREAM("stream"), PORT("port"), INPUT_PORT("inputport"),OUTPUT_PORT("outputport"),
-    ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path");
+    ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path"),UNIFIER("unifier");
     private final String value;
 
     /**
@@ -168,7 +168,8 @@ public class LogicalPlanConfiguration {
     GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null),
     OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class),
     STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null),
-    PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class);
+    PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class),
+    UNIFIER(StramElement.UNIFIER, ConfElement.PORT, null, null);
 
     protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
     protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
@@ -182,6 +183,7 @@ public class LogicalPlanConfiguration {
       STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE));
       APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM));
       OPERATOR.setChildren(Sets.newHashSet(PORT));
+      PORT.setChildren(Sets.newHashSet(UNIFIER));
 
       STRAM_ELEMENT_TO_CONF_ELEMENT.clear();
 
@@ -1126,7 +1128,7 @@ public class LogicalPlanConfiguration {
     private final Map<String, String> appAliases = Maps.newHashMap();
 
     private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.APPLICATION, StramElement.GATEWAY, StramElement.TEMPLATE, StramElement.OPERATOR,
-            StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR};
+            StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR, StramElement.UNIFIER};
 
     StramConf() {
     }
@@ -1151,7 +1153,7 @@ public class LogicalPlanConfiguration {
 
     private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.GATEWAY, StramElement.OPERATOR, StramElement.PORT,
             StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.ATTR, StramElement.CLASS, StramElement.PATH,
-            StramElement.PROP};
+            StramElement.PROP, StramElement.UNIFIER};
 
     @SuppressWarnings("unused")
     AppConf() {
@@ -1446,7 +1448,7 @@ public class LogicalPlanConfiguration {
    */
   private static class PortConf extends Conf {
 
-    private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR};
+    private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR, StramElement.UNIFIER};
 
     @SuppressWarnings("unused")
     PortConf() {
@@ -1477,6 +1479,7 @@ public class LogicalPlanConfiguration {
     elementMaps.put(StramElement.PORT, PortConf.class);
     elementMaps.put(StramElement.INPUT_PORT, PortConf.class);
     elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class);
+    elementMaps.put(StramElement.UNIFIER, OperatorConf.class);
   }
 
   /**
@@ -1746,6 +1749,8 @@ public class LogicalPlanConfiguration {
         parseAppElement(index, keys, element, conf, propertyName, propertyValue);
       } else if (element == StramElement.GATEWAY) {
         parseGatewayElement(element, conf, keys, index, propertyName, propertyValue);
+      } else if ((element == StramElement.UNIFIER)) {
+        parseUnifierElement(element, conf, keys, index, propertyName, propertyValue);
       } else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) {
         parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
       } else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
@@ -1798,6 +1803,24 @@ public class LogicalPlanConfiguration {
   }
 
   /**
+   * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a unifier element.
+   * @param element The current {@link StramElement} of the property being parsed.
+   * @param keys The keys that the property being parsed was split into.
+   * @param index The current key that the parser is on.
+   * @param propertyValue The value associated with the property being parsed.
+   * @param propertyName The complete unprocessed name of the property being parsed.
+   */
+  private void parseUnifierElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
+  {
+    Conf elConf = addConf(element, null, conf1);
+    if (elConf != null) {
+      parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf);
+    } else {
+      LOG.error("Invalid configuration key: {}", propertyName);
+    }
+  }
+
+  /**
    * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute.
    * @param element The current {@link StramElement} of the property being parsed.
    * @param keys The keys that the property being parsed was split into.
@@ -2292,6 +2315,10 @@ public class LogicalPlanConfiguration {
         List<PortConf> portConfs = getMatchingChildConf(opConfs, om.getPortName(), StramElement.PORT);
         outPortConfs.addAll(portConfs);
         setAttributes(outPortConfs, om.getAttributes());
+        List<OperatorConf> unifConfs = getMatchingChildConf(outPortConfs, null, StramElement.UNIFIER);
+        if(unifConfs.size() != 0) {
+          setAttributes(unifConfs, om.getUnifierMeta().getAttributes());
+        }
       }
       ow.populateAggregatorMeta();
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 077e3a9..9b2003b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -525,6 +525,45 @@ public class LogicalPlanConfigurationTest {
   }
 
   @Test
+  @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"})
+  public void testUnifierLevelAttributes() {
+    String appName = "app1";
+    final GenericTestOperator operator1 = new GenericTestOperator();
+    final GenericTestOperator operator2 = new GenericTestOperator();
+    StreamingApplication app = new StreamingApplication() {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+        dag.addOperator("operator1", operator1);
+        dag.addOperator("operator2", operator2);
+        dag.addStream("s1", operator1.outport1, operator2.inport1);
+      }
+    };
+
+    Properties props = new Properties();
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+    props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
+    LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+    dagBuilder.addFromProperties(props, null);
+
+    String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+    LogicalPlan dag = new LogicalPlan();
+    dagBuilder.prepareDAG(dag, app, appPath);
+
+    OperatorMeta om = null;
+    for (Map.Entry<OutputPortMeta, StreamMeta> entry : dag.getOperatorMeta("operator1").getOutputStreams().entrySet()) {
+      if(entry.getKey().getPortName().equals("outport1")) {
+        om = entry.getKey().getUnifierMeta();
+      }
+    }
+    Assert.assertNotNull(om);
+    Assert.assertEquals("", Integer.valueOf(2), om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
+    Assert.assertEquals("", Integer.valueOf(512), om.getValue(OperatorContext.MEMORY_MB));
+  }
+
+  @Test
   public void testOperatorLevelProperties() {
     String appName = "app1";
     final GenericTestOperator operator1 = new GenericTestOperator();


[2/2] incubator-apex-core git commit: Merge branch 'APEX-42_UnifierAttr_final' of github.com:chaithu14/incubator-apex-core into chaithu-unifier-config

Posted by ch...@apache.org.
Merge branch 'APEX-42_UnifierAttr_final' of github.com:chaithu14/incubator-apex-core into chaithu-unifier-config


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/de1d0032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/de1d0032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/de1d0032

Branch: refs/heads/devel-3
Commit: de1d0032af89b03b5bd39442e6b692937a84857b
Parents: 282c43b 9b78c67
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Sep 21 10:41:26 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Sep 21 10:41:26 2015 -0700

----------------------------------------------------------------------
 .../plan/logical/LogicalPlanConfiguration.java  | 37 ++++++++++++++++---
 .../logical/LogicalPlanConfigurationTest.java   | 39 ++++++++++++++++++++
 2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------