You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/14 15:20:02 UTC
[1/2] apex-core git commit: Plugin infrastructure to setup DAG before
application launch.
Repository: apex-core
Updated Branches:
refs/heads/master 10650b3a0 -> 3024b06e1
Plugin infrastructure to setup DAG before application launch.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8d46cc6d
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8d46cc6d
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8d46cc6d
Branch: refs/heads/master
Commit: 8d46cc6d1256d00ad14024dbf12c601835af14f6
Parents: ad4210b
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Mar 6 14:45:53 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Mar 14 15:37:55 2017 +0530
----------------------------------------------------------------------
.../org/apache/apex/api/DAGSetupPlugin.java | 137 +++++++++++++++++++
.../common/util/BaseDAGSetupPlugin.java | 76 ++++++++++
.../datatorrent/stram/client/AppPackage.java | 2 -
.../plan/logical/DAGSetupPluginManager.java | 126 +++++++++++++++++
.../plan/logical/LogicalPlanConfiguration.java | 66 ++++++---
.../apex/engine/util/StreamingAppFactory.java | 4 +-
.../stram/plan/logical/DAGSetupPluginTests.java | 115 ++++++++++++++++
.../plan/logical/PropertyInjectorVisitor.java | 119 ++++++++++++++++
.../stram/webapp/OperatorDiscoveryTest.java | 17 ++-
.../src/test/resources/visitortests.properties | 20 +++
10 files changed, 656 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
new file mode 100644
index 0000000..d2e7199
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+
+/**
+ * DAGSetupPlugin allows user provided code to run at various stages
+ * during DAG preparation. Currently following stages are supported
+ *
+ * <ul>
+ * <li>Before dag is populated</li>
+ * <li>After dag is populated</li>
+ * <li>Before dag is configured</li>
+ * <li>After dag is configured</li>
+ * <li>Before dag is validated</li>
+ * <li>After dag is validated</li>
+ * </ul>
+ */
+@InterfaceStability.Evolving
+public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginContext>, Serializable
+{
+
+ /**
+ * This method is called before platform adds operators and streams in the DAG. i.e this method
+ * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+ *
+ * For Application specified using property and json file format, this method will get called
+ * before platform adds operators and streams in the DAG as per specification in the file.
+ */
+ void prePopulateDAG();
+
+ /**
+ * This method is called after platform adds operators and streams in the DAG. i.e this method
+ * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+ * in case application is specified in java.
+ *
+ * For Application specified using property and json file format, this method will get called
+ * after platform has added operators and streams in the DAG as per specification in the file.
+ */
+ void postPopulateDAG();
+
+ /**
+ * This is method is called before DAG is configured, i.e operator and application
+ * properties/attributes are injected from configuration files.
+ */
+ void preConfigureDAG();
+
+ /**
+ * This is method is called after DAG is configured, i.e operator and application
+ * properties/attributes are injected from configuration files.
+ */
+ void postConfigureDAG();
+
+ /**
+ * This method is called just before dag is validated before final job submission.
+ */
+ void preValidateDAG();
+
+ /**
+ * This method is called after dag is validated. If plugin makes in incompatible changes
+ * to the DAG at this stage, then application may get launched incorrectly or application
+ * launch may fail.
+ */
+ void postValidateDAG();
+
+ public static class DAGSetupPluginContext implements Context
+ {
+ private final DAG dag;
+ private final Configuration conf;
+
+ public DAGSetupPluginContext(DAG dag, Configuration conf)
+ {
+ this.dag = dag;
+ this.conf = conf;
+ }
+
+ public DAG getDAG()
+ {
+ return dag;
+ }
+
+ public Configuration getConfiguration()
+ {
+ return conf;
+ }
+
+ @Override
+ public Attribute.AttributeMap getAttributes()
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public <T> T getValue(Attribute<T> key)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void setCounters(Object counters)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void sendMetrics(Collection<String> metricNames)
+ {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
new file mode 100644
index 0000000..7d26b89
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import org.apache.apex.api.DAGSetupPlugin;
+
+/**
+ * Base class for DAGSetupPlugin implementations that provides empty implementations
+ * for all interface methods.
+ */
+public class BaseDAGSetupPlugin implements DAGSetupPlugin
+{
+ @Override
+ public void setup(DAGSetupPluginContext context)
+ {
+
+ }
+
+ @Override
+ public void prePopulateDAG()
+ {
+
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+
+ @Override
+ public void postPopulateDAG()
+ {
+
+ }
+
+ @Override
+ public void preConfigureDAG()
+ {
+
+ }
+
+ @Override
+ public void postConfigureDAG()
+ {
+
+ }
+
+ @Override
+ public void preValidateDAG()
+ {
+
+ }
+
+ @Override
+ public void postValidateDAG()
+ {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index addf68e..e6b4b7f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -355,7 +355,6 @@ public class AppPackage extends JarFile
appInfo.displayName = appFactory.getDisplayName();
try {
appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
- appInfo.dag.validate();
} catch (Throwable ex) {
appInfo.error = ex.getMessage();
appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex);
@@ -394,7 +393,6 @@ public class AppPackage extends JarFile
appInfo.displayName = appFactory.getDisplayName();
try {
appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
- appInfo.dag.validate();
} catch (Throwable t) {
appInfo.error = t.getMessage();
appInfo.errorStackTrace = ExceptionUtils.getStackTrace(t);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
new file mode 100644
index 0000000..ad37071
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+
+import org.apache.apex.api.DAGSetupPlugin;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.stram.StramUtils;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class DAGSetupPluginManager
+{
+ private static final Logger LOG = getLogger(DAGSetupPluginManager.class);
+
+ private final transient List<DAGSetupPlugin> plugins = new ArrayList<>();
+ private Configuration conf;
+
+ public static final String DAGSETUP_PLUGINS_CONF_KEY = "org.apache.apex.api";
+ private DAGSetupPlugin.DAGSetupPluginContext contex;
+
+ private void loadVisitors(Configuration conf)
+ {
+ this.conf = conf;
+ if (!plugins.isEmpty()) {
+ return;
+ }
+
+ String classNamesStr = conf.get(DAGSETUP_PLUGINS_CONF_KEY);
+ if (classNamesStr == null) {
+ return;
+ }
+ String[] classNames = classNamesStr.split(",");
+ for (String className : classNames) {
+ try {
+ Class<? extends DAGSetupPlugin> plugin = StramUtils.classForName(className, DAGSetupPlugin.class);
+ plugins.add(StramUtils.newInstance(plugin));
+ LOG.info("Found DAG setup plugin {}", plugin);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Could not load plugin {}", className);
+ }
+ }
+ }
+
+ public void setup(DAGSetupPlugin.DAGSetupPluginContext context)
+ {
+ this.contex = context;
+ for (DAGSetupPlugin plugin : plugins) {
+ plugin.setup(context);
+ }
+ }
+
+ public enum DispatchType
+ {
+ SETUP,
+ PRE_POPULATE,
+ POST_POPULATE,
+ PRE_CONFIGURE,
+ POST_CONFIGURE,
+ PRE_VALIDATE,
+ POST_VALIDATE,
+ TEARDOWN
+ }
+
+ public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context)
+ {
+ for (DAGSetupPlugin plugin : plugins) {
+ switch (type) {
+ case SETUP:
+ plugin.setup(context);
+ break;
+ case PRE_POPULATE:
+ plugin.prePopulateDAG();
+ break;
+ case POST_POPULATE:
+ plugin.postPopulateDAG();
+ break;
+ case PRE_CONFIGURE:
+ plugin.preConfigureDAG();
+ break;
+ case POST_CONFIGURE:
+ plugin.postValidateDAG();
+ break;
+ case PRE_VALIDATE:
+ plugin.preValidateDAG();
+ break;
+ case POST_VALIDATE:
+ plugin.postValidateDAG();
+ break;
+ case TEARDOWN:
+ plugin.teardown();
+ break;
+ default:
+ throw new UnsupportedOperationException("Not implemented ");
+ }
+ }
+ }
+
+ public static synchronized DAGSetupPluginManager getInstance(Configuration conf)
+ {
+ DAGSetupPluginManager manager = new DAGSetupPluginManager();
+ manager.loadVisitors(conf);
+ return manager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index bab414f..ffe33f3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -48,6 +48,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.api.DAGSetupPlugin.DAGSetupPluginContext;
import org.apache.commons.beanutils.BeanMap;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -85,6 +86,15 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.util.ObjectMapperFactory;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN;
+
/**
*
* Builder for the DAG logical representation of operators and streams from properties.<p>
@@ -134,6 +144,8 @@ public class LogicalPlanConfiguration
LOG.debug("Initialized attributes {}", serial);
}
+ private final DAGSetupPluginManager pluginManager;
+
/**
* This represents an element that can be referenced in a DT property.
*/
@@ -1640,6 +1652,7 @@ public class LogicalPlanConfiguration
{
this.conf = conf;
this.addFromConfiguration(conf);
+ this.pluginManager = DAGSetupPluginManager.getInstance(conf);
}
/**
@@ -2052,44 +2065,58 @@ public class LogicalPlanConfiguration
return Collections.unmodifiableMap(this.stramConf.appAliases);
}
- public LogicalPlan createFromProperties(Properties props, String appName) throws IOException
+ private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName)
{
- // build DAG from properties
- LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
- tb.addFromProperties(props, conf);
LogicalPlan dag = new LogicalPlan();
+ DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+ pluginManager.dispatch(SETUP, context);
+ pluginManager.dispatch(PRE_POPULATE, context);
tb.populateDAG(dag);
// configure with embedded settings
tb.prepareDAG(dag, null, appName);
+ pluginManager.dispatch(POST_POPULATE, context);
// configure with external settings
prepareDAG(dag, null, appName);
+ pluginManager.dispatch(PRE_VALIDATE, context);
+ dag.validate();
+ pluginManager.dispatch(POST_VALIDATE, context);
+ pluginManager.dispatch(TEARDOWN, context);
return dag;
}
+ public LogicalPlan createFromProperties(Properties props, String appName) throws IOException
+ {
+ // build DAG from properties
+ LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
+ tb.addFromProperties(props, conf);
+ return populateDAGAndValidate(tb, appName);
+ }
+
public LogicalPlan createFromJson(JSONObject json, String appName) throws Exception
{
// build DAG from properties
LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
tb.addFromJson(json, conf);
- LogicalPlan dag = new LogicalPlan();
- tb.populateDAG(dag);
- // configure with embedded settings
- tb.prepareDAG(dag, null, appName);
- // configure with external settings
- prepareDAG(dag, null, appName);
- return dag;
+ return populateDAGAndValidate(tb, appName);
}
public LogicalPlan createEmptyForRecovery(String appName)
{
// build DAG from properties
LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
+ return populateDAGAndValidate(tb, appName);
+ }
+
+ public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName)
+ {
LogicalPlan dag = new LogicalPlan();
- tb.populateDAG(dag);
- // configure with embedded settings
- tb.prepareDAG(dag, null, appName);
- // configure with external settings
- prepareDAG(dag, null, appName);
+ DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+ pluginManager.dispatch(SETUP, context);
+ prepareDAG(dag, app, appName);
+ pluginManager.dispatch(PRE_VALIDATE, context);
+ dag.validate();
+ pluginManager.dispatch(POST_VALIDATE, context);
+ pluginManager.dispatch(TEARDOWN, context);
return dag;
}
@@ -2186,7 +2213,6 @@ public class LogicalPlanConfiguration
}
}
}
-
}
private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator)
@@ -2222,9 +2248,14 @@ public class LogicalPlanConfiguration
// EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+ DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
if (app != null) {
+ pluginManager.dispatch(SETUP, context);
+ pluginManager.dispatch(PRE_POPULATE, context);
app.populateDAG(dag, conf);
+ pluginManager.dispatch(POST_POPULATE, context);
}
+ pluginManager.dispatch(PRE_CONFIGURE, context);
String appAlias = getAppAlias(name);
String appName = appAlias == null ? name : appAlias;
List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
@@ -2240,6 +2271,7 @@ public class LogicalPlanConfiguration
// inject external operator configuration
setOperatorConfiguration(dag, appConfs, appName);
setStreamConfiguration(dag, appConfs, appName);
+ pluginManager.dispatch(POST_CONFIGURE, context);
}
private void flattenDAG(LogicalPlan dag, Configuration conf)
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
index 96edc8d..02c0910 100644
--- a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
+++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
@@ -44,9 +44,7 @@ public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory
protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig)
{
- LogicalPlan dag = new LogicalPlan();
- planConfig.prepareDAG(dag, app, getName());
- return dag;
+ return planConfig.createFromStreamingApplication(app, getName());
}
public String getName()
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
new file mode 100644
index 0000000..25201eb
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+
+public class DAGSetupPluginTests
+{
+
+ public static class Application implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ TestGeneratorInputOperator inputOperator = dag.addOperator("inputOperator", new TestGeneratorInputOperator());
+ GenericTestOperator operator1 = dag.addOperator("operator1", new GenericTestOperator());
+ GenericTestOperator operator2 = dag.addOperator("operator2", new GenericTestOperator());
+ GenericTestOperator operator3 = dag.addOperator("operator3", new GenericTestOperator());
+ GenericTestOperator operator4 = dag.addOperator("operator4", new GenericTestOperator());
+
+ dag.addStream("n1n2", operator1.outport1, operator2.inport1);
+ dag.addStream("inputStream", inputOperator.outport, operator1.inport1, operator3.inport1, operator4.inport1);
+ }
+ }
+
+ private Configuration getConfiguration()
+ {
+ Configuration conf = new Configuration();
+ conf.set(DAGSetupPluginManager.DAGSETUP_PLUGINS_CONF_KEY, "com.datatorrent.stram.plan.logical.PropertyInjectorVisitor");
+ conf.set("propertyVisitor.Path","/visitortests.properties");
+ return conf;
+ }
+
+ @Test
+ public void testJavaApplication()
+ {
+ Configuration conf = getConfiguration();
+ StreamingAppFactory factory = new StreamingAppFactory(Application.class.getName(), Application.class)
+ {
+ @Override
+ public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
+ {
+ Class<? extends StreamingApplication> c = StramUtils.classForName(Application.class.getName(), StreamingApplication.class);
+ StreamingApplication app = StramUtils.newInstance(c);
+ return super.createApp(app, planConfig);
+ }
+ };
+ LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+ validateProperties(dag);
+ }
+
+ @Test
+ public void testPropertyFileApp() throws IOException
+ {
+ File tempFile = File.createTempFile("testTopology", "properties");
+ org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.properties"), new FileOutputStream(tempFile));
+ StramAppLauncher.PropertyFileAppFactory factory = new StramAppLauncher.PropertyFileAppFactory(tempFile);
+ Configuration conf = getConfiguration();
+ LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+ validateProperties(dag);
+ tempFile.delete();
+ }
+
+ @Test
+ public void testJsonFileApp() throws IOException
+ {
+ File tempFile = File.createTempFile("testTopology", "json");
+ org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.json"), new FileOutputStream(tempFile));
+ StramAppLauncher.JsonFileAppFactory factory = new StramAppLauncher.JsonFileAppFactory(tempFile);
+ Configuration conf = getConfiguration();
+ LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+ validateProperties(dag);
+ tempFile.delete();
+ }
+
+ protected void validateProperties(LogicalPlan dag)
+ {
+ String[] operators = new String[]{"operator1", "operator2", "operator3", "operator4"};
+ for (String name : operators) {
+ GenericTestOperator op = (GenericTestOperator)dag.getOperatorMeta(name).getOperator();
+ Assert.assertEquals("property set on operator ", op.getMyStringProperty(), "mynewstringvalue");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
new file mode 100644
index 0000000..d2bd927
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.validation.ValidationException;
+
+import org.slf4j.Logger;
+
+import org.apache.apex.api.DAGSetupPlugin;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class PropertyInjectorVisitor implements DAGSetupPlugin
+{
+ private static final Logger LOG = getLogger(PropertyInjectorVisitor.class);
+
+ private String path;
+ private Map<String, String> propertyMap = new HashMap<>();
+ private DAG dag;
+
+ @Override
+ public void setup(DAGSetupPluginContext context)
+ {
+ this.dag = context.getDAG();
+ try {
+ this.path = context.getConfiguration().get("propertyVisitor.Path");
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream(path));
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ propertyMap.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ } catch (IOException ex) {
+ throw new ValidationException("Not able to load input file " + path);
+ }
+ }
+
+ @Override
+ public void prePopulateDAG()
+ {
+
+ }
+
+ @Override
+ public void postPopulateDAG()
+ {
+
+ }
+
+ @Override
+ public void preConfigureDAG()
+ {
+
+ }
+
+ @Override
+ public void postConfigureDAG()
+ {
+
+ }
+
+ @Override
+ public void preValidateDAG()
+ {
+ for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) {
+ Operator o = ometa.getOperator();
+ LogicalPlanConfiguration.setOperatorProperties(o, propertyMap);
+ }
+ }
+
+ @Override
+ public void postValidateDAG()
+ {
+
+ }
+
+ public PropertyInjectorVisitor()
+ {
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ public void setPath(String path)
+ {
+ this.path = path;
+ }
+
+ @Override
+ public void teardown()
+ {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 6552015..9ac28c0 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -918,6 +918,15 @@ public class OperatorDiscoveryTest
}
+ public static class InputTestOperator<T, Z extends Map<String, Number>> extends TestOperator<T,Z> implements InputOperator
+ {
+ @Override
+ public void emitTuples()
+ {
+
+ }
+ }
+
static class ExtendedOperator extends TestOperator<String, Map<String, Number>>
{
}
@@ -1047,7 +1056,7 @@ public class OperatorDiscoveryTest
@Test
public void testLogicalPlanConfiguration() throws Exception
{
- TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+ TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
bean.map.put("key1", new Structured());
bean.stringArray = new String[]{"one", "two", "three"};
bean.stringList = Lists.newArrayList("four", "five");
@@ -1074,7 +1083,7 @@ public class OperatorDiscoveryTest
jsonPlan.put("streams", new JSONArray());
JSONObject jsonOper = new JSONObject();
jsonOper.put("name", "Test Operator");
- jsonOper.put("class", TestOperator.class.getName());
+ jsonOper.put("class", InputTestOperator.class.getName());
jsonOper.put("properties", jsonObj);
jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper)));
@@ -1083,9 +1092,9 @@ public class OperatorDiscoveryTest
// create logical plan from the json
LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest");
OperatorMeta om = lp.getOperatorMeta("Test Operator");
- Assert.assertTrue(om.getOperator() instanceof TestOperator);
+ Assert.assertTrue(om.getOperator() instanceof InputTestOperator);
@SuppressWarnings("rawtypes")
- TestOperator beanBack = (TestOperator)om.getOperator();
+ TestOperator beanBack = (InputTestOperator)om.getOperator();
// The operator deserialized back from json should be same as original operator
Assert.assertEquals(bean.map, beanBack.map);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/resources/visitortests.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/visitortests.properties b/engine/src/test/resources/visitortests.properties
new file mode 100644
index 0000000..620c53b
--- /dev/null
+++ b/engine/src/test/resources/visitortests.properties
@@ -0,0 +1,20 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+myStringProperty=mynewstringvalue
[2/2] apex-core git commit: Merge branch 'APEXCORE-577' of
github.com:tushargosavi/apex-core
Posted by pr...@apache.org.
Merge branch 'APEXCORE-577' of github.com:tushargosavi/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3024b06e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3024b06e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3024b06e
Branch: refs/heads/master
Commit: 3024b06e16103efbd16a018f7b557dd848476c99
Parents: 10650b3 8d46cc6
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Mar 14 08:00:29 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Mar 14 08:00:29 2017 -0700
----------------------------------------------------------------------
.../org/apache/apex/api/DAGSetupPlugin.java | 137 +++++++++++++++++++
.../common/util/BaseDAGSetupPlugin.java | 76 ++++++++++
.../datatorrent/stram/client/AppPackage.java | 2 -
.../plan/logical/DAGSetupPluginManager.java | 126 +++++++++++++++++
.../plan/logical/LogicalPlanConfiguration.java | 66 ++++++---
.../apex/engine/util/StreamingAppFactory.java | 4 +-
.../stram/plan/logical/DAGSetupPluginTests.java | 115 ++++++++++++++++
.../plan/logical/PropertyInjectorVisitor.java | 119 ++++++++++++++++
.../stram/webapp/OperatorDiscoveryTest.java | 17 ++-
.../src/test/resources/visitortests.properties | 20 +++
10 files changed, 656 insertions(+), 26 deletions(-)
----------------------------------------------------------------------