You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/21 20:15:37 UTC
[1/2] incubator-apex-core git commit: APEX-42: Added support for
configuring unifier attributes through configuration file
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 282c43b2e -> de1d0032a
APEX-42: Added support for configuring unifier attributes through configuration file
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b78c67b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b78c67b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b78c67b
Branch: refs/heads/devel-3
Commit: 9b78c67b68dd42b8216457e886c3cf221b9275b2
Parents: 282c43b
Author: Chaitanya <ch...@datatorrent.com>
Authored: Mon Sep 21 17:30:00 2015 +0530
Committer: Chaitanya <ch...@datatorrent.com>
Committed: Mon Sep 21 18:13:17 2015 +0530
----------------------------------------------------------------------
.../plan/logical/LogicalPlanConfiguration.java | 37 ++++++++++++++++---
.../logical/LogicalPlanConfigurationTest.java | 39 ++++++++++++++++++++
2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 7a53cd7..6b141bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -120,7 +120,7 @@ public class LogicalPlanConfiguration {
*/
protected enum StramElement {
APPLICATION("application"), GATEWAY("gateway"), TEMPLATE("template"), OPERATOR("operator"),STREAM("stream"), PORT("port"), INPUT_PORT("inputport"),OUTPUT_PORT("outputport"),
- ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path");
+ ATTR("attr"), PROP("prop"),CLASS("class"),PATH("path"),UNIFIER("unifier");
private final String value;
/**
@@ -168,7 +168,8 @@ public class LogicalPlanConfiguration {
GATEWAY(StramElement.GATEWAY, ConfElement.APPLICATION, null, null),
OPERATOR(StramElement.OPERATOR, ConfElement.APPLICATION, null, OperatorContext.class),
STREAM(StramElement.STREAM, ConfElement.APPLICATION, null, null),
- PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class);
+ PORT(StramElement.PORT, ConfElement.OPERATOR, EnumSet.of(StramElement.INPUT_PORT, StramElement.OUTPUT_PORT), PortContext.class),
+ UNIFIER(StramElement.UNIFIER, ConfElement.PORT, null, null);
protected static final Map<StramElement, ConfElement> STRAM_ELEMENT_TO_CONF_ELEMENT = Maps.newHashMap();
protected static final Map<Class<? extends Context>, ConfElement> CONTEXT_TO_CONF_ELEMENT = Maps.newHashMap();
@@ -182,6 +183,7 @@ public class LogicalPlanConfiguration {
STRAM.setChildren(Sets.newHashSet(APPLICATION, TEMPLATE));
APPLICATION.setChildren(Sets.newHashSet(GATEWAY, OPERATOR, STREAM));
OPERATOR.setChildren(Sets.newHashSet(PORT));
+ PORT.setChildren(Sets.newHashSet(UNIFIER));
STRAM_ELEMENT_TO_CONF_ELEMENT.clear();
@@ -1126,7 +1128,7 @@ public class LogicalPlanConfiguration {
private final Map<String, String> appAliases = Maps.newHashMap();
private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.APPLICATION, StramElement.GATEWAY, StramElement.TEMPLATE, StramElement.OPERATOR,
- StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR};
+ StramElement.PORT, StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.TEMPLATE, StramElement.ATTR, StramElement.UNIFIER};
StramConf() {
}
@@ -1151,7 +1153,7 @@ public class LogicalPlanConfiguration {
private static final StramElement[] CHILD_ELEMENTS = new StramElement[]{StramElement.GATEWAY, StramElement.OPERATOR, StramElement.PORT,
StramElement.INPUT_PORT, StramElement.OUTPUT_PORT, StramElement.STREAM, StramElement.ATTR, StramElement.CLASS, StramElement.PATH,
- StramElement.PROP};
+ StramElement.PROP, StramElement.UNIFIER};
@SuppressWarnings("unused")
AppConf() {
@@ -1446,7 +1448,7 @@ public class LogicalPlanConfiguration {
*/
private static class PortConf extends Conf {
- private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR};
+ private static final StramElement[] CHILD_ELEMENTS = new StramElement[] {StramElement.ATTR, StramElement.UNIFIER};
@SuppressWarnings("unused")
PortConf() {
@@ -1477,6 +1479,7 @@ public class LogicalPlanConfiguration {
elementMaps.put(StramElement.PORT, PortConf.class);
elementMaps.put(StramElement.INPUT_PORT, PortConf.class);
elementMaps.put(StramElement.OUTPUT_PORT, PortConf.class);
+ elementMaps.put(StramElement.UNIFIER, OperatorConf.class);
}
/**
@@ -1746,6 +1749,8 @@ public class LogicalPlanConfiguration {
parseAppElement(index, keys, element, conf, propertyName, propertyValue);
} else if (element == StramElement.GATEWAY) {
parseGatewayElement(element, conf, keys, index, propertyName, propertyValue);
+ } else if ((element == StramElement.UNIFIER)) {
+ parseUnifierElement(element, conf, keys, index, propertyName, propertyValue);
} else if ((element == StramElement.ATTR) || ((element == null) && (conf.getDefaultChildElement() == StramElement.ATTR))) {
parseAttributeElement(element, keys, index, conf, propertyValue, propertyName);
} else if ((element == StramElement.PROP) || ((element == null) && (conf.getDefaultChildElement() == StramElement.PROP))) {
@@ -1798,6 +1803,24 @@ public class LogicalPlanConfiguration {
}
/**
+ * This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing a unifier element.
+ * It obtains the unifier's {@code Conf} via {@code addConf(element, null, conf1)}; when that returns non-null,
+ * parsing continues at {@code index + 1} against it, otherwise the property name is logged as an invalid
+ * configuration key.
+ * @param element The current {@link StramElement} of the property being parsed.
+ * @param conf1 The configuration handed to {@code addConf} together with the element (presumably the enclosing port configuration - TODO confirm).
+ * @param keys The keys that the property being parsed was split into.
+ * @param index The current key that the parser is on.
+ * @param propertyName The complete unprocessed name of the property being parsed.
+ * @param propertyValue The value associated with the property being parsed.
+ */
+ private void parseUnifierElement(StramElement element, Conf conf1, String[] keys, int index, String propertyName, String propertyValue)
+ {
+ Conf elConf = addConf(element, null, conf1);
+ if (elConf != null) {
+ parseStramPropertyTokens(keys, index+1, propertyName, propertyValue, elConf);
+ } else {
+ LOG.error("Invalid configuration key: {}", propertyName);
+ }
+ }
+
+ /**
* This is a helper method for {@link #parseStramPropertyTokens} which is responsible for parsing an attribute.
* @param element The current {@link StramElement} of the property being parsed.
* @param keys The keys that the property being parsed was split into.
@@ -2292,6 +2315,10 @@ public class LogicalPlanConfiguration {
List<PortConf> portConfs = getMatchingChildConf(opConfs, om.getPortName(), StramElement.PORT);
outPortConfs.addAll(portConfs);
setAttributes(outPortConfs, om.getAttributes());
+ List<OperatorConf> unifConfs = getMatchingChildConf(outPortConfs, null, StramElement.UNIFIER);
+ if(unifConfs.size() != 0) {
+ setAttributes(unifConfs, om.getUnifierMeta().getAttributes());
+ }
}
ow.populateAggregatorMeta();
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b78c67b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index 077e3a9..9b2003b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -525,6 +525,45 @@ public class LogicalPlanConfigurationTest {
}
@Test
+ @SuppressWarnings( {"UnnecessaryBoxing", "AssertEqualsBetweenInconvertibleTypes"})
+ public void testUnifierLevelAttributes() {
+ // Checks that attributes given under "...outputport.<port>.unifier.<attribute>" in the properties
+ // end up on the unifier meta of that output port after the DAG is prepared.
+ String appName = "app1";
+ final GenericTestOperator operator1 = new GenericTestOperator();
+ final GenericTestOperator operator2 = new GenericTestOperator();
+ StreamingApplication app = new StreamingApplication() {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ dag.addOperator("operator1", operator1);
+ dag.addOperator("operator2", operator2);
+ dag.addStream("s1", operator1.outport1, operator2.inport1);
+ }
+ };
+
+ // Configure two attributes on the unifier of operator1.outport1.
+ Properties props = new Properties();
+ props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".class", app.getClass().getName());
+ props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.APPLICATION_WINDOW_COUNT.getName(), "2");
+ props.put(StreamingApplication.DT_PREFIX + "application." + appName + ".operator.operator1.outputport.outport1.unifier." + OperatorContext.MEMORY_MB.getName(), "512");
+ LogicalPlanConfiguration dagBuilder = new LogicalPlanConfiguration(new Configuration(false));
+ dagBuilder.addFromProperties(props, null);
+
+ String appPath = app.getClass().getName().replace(".", "/") + ".class";
+
+ LogicalPlan dag = new LogicalPlan();
+ dagBuilder.prepareDAG(dag, app, appPath);
+
+ // Look up the unifier meta attached to operator1's "outport1" output port.
+ OperatorMeta om = null;
+ for (Map.Entry<OutputPortMeta, StreamMeta> entry : dag.getOperatorMeta("operator1").getOutputStreams().entrySet()) {
+ if(entry.getKey().getPortName().equals("outport1")) {
+ om = entry.getKey().getUnifierMeta();
+ }
+ }
+ // The unifier meta must exist and carry the values configured above.
+ Assert.assertNotNull(om);
+ Assert.assertEquals("", Integer.valueOf(2), om.getValue(OperatorContext.APPLICATION_WINDOW_COUNT));
+ Assert.assertEquals("", Integer.valueOf(512), om.getValue(OperatorContext.MEMORY_MB));
+ }
+
+ @Test
public void testOperatorLevelProperties() {
String appName = "app1";
final GenericTestOperator operator1 = new GenericTestOperator();
[2/2] incubator-apex-core git commit: Merge branch
'APEX-42_UnifierAttr_final' of github.com:chaithu14/incubator-apex-core into
chaithu-unifier-config
Posted by ch...@apache.org.
Merge branch 'APEX-42_UnifierAttr_final' of github.com:chaithu14/incubator-apex-core into chaithu-unifier-config
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/de1d0032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/de1d0032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/de1d0032
Branch: refs/heads/devel-3
Commit: de1d0032af89b03b5bd39442e6b692937a84857b
Parents: 282c43b 9b78c67
Author: Chetan Narsude <ch...@datatorrent.com>
Authored: Mon Sep 21 10:41:26 2015 -0700
Committer: Chetan Narsude <ch...@datatorrent.com>
Committed: Mon Sep 21 10:41:26 2015 -0700
----------------------------------------------------------------------
.../plan/logical/LogicalPlanConfiguration.java | 37 ++++++++++++++++---
.../logical/LogicalPlanConfigurationTest.java | 39 ++++++++++++++++++++
2 files changed, 71 insertions(+), 5 deletions(-)
----------------------------------------------------------------------