You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/14 15:20:02 UTC

[1/2] apex-core git commit: Plugin infrastructure to setup DAG before application launch.

Repository: apex-core
Updated Branches:
  refs/heads/master 10650b3a0 -> 3024b06e1


Plugin infrastructure to setup DAG before application launch.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8d46cc6d
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8d46cc6d
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8d46cc6d

Branch: refs/heads/master
Commit: 8d46cc6d1256d00ad14024dbf12c601835af14f6
Parents: ad4210b
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Mon Mar 6 14:45:53 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Mar 14 15:37:55 2017 +0530

----------------------------------------------------------------------
 .../org/apache/apex/api/DAGSetupPlugin.java     | 137 +++++++++++++++++++
 .../common/util/BaseDAGSetupPlugin.java         |  76 ++++++++++
 .../datatorrent/stram/client/AppPackage.java    |   2 -
 .../plan/logical/DAGSetupPluginManager.java     | 126 +++++++++++++++++
 .../plan/logical/LogicalPlanConfiguration.java  |  66 ++++++---
 .../apex/engine/util/StreamingAppFactory.java   |   4 +-
 .../stram/plan/logical/DAGSetupPluginTests.java | 115 ++++++++++++++++
 .../plan/logical/PropertyInjectorVisitor.java   | 119 ++++++++++++++++
 .../stram/webapp/OperatorDiscoveryTest.java     |  17 ++-
 .../src/test/resources/visitortests.properties  |  20 +++
 10 files changed, 656 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
new file mode 100644
index 0000000..d2e7199
--- /dev/null
+++ b/api/src/main/java/org/apache/apex/api/DAGSetupPlugin.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.api;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+
+/**
+ * DAGSetupPlugin allows user provided code to run at various stages
+ * during DAG preparation. Currently following stages are supported
+ *
+ * <ul>
+ *   <li>Before dag is populated</li>
+ *   <li>After dag is populated</li>
+ *   <li>Before dag is configured</li>
+ *   <li>After dag is configured</li>
+ *   <li>Before dag is validated</li>
+ *   <li>After dag is validated</li>
+ * </ul>
+ */
+@InterfaceStability.Evolving
+public interface DAGSetupPlugin extends Component<DAGSetupPlugin.DAGSetupPluginContext>, Serializable
+{
+
+  /**
+   * This method is called before platform adds operators and streams in the DAG. i.e this method
+   * will get called just before {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+   *
+   * For Application specified using property and json file format, this method will get called
+   * before platform adds operators and streams in the DAG as per specification in the file.
+   */
+  void prePopulateDAG();
+
+  /**
+   * This method is called after platform adds operators and streams in the DAG. i.e this method
+   * will get called just after {@link com.datatorrent.api.StreamingApplication#populateDAG(DAG, Configuration)}
+   * in case application is specified in java.
+   *
+   * For Application specified using property and json file format, this method will get called
+   * after platform has added operators and streams in the DAG as per specification in the file.
+   */
+  void postPopulateDAG();
+
+  /**
+   * This method is called before DAG is configured, i.e. operator and application
+   * properties/attributes are injected from configuration files.
+   */
+  void preConfigureDAG();
+
+  /**
+   * This method is called after DAG is configured, i.e. operator and application
+   * properties/attributes are injected from configuration files.
+   */
+  void postConfigureDAG();
+
+  /**
+   * This method is called just before dag is validated before final job submission.
+   */
+  void preValidateDAG();
+
+  /**
+   * This method is called after dag is validated. If plugin makes incompatible changes
+   * to the DAG at this stage, then application may get launched incorrectly or application
+   * launch may fail.
+   */
+  void postValidateDAG();
+
+  public static class DAGSetupPluginContext implements Context
+  {
+    private final DAG dag;
+    private final Configuration conf;
+
+    public DAGSetupPluginContext(DAG dag, Configuration conf)
+    {
+      this.dag = dag;
+      this.conf = conf;
+    }
+
+    public DAG getDAG()
+    {
+      return dag;
+    }
+
+    public Configuration getConfiguration()
+    {
+      return conf;
+    }
+
+    @Override
+    public Attribute.AttributeMap getAttributes()
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public <T> T getValue(Attribute<T> key)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void setCounters(Object counters)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public void sendMetrics(Collection<String> metricNames)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
new file mode 100644
index 0000000..7d26b89
--- /dev/null
+++ b/common/src/main/java/com/datatorrent/common/util/BaseDAGSetupPlugin.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import org.apache.apex.api.DAGSetupPlugin;
+
+/**
+ * Base class for DAGSetupPlugin implementations that provides empty implementations
+ * for all interface methods.
+ */
+public class BaseDAGSetupPlugin implements DAGSetupPlugin
+{
+  @Override
+  public void setup(DAGSetupPluginContext context)
+  {
+
+  }
+
+  @Override
+  public void prePopulateDAG()
+  {
+
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+
+  @Override
+  public void postPopulateDAG()
+  {
+
+  }
+
+  @Override
+  public void preConfigureDAG()
+  {
+
+  }
+
+  @Override
+  public void postConfigureDAG()
+  {
+
+  }
+
+  @Override
+  public void preValidateDAG()
+  {
+
+  }
+
+  @Override
+  public void postValidateDAG()
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
index addf68e..e6b4b7f 100644
--- a/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
+++ b/engine/src/main/java/com/datatorrent/stram/client/AppPackage.java
@@ -355,7 +355,6 @@ public class AppPackage extends JarFile
             appInfo.displayName = appFactory.getDisplayName();
             try {
               appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
-              appInfo.dag.validate();
             } catch (Throwable ex) {
               appInfo.error = ex.getMessage();
               appInfo.errorStackTrace = ExceptionUtils.getStackTrace(ex);
@@ -394,7 +393,6 @@ public class AppPackage extends JarFile
           appInfo.displayName = appFactory.getDisplayName();
           try {
             appInfo.dag = appFactory.createApp(stramAppLauncher.getLogicalPlanConfiguration());
-            appInfo.dag.validate();
           } catch (Throwable t) {
             appInfo.error = t.getMessage();
             appInfo.errorStackTrace = ExceptionUtils.getStackTrace(t);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
new file mode 100644
index 0000000..ad37071
--- /dev/null
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/DAGSetupPluginManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+
+import org.apache.apex.api.DAGSetupPlugin;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.stram.StramUtils;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class DAGSetupPluginManager
+{
+  private static final Logger LOG = getLogger(DAGSetupPluginManager.class);
+
+  private final transient List<DAGSetupPlugin> plugins = new ArrayList<>();
+  private Configuration conf;
+
+  public static final String DAGSETUP_PLUGINS_CONF_KEY = "org.apache.apex.api";
+  private DAGSetupPlugin.DAGSetupPluginContext contex;
+
+  private void loadVisitors(Configuration conf) // instantiates the plugin classes named under DAGSETUP_PLUGINS_CONF_KEY
+  {
+    this.conf = conf;
+    if (!plugins.isEmpty()) { // already loaded; never register the same plugins twice
+      return;
+    }
+
+    String classNamesStr = conf.get(DAGSETUP_PLUGINS_CONF_KEY);
+    if (classNamesStr == null) { // no plugins configured
+      return;
+    }
+    String[] classNames = classNamesStr.split(","); // comma separated list of fully qualified class names
+    for (String className : classNames) {
+      try {
+        Class<? extends DAGSetupPlugin> plugin = StramUtils.classForName(className.trim(), DAGSetupPlugin.class);
+        plugins.add(StramUtils.newInstance(plugin));
+        LOG.info("Found DAG setup plugin {}", plugin);
+      } catch (IllegalArgumentException e) { // best effort: a broken plugin is skipped, the cause is kept in the log
+        LOG.warn("Could not load plugin {}", className, e);
+      }
+    }
+  }
+
+  public void setup(DAGSetupPlugin.DAGSetupPluginContext context)
+  {
+    this.contex = context;
+    for (DAGSetupPlugin plugin : plugins) {
+      plugin.setup(context);
+    }
+  }
+
+  public enum DispatchType
+  {
+    SETUP,
+    PRE_POPULATE,
+    POST_POPULATE,
+    PRE_CONFIGURE,
+    POST_CONFIGURE,
+    PRE_VALIDATE,
+    POST_VALIDATE,
+    TEARDOWN
+  }
+
+  public void dispatch(DispatchType type, DAGSetupPlugin.DAGSetupPluginContext context)
+  {
+    for (DAGSetupPlugin plugin : plugins) { // plugins are notified in the order they were loaded
+      switch (type) {
+        case SETUP: // the only event that consumes the context argument
+          plugin.setup(context);
+          break;
+        case PRE_POPULATE:
+          plugin.prePopulateDAG();
+          break;
+        case POST_POPULATE:
+          plugin.postPopulateDAG();
+          break;
+        case PRE_CONFIGURE:
+          plugin.preConfigureDAG();
+          break;
+        case POST_CONFIGURE: // must hit the configure hook, not the validate hook
+          plugin.postConfigureDAG();
+          break;
+        case PRE_VALIDATE:
+          plugin.preValidateDAG();
+          break;
+        case POST_VALIDATE:
+          plugin.postValidateDAG();
+          break;
+        case TEARDOWN:
+          plugin.teardown();
+          break;
+        default:
+          throw new UnsupportedOperationException("Not implemented ");
+      }
+    }
+  }
+
+  public static synchronized DAGSetupPluginManager getInstance(Configuration conf)
+  {
+    DAGSetupPluginManager manager = new DAGSetupPluginManager();
+    manager.loadVisitors(conf);
+    return manager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index bab414f..ffe33f3 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -48,6 +48,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.api.DAGSetupPlugin.DAGSetupPluginContext;
 import org.apache.commons.beanutils.BeanMap;
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.collections.CollectionUtils;
@@ -85,6 +86,15 @@ import com.datatorrent.stram.plan.logical.LogicalPlan.OutputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
 import com.datatorrent.stram.util.ObjectMapperFactory;
 
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_CONFIGURE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_POPULATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.POST_VALIDATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_CONFIGURE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_POPULATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.PRE_VALIDATE;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.SETUP;
+import static com.datatorrent.stram.plan.logical.DAGSetupPluginManager.DispatchType.TEARDOWN;
+
 /**
  *
  * Builder for the DAG logical representation of operators and streams from properties.<p>
@@ -134,6 +144,8 @@ public class LogicalPlanConfiguration
     LOG.debug("Initialized attributes {}", serial);
   }
 
+  private final DAGSetupPluginManager pluginManager;
+
   /**
    * This represents an element that can be referenced in a DT property.
    */
@@ -1640,6 +1652,7 @@ public class LogicalPlanConfiguration
   {
     this.conf = conf;
     this.addFromConfiguration(conf);
+    this.pluginManager = DAGSetupPluginManager.getInstance(conf);
   }
 
   /**
@@ -2052,44 +2065,58 @@ public class LogicalPlanConfiguration
     return Collections.unmodifiableMap(this.stramConf.appAliases);
   }
 
-  public LogicalPlan createFromProperties(Properties props, String appName) throws IOException
+  private LogicalPlan populateDAGAndValidate(LogicalPlanConfiguration tb, String appName)
   {
-    // build DAG from properties
-    LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
-    tb.addFromProperties(props, conf);
     LogicalPlan dag = new LogicalPlan();
+    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+    pluginManager.dispatch(SETUP, context);
+    pluginManager.dispatch(PRE_POPULATE, context);
     tb.populateDAG(dag);
     // configure with embedded settings
     tb.prepareDAG(dag, null, appName);
+    pluginManager.dispatch(POST_POPULATE, context);
     // configure with external settings
     prepareDAG(dag, null, appName);
+    pluginManager.dispatch(PRE_VALIDATE, context);
+    dag.validate();
+    pluginManager.dispatch(POST_VALIDATE, context);
+    pluginManager.dispatch(TEARDOWN, context);
     return dag;
   }
 
+  public LogicalPlan createFromProperties(Properties props, String appName) throws IOException
+  {
+    // build DAG from properties
+    LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
+    tb.addFromProperties(props, conf);
+    return populateDAGAndValidate(tb, appName);
+  }
+
   public LogicalPlan createFromJson(JSONObject json, String appName) throws Exception
   {
     // build DAG from properties
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
     tb.addFromJson(json, conf);
-    LogicalPlan dag = new LogicalPlan();
-    tb.populateDAG(dag);
-    // configure with embedded settings
-    tb.prepareDAG(dag, null, appName);
-    // configure with external settings
-    prepareDAG(dag, null, appName);
-    return dag;
+    return populateDAGAndValidate(tb, appName);
   }
 
   public LogicalPlan createEmptyForRecovery(String appName)
   {
     // build DAG from properties
     LogicalPlanConfiguration tb = new LogicalPlanConfiguration(new Configuration(false));
+    return populateDAGAndValidate(tb, appName);
+  }
+
+  public LogicalPlan createFromStreamingApplication(StreamingApplication app, String appName)
+  {
     LogicalPlan dag = new LogicalPlan();
-    tb.populateDAG(dag);
-    // configure with embedded settings
-    tb.prepareDAG(dag, null, appName);
-    // configure with external settings
-    prepareDAG(dag, null, appName);
+    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
+    pluginManager.dispatch(SETUP, context);
+    prepareDAG(dag, app, appName);
+    pluginManager.dispatch(PRE_VALIDATE, context);
+    dag.validate();
+    pluginManager.dispatch(POST_VALIDATE, context);
+    pluginManager.dispatch(TEARDOWN, context);
     return dag;
   }
 
@@ -2186,7 +2213,6 @@ public class LogicalPlanConfiguration
         }
       }
     }
-
   }
 
   private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator)
@@ -2222,9 +2248,14 @@ public class LogicalPlanConfiguration
     // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
     String connectAddress = conf.get(StreamingApplication.DT_PREFIX + Context.DAGContext.GATEWAY_CONNECT_ADDRESS.getName());
     dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    DAGSetupPluginContext context = new DAGSetupPluginContext(dag, this.conf);
     if (app != null) {
+      pluginManager.dispatch(SETUP, context);
+      pluginManager.dispatch(PRE_POPULATE, context);
       app.populateDAG(dag, conf);
+      pluginManager.dispatch(POST_POPULATE, context);
     }
+    pluginManager.dispatch(PRE_CONFIGURE, context);
     String appAlias = getAppAlias(name);
     String appName = appAlias == null ? name : appAlias;
     List<AppConf> appConfs = stramConf.getMatchingChildConf(appName, StramElement.APPLICATION);
@@ -2240,6 +2271,7 @@ public class LogicalPlanConfiguration
     // inject external operator configuration
     setOperatorConfiguration(dag, appConfs, appName);
     setStreamConfiguration(dag, appConfs, appName);
+    pluginManager.dispatch(POST_CONFIGURE, context);
   }
 
   private void flattenDAG(LogicalPlan dag, Configuration conf)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
index 96edc8d..02c0910 100644
--- a/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
+++ b/engine/src/main/java/org/apache/apex/engine/util/StreamingAppFactory.java
@@ -44,9 +44,7 @@ public abstract class StreamingAppFactory implements StramAppLauncher.AppFactory
 
   protected LogicalPlan createApp(StreamingApplication app, LogicalPlanConfiguration planConfig)
   {
-    LogicalPlan dag = new LogicalPlan();
-    planConfig.prepareDAG(dag, app, getName());
-    return dag;
+    return planConfig.createFromStreamingApplication(app, getName());
   }
 
   public String getName()

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
new file mode 100644
index 0000000..25201eb
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/DAGSetupPluginTests.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.engine.util.StreamingAppFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.stram.StramUtils;
+import com.datatorrent.stram.client.StramAppLauncher;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
+
+public class DAGSetupPluginTests
+{
+
+  public static class Application implements StreamingApplication
+  {
+    @Override
+    public void populateDAG(DAG dag, Configuration conf)
+    {
+      TestGeneratorInputOperator inputOperator = dag.addOperator("inputOperator", new TestGeneratorInputOperator());
+      GenericTestOperator operator1 = dag.addOperator("operator1", new GenericTestOperator());
+      GenericTestOperator operator2 = dag.addOperator("operator2", new GenericTestOperator());
+      GenericTestOperator operator3 = dag.addOperator("operator3", new GenericTestOperator());
+      GenericTestOperator operator4 = dag.addOperator("operator4", new GenericTestOperator());
+
+      dag.addStream("n1n2", operator1.outport1, operator2.inport1);
+      dag.addStream("inputStream", inputOperator.outport, operator1.inport1, operator3.inport1, operator4.inport1);
+    }
+  }
+
+  private Configuration getConfiguration()
+  {
+    Configuration conf = new Configuration();
+    conf.set(DAGSetupPluginManager.DAGSETUP_PLUGINS_CONF_KEY, "com.datatorrent.stram.plan.logical.PropertyInjectorVisitor");
+    conf.set("propertyVisitor.Path","/visitortests.properties");
+    return conf;
+  }
+
+  @Test
+  public void testJavaApplication()
+  {
+    Configuration conf = getConfiguration();
+    StreamingAppFactory factory  = new StreamingAppFactory(Application.class.getName(), Application.class)
+    {
+      @Override
+      public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
+      {
+        Class<? extends StreamingApplication> c = StramUtils.classForName(Application.class.getName(), StreamingApplication.class);
+        StreamingApplication app = StramUtils.newInstance(c);
+        return super.createApp(app, planConfig);
+      }
+    };
+    LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+    validateProperties(dag);
+  }
+
+  @Test
+  public void testPropertyFileApp() throws IOException
+  {
+    File tempFile = File.createTempFile("testTopology", "properties");
+    org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.properties"), new FileOutputStream(tempFile));
+    StramAppLauncher.PropertyFileAppFactory factory = new StramAppLauncher.PropertyFileAppFactory(tempFile);
+    Configuration conf = getConfiguration();
+    LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+    validateProperties(dag);
+    tempFile.delete();
+  }
+
+  @Test
+  public void testJsonFileApp() throws IOException
+  {
+    File tempFile = File.createTempFile("testTopology", "json");
+    org.apache.commons.io.IOUtils.copy(getClass().getResourceAsStream("/testTopology.json"), new FileOutputStream(tempFile));
+    StramAppLauncher.JsonFileAppFactory factory = new StramAppLauncher.JsonFileAppFactory(tempFile);
+    Configuration conf = getConfiguration();
+    LogicalPlan dag = factory.createApp(new LogicalPlanConfiguration(conf));
+    validateProperties(dag);
+    tempFile.delete();
+  }
+
+  protected void validateProperties(LogicalPlan dag) // checks that the plugin injected the property into every generic operator
+  {
+    String[] operators = new String[]{"operator1", "operator2", "operator3", "operator4"};
+    for (String name : operators) {
+      GenericTestOperator op = (GenericTestOperator)dag.getOperatorMeta(name).getOperator();
+      Assert.assertEquals("property set on operator ", "mynewstringvalue", op.getMyStringProperty()); // JUnit order: message, expected, actual
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
new file mode 100644
index 0000000..d2bd927
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/PropertyInjectorVisitor.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.validation.ValidationException;
+
+import org.slf4j.Logger;
+
+import org.apache.apex.api.DAGSetupPlugin;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Operator;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class PropertyInjectorVisitor implements DAGSetupPlugin
+{
+  private static final Logger LOG = getLogger(PropertyInjectorVisitor.class);
+
+  private String path;
+  private Map<String, String> propertyMap = new HashMap<>();
+  private DAG dag;
+
+  @Override
+  public void setup(DAGSetupPluginContext context)
+  {
+    this.dag = context.getDAG();
+    try {
+      this.path = context.getConfiguration().get("propertyVisitor.Path");
+      Properties properties = new Properties();
+      properties.load(this.getClass().getResourceAsStream(path));
+      for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+        propertyMap.put(entry.getKey().toString(), entry.getValue().toString());
+      }
+    } catch (IOException ex) {
+      throw new ValidationException("Not able to load input file " + path);
+    }
+  }
+
+  @Override
+  public void prePopulateDAG()
+  {
+
+  }
+
+  @Override
+  public void postPopulateDAG()
+  {
+
+  }
+
+  @Override
+  public void preConfigureDAG()
+  {
+
+  }
+
+  @Override
+  public void postConfigureDAG()
+  {
+
+  }
+
+  @Override
+  public void preValidateDAG()
+  {
+    for (DAG.OperatorMeta ometa : dag.getAllOperatorsMeta()) {
+      Operator o = ometa.getOperator();
+      LogicalPlanConfiguration.setOperatorProperties(o, propertyMap);
+    }
+  }
+
+  @Override
+  public void postValidateDAG()
+  {
+
+  }
+
+  public PropertyInjectorVisitor()
+  {
+  }
+
+  public String getPath()
+  {
+    return path;
+  }
+
+  public void setPath(String path)
+  {
+    this.path = path;
+  }
+
+  @Override
+  public void teardown()
+  {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
index 6552015..9ac28c0 100644
--- a/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/webapp/OperatorDiscoveryTest.java
@@ -918,6 +918,15 @@ public class OperatorDiscoveryTest
 
   }
 
+  public static class InputTestOperator<T, Z extends Map<String, Number>> extends TestOperator<T, Z> implements InputOperator
+  {
+    @Override
+    public void emitTuples()
+    {
+      // intentionally empty: this operator emits no tuples
+    }
+  }
+
   static class ExtendedOperator extends TestOperator<String, Map<String, Number>>
   {
   }
@@ -1047,7 +1056,7 @@ public class OperatorDiscoveryTest
   @Test
   public void testLogicalPlanConfiguration() throws Exception
   {
-    TestOperator<String, Map<String, Number>> bean = new TestOperator<String, Map<String, Number>>();
+    TestOperator<String, Map<String, Number>> bean = new InputTestOperator<String, Map<String, Number>>();
     bean.map.put("key1", new Structured());
     bean.stringArray = new String[]{"one", "two", "three"};
     bean.stringList = Lists.newArrayList("four", "five");
@@ -1074,7 +1083,7 @@ public class OperatorDiscoveryTest
     jsonPlan.put("streams", new JSONArray());
     JSONObject jsonOper = new JSONObject();
     jsonOper.put("name", "Test Operator");
-    jsonOper.put("class", TestOperator.class.getName());
+    jsonOper.put("class", InputTestOperator.class.getName());
     jsonOper.put("properties", jsonObj);
     jsonPlan.put("operators", new JSONArray(Lists.newArrayList(jsonOper)));
 
@@ -1083,9 +1092,9 @@ public class OperatorDiscoveryTest
     // create logical plan from the json
     LogicalPlan lp = lpc.createFromJson(jsonPlan, "jsontest");
     OperatorMeta om = lp.getOperatorMeta("Test Operator");
-    Assert.assertTrue(om.getOperator() instanceof TestOperator);
+    Assert.assertTrue(om.getOperator() instanceof InputTestOperator);
     @SuppressWarnings("rawtypes")
-    TestOperator beanBack = (TestOperator)om.getOperator();
+    TestOperator beanBack = (InputTestOperator)om.getOperator();
 
     // The operator deserialized back from json should be same as original operator
     Assert.assertEquals(bean.map, beanBack.map);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8d46cc6d/engine/src/test/resources/visitortests.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/visitortests.properties b/engine/src/test/resources/visitortests.properties
new file mode 100644
index 0000000..620c53b
--- /dev/null
+++ b/engine/src/test/resources/visitortests.properties
@@ -0,0 +1,20 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+myStringProperty=mynewstringvalue


[2/2] apex-core git commit: Merge branch 'APEXCORE-577' of github.com:tushargosavi/apex-core

Posted by pr...@apache.org.
Merge branch 'APEXCORE-577' of github.com:tushargosavi/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/3024b06e
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/3024b06e
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/3024b06e

Branch: refs/heads/master
Commit: 3024b06e16103efbd16a018f7b557dd848476c99
Parents: 10650b3 8d46cc6
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Mar 14 08:00:29 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Mar 14 08:00:29 2017 -0700

----------------------------------------------------------------------
 .../org/apache/apex/api/DAGSetupPlugin.java     | 137 +++++++++++++++++++
 .../common/util/BaseDAGSetupPlugin.java         |  76 ++++++++++
 .../datatorrent/stram/client/AppPackage.java    |   2 -
 .../plan/logical/DAGSetupPluginManager.java     | 126 +++++++++++++++++
 .../plan/logical/LogicalPlanConfiguration.java  |  66 ++++++---
 .../apex/engine/util/StreamingAppFactory.java   |   4 +-
 .../stram/plan/logical/DAGSetupPluginTests.java | 115 ++++++++++++++++
 .../plan/logical/PropertyInjectorVisitor.java   | 119 ++++++++++++++++
 .../stram/webapp/OperatorDiscoveryTest.java     |  17 ++-
 .../src/test/resources/visitortests.properties  |  20 +++
 10 files changed, 656 insertions(+), 26 deletions(-)
----------------------------------------------------------------------