You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/05/05 06:02:33 UTC

[1/2] incubator-apex-core git commit: APEXCORE-107 Support for adding modules in the DAG property file and json file. Change module API to make module as an operator.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/master c77ea114e -> da46ec186


APEXCORE-107 Support for adding modules in the DAG property file and json file.
Change module API to make module as an operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d4f3a506
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d4f3a506
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d4f3a506

Branch: refs/heads/master
Commit: d4f3a506aa200c91a0dc6698645415d54dd67562
Parents: c2903da
Author: Tushar R. Gosavi <tu...@gmail.com>
Authored: Thu May 5 03:24:48 2016 +0530
Committer: Tushar R. Gosavi <tu...@gmail.com>
Committed: Thu May 5 03:24:48 2016 +0530

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |  20 +-
 .../main/java/com/datatorrent/api/Module.java   |   3 +-
 .../main/java/com/datatorrent/api/Operator.java |   3 +-
 .../stram/codec/LogicalPlanSerializer.java      |   2 +-
 .../stram/plan/logical/LogicalPlan.java         | 198 +++++++------------
 .../plan/logical/LogicalPlanConfiguration.java  |  76 ++++---
 .../stram/plan/logical/Operators.java           |   3 +-
 .../stram/webapp/StramWebServices.java          |   2 +-
 .../plan/logical/module/ModuleAppTest.java      |   2 +-
 .../logical/module/TestModuleExpansion.java     |  73 +++++--
 .../src/test/resources/testModuleTopology.json  | 141 +++++++++++++
 .../resources/testModuleTopology.properties     |  62 ++++++
 12 files changed, 386 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 74448fd..1518fcf 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -159,14 +159,6 @@ public interface DAG extends DAGContext, Serializable
     public OutputPortMeta getMeta(Operator.OutputPort<?> port);
   }
 
-  @InterfaceStability.Evolving
-  interface ModuleMeta extends Serializable, Context
-  {
-    String getName();
-
-    Module getModule();
-  }
-
   /**
    * Add new instance of operator under given name to the DAG.
    * The operator class must have a default constructor.
@@ -272,15 +264,17 @@ public interface DAG extends DAGContext, Serializable
    */
   public abstract OperatorMeta getOperatorMeta(String operatorId);
 
-  @InterfaceStability.Evolving
-  ModuleMeta getModuleMeta(String moduleId);
-
   /**
    * <p>getMeta.</p>
    */
   public abstract OperatorMeta getMeta(Operator operator);
 
-  @InterfaceStability.Evolving
-  ModuleMeta getMeta(Module module);
+  /**
+   * Marker interface for a node in the DAG. Any object that can be added to the DAG as a node
+   * (for example an operator or a module) must implement this interface.
+   */
+  interface GenericOperator
+  {
 
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/Module.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java
index 8a85d8b..d93d16f 100644
--- a/api/src/main/java/com/datatorrent/api/Module.java
+++ b/api/src/main/java/com/datatorrent/api/Module.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Operator.Unifier;
@@ -36,7 +37,7 @@ import com.datatorrent.api.Operator.Unifier;
  * @since 3.3.0
  */
 @InterfaceStability.Evolving
-public interface Module
+public interface Module extends GenericOperator
 {
   void populateDAG(DAG dag, Configuration conf);
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index d4a6a90..c016799 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -20,6 +20,7 @@ package com.datatorrent.api;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
 
 /**
  * <p>
@@ -27,7 +28,7 @@ import com.datatorrent.api.Context.PortContext;
  *
  * @since 0.3.2
  */
-public interface Operator extends Component<OperatorContext>
+public interface Operator extends Component<OperatorContext>, GenericOperator
 {
   /**
    * One can set attribute on an Operator to indicate the mode in which it processes Tuples.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 9139566..6607321 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -362,7 +362,7 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
     Map<String, Object> moduleDetailMap = new HashMap<>();
     ArrayList<String> operatorArray = new ArrayList<>();
     moduleDetailMap.put("name", moduleMeta.getName());
-    moduleDetailMap.put("className", moduleMeta.getModule().getClass().getName());
+    moduleDetailMap.put("className", moduleMeta.getGenericOperator().getClass().getName());
 
     moduleDetailMap.put("operators", operatorArray);
     for (OperatorMeta operatorMeta : moduleMeta.getDag().getAllOperators()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 15969b7..af6b1bc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -63,13 +63,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
+import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.AffinityRule;
@@ -195,7 +195,6 @@ public class LogicalPlan implements Serializable, DAG
   private final List<OperatorMeta> rootOperators = new ArrayList<>();
   private final List<OperatorMeta> leafOperators = new ArrayList<>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
-  private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
 
   @Override
   public Attribute.AttributeMap getAttributes()
@@ -489,6 +488,10 @@ public class LogicalPlan implements Serializable, DAG
     @Override
     public StreamMeta setSource(Operator.OutputPort<?> port)
     {
+      if (port instanceof ProxyOutputPort) {
+        proxySource = port;
+        return this;
+      }
       OutputPortMeta portMeta = assertGetPortMeta(port);
       OperatorMeta om = portMeta.getOperatorMeta();
       if (om.outputStreams.containsKey(portMeta)) {
@@ -508,6 +511,10 @@ public class LogicalPlan implements Serializable, DAG
     @Override
     public StreamMeta addSink(Operator.InputPort<?> port)
     {
+      if (port instanceof ProxyInputPort) {
+        proxySinks.add(port);
+        return this;
+      }
       InputPortMeta portMeta = assertGetPortMeta(port);
       OperatorMeta om = portMeta.getOperatorWrapper();
       String portName = portMeta.getPortName();
@@ -771,12 +778,40 @@ public class LogicalPlan implements Serializable, DAG
         removeOperator(persistOpMeta.getOperator());
       }
     }
+
+    private OutputPort<?> proxySource = null;
+    private List<InputPort<?>> proxySinks = new ArrayList<>();
+
+    /**
+     * Follow every proxy port recorded on this stream down to the actual port it delegates to,
+     * register that port as the source or a sink of this StreamMeta, then forget the proxies.
+     */
+    private void resolvePorts()
+    {
+      if (proxySource instanceof ProxyOutputPort) {
+        OutputPort<?> outputPort = proxySource;
+        while (outputPort instanceof ProxyOutputPort) {
+          outputPort = ((ProxyOutputPort<?>)outputPort).get();
+        }
+        setSource(outputPort);
+      }
+
+      for (InputPort<?> inputPort : proxySinks) {
+        while (inputPort instanceof ProxyInputPort) {
+          inputPort = ((ProxyInputPort<?>)inputPort).get();
+        }
+        addSink(inputPort);
+      }
+
+      proxySource = null;
+      proxySinks.clear();
+    }
   }
 
   /**
    * Operator meta object.
    */
-  public final class OperatorMeta implements DAG.OperatorMeta, Serializable
+  public class OperatorMeta implements DAG.OperatorMeta, Serializable
   {
     private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
     private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
@@ -789,7 +824,7 @@ public class LogicalPlan implements Serializable, DAG
     private final LogicalOperatorStatus status;
     private transient Integer nindex; // for cycle detection
     private transient Integer lowlink; // for cycle detection
-    private transient Operator operator;
+    private transient GenericOperator operator;
     private MetricAggregatorMeta metricAggregatorMeta;
     private String moduleName;  // Name of the module which has this operator. null if this is a top level operator.
 
@@ -799,13 +834,14 @@ public class LogicalPlan implements Serializable, DAG
      *  other value => represents the root oio node for this node
      */
     private transient Integer oioRoot = null;
+    private ClassUtils genricOperator;
 
-    private OperatorMeta(String name, Operator operator)
+    private OperatorMeta(String name, GenericOperator operator)
     {
       this(name, operator, new DefaultAttributeMap());
     }
 
-    private OperatorMeta(String name, Operator operator, Attribute.AttributeMap attributeMap)
+    private OperatorMeta(String name, GenericOperator operator, Attribute.AttributeMap attributeMap)
     {
       LOG.debug("Initializing {} as {}", name, operator.getClass().getName());
       this.operatorAnnotation = operator.getClass().getAnnotation(OperatorAnnotation.class);
@@ -858,7 +894,7 @@ public class LogicalPlan implements Serializable, DAG
       input.defaultReadObject();
       // TODO: not working because - we don't have the storage agent in parent attribuet map
       //operator = (Operator)getValue2(OperatorContext.STORAGE_AGENT).load(id, Checkpoint.STATELESS_CHECKPOINT_WINDOW_ID);
-      operator = (Operator)FSStorageAgent.retrieve(input);
+      operator = (GenericOperator)FSStorageAgent.retrieve(input);
     }
 
     @Override
@@ -1000,6 +1036,7 @@ public class LogicalPlan implements Serializable, DAG
       }
     }
 
+
     private class PortMapping implements Operators.OperatorDescriptor
     {
       private final Map<Operator.InputPort<?>, InputPortMeta> inPortMap = new HashMap<>();
@@ -1113,6 +1150,11 @@ public class LogicalPlan implements Serializable, DAG
     @Override
     public Operator getOperator()
     {
+      return (Operator)operator;
+    }
+
+    public GenericOperator getGenericOperator()
+    {
       return operator;
     }
 
@@ -1198,90 +1240,28 @@ public class LogicalPlan implements Serializable, DAG
   /**
    * Module meta object.
    */
-  public final class ModuleMeta implements DAG.ModuleMeta, Serializable
+  public final class ModuleMeta extends OperatorMeta implements Serializable
   {
-    private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
-    private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
-    private final Attribute.AttributeMap attributes;
-    @NotNull
-    private String name;
-    private transient Module module;
     private ModuleMeta parent;
     private LogicalPlan dag = null;
     private transient String fullName;
+    // Module-typed reference to the same object that is handed to OperatorMeta as a GenericOperator.
+    private transient Module module;
+    private transient boolean flattened = false;
 
     private ModuleMeta(String name, Module module)
     {
-      LOG.debug("Initializing {} as {}", name, module.getClass().getName());
-      this.name = name;
+      super(name, module);
       this.module = module;
-      this.attributes = new DefaultAttributeMap();
+      LOG.debug("Initializing {} as {}", name, module.getClass().getName());
       this.dag = new LogicalPlan();
     }
 
-    @Override
-    public String getName()
-    {
-      return name;
-    }
-
-    @Override
-    public Module getModule()
-    {
-      return module;
-    }
-
-    @Override
-    public Attribute.AttributeMap getAttributes()
-    {
-      return attributes;
-    }
-
-    @Override
-    public <T> T getValue(Attribute<T> key)
-    {
-      return attributes.get(key);
-    }
-
-    @Override
-    public void setCounters(Object counters)
-    {
-
-    }
-
-    @Override
-    public void sendMetrics(Collection<String> metricNames)
-    {
-
-    }
-
-    public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
-    {
-      return inputStreams;
-    }
-
-    public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
-    {
-      return outputStreams;
-    }
-
     public LogicalPlan getDag()
     {
       return dag;
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException
-    {
-      out.defaultWriteObject();
-      FSStorageAgent.store(out, module);
-    }
-
-    private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
-    {
-      input.defaultReadObject();
-      module = (Module)FSStorageAgent.retrieve(input);
-    }
-
     /**
      * Expand the module and add its operator to the parentDAG. After this method finishes the module is expanded fully
      * with all its submodules also expanded. The parentDAG contains the operator added by all the modules.
@@ -1291,6 +1271,10 @@ public class LogicalPlan implements Serializable, DAG
      */
     public void flattenModule(LogicalPlan parentDAG, Configuration conf)
     {
+      if (flattened) {
+        return;
+      }
+
       module.populateDAG(dag, conf);
       for (ModuleMeta subModuleMeta : dag.getAllModules()) {
         subModuleMeta.setParent(this);
@@ -1298,6 +1282,7 @@ public class LogicalPlan implements Serializable, DAG
       }
       dag.applyStreamLinks();
       parentDAG.addDAGToCurrentDAG(this);
+      flattened = true;
     }
 
     /**
@@ -1317,9 +1302,9 @@ public class LogicalPlan implements Serializable, DAG
       }
 
       if (parent == null) {
-        fullName = name;
+        fullName = getName();
       } else {
-        fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
+        fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + getName();
       }
       return fullName;
     }
@@ -1401,53 +1386,25 @@ public class LogicalPlan implements Serializable, DAG
   public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
   {
     StreamMeta s = addStream(id);
-    id = s.id;
-    ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create();
-    if (!(source instanceof ProxyOutputPort)) {
-      s.setSource(source);
-    }
+    s.setSource(source);
     for (Operator.InputPort<?> sink : sinks) {
-      if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
-        streamMap.put(source, sink);
-        streamLinks.put(id, streamMap);
-      } else {
-        if (s.getSource() == null) {
-          s.setSource(source);
-        }
-        s.addSink(sink);
-      }
+      s.addSink(sink);
     }
     return s;
   }
 
   /**
-   * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with
-   * the actual ports that they refer to This method adds sources and sinks for the StreamMeta objects which were left
-   * empty in the addStream call.
+   * This will be called once the Logical Dag is expanded, and proxy input and proxy output ports are populated
+   * with actual ports they refer to.
    */
   public void applyStreamLinks()
   {
-    for (String id : streamLinks.keySet()) {
-      StreamMeta s = getStream(id);
-      for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
-        if (s.getSource() == null) {
-          Operator.OutputPort<?> outputPort = pair.getKey();
-          while (outputPort instanceof ProxyOutputPort) {
-            outputPort = ((ProxyOutputPort<?>)outputPort).get();
-          }
-          s.setSource(outputPort);
-        }
-
-        Operator.InputPort<?> inputPort = pair.getValue();
-        while (inputPort instanceof ProxyInputPort) {
-          inputPort = ((ProxyInputPort<?>)inputPort).get();
-        }
-        s.addSink(inputPort);
-      }
+    for (StreamMeta smeta : streams.values()) {
+      smeta.resolvePorts();
     }
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @SuppressWarnings({"unchecked", "rawtypes"})
   private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
   {
     LogicalPlan subDag = moduleMeta.getDag();
@@ -1594,12 +1551,6 @@ public class LogicalPlan implements Serializable, DAG
   }
 
   @Override
-  public ModuleMeta getModuleMeta(String moduleName)
-  {
-    return this.modules.get(moduleName);
-  }
-
-  @Override
   public OperatorMeta getMeta(Operator operator)
   {
     // TODO: cache mapping
@@ -1611,17 +1562,6 @@ public class LogicalPlan implements Serializable, DAG
     throw new IllegalArgumentException("Operator not associated with the DAG: " + operator);
   }
 
-  @Override
-  public ModuleMeta getMeta(Module module)
-  {
-    for (ModuleMeta m : getAllModules()) {
-      if (m.module == module) {
-        return m;
-      }
-    }
-    throw new IllegalArgumentException("Module not associated with the DAG: " + module);
-  }
-
   public int getMaxContainerCount()
   {
     return this.getValue(CONTAINERS_MAX_COUNT);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 628fce2..bab414f 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -69,6 +69,8 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.GenericOperator;
+import com.datatorrent.api.Module;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.StringCodec;
@@ -2111,28 +2113,28 @@ public class LogicalPlanConfiguration
 
     Map<String, OperatorConf> operators = appConf.getChildren(StramElement.OPERATOR);
 
-    Map<OperatorConf, Operator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
+    Map<OperatorConf, GenericOperator> nodeMap = Maps.newHashMapWithExpectedSize(operators.size());
     // add all operators first
     for (Map.Entry<String, OperatorConf> nodeConfEntry : operators.entrySet()) {
       OperatorConf nodeConf = nodeConfEntry.getValue();
       if (!WILDCARD.equals(nodeConf.id)) {
-        Class<? extends Operator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), Operator.class);
+        Class<? extends GenericOperator> nodeClass = StramUtils.classForName(nodeConf.getClassNameReqd(), GenericOperator.class);
         String optJson = nodeConf.getProperties().get(nodeClass.getName());
-        Operator nd = null;
+        GenericOperator operator = null;
         try {
           if (optJson != null) {
             // if there is a special key which is the class name, it means the operator is serialized in json format
             ObjectMapper mapper = ObjectMapperFactory.getOperatorValueDeserializer();
-            nd = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass);
-            dag.addOperator(nodeConfEntry.getKey(), nd);
+            operator = mapper.readValue("{\"" + nodeClass.getName() + "\":" + optJson + "}", nodeClass);
+            addOperator(dag, nodeConfEntry.getKey(), operator);
           } else {
-            nd = dag.addOperator(nodeConfEntry.getKey(), nodeClass);
+            operator = addOperator(dag, nodeConfEntry.getKey(), nodeClass);
           }
-          setOperatorProperties(nd, nodeConf.getProperties());
+          setOperatorProperties(operator, nodeConf.getProperties());
         } catch (IOException e) {
           throw new IllegalArgumentException("Error setting operator properties " + e.getMessage(), e);
         }
-        nodeMap.put(nodeConf, nd);
+        nodeMap.put(nodeConf, operator);
       }
     }
 
@@ -2157,7 +2159,7 @@ public class LogicalPlanConfiguration
             portName = e.getKey();
           }
         }
-        Operator sourceDecl = nodeMap.get(streamConf.sourceNode);
+        GenericOperator sourceDecl = nodeMap.get(streamConf.sourceNode);
         Operators.PortMappingDescriptor sourcePortMap = new Operators.PortMappingDescriptor();
         Operators.describe(sourceDecl, sourcePortMap);
         sd.setSource(sourcePortMap.outputPorts.get(portName).component);
@@ -2174,7 +2176,7 @@ public class LogicalPlanConfiguration
             portName = e.getKey();
           }
         }
-        Operator targetDecl = nodeMap.get(targetNode);
+        GenericOperator targetDecl = nodeMap.get(targetNode);
         Operators.PortMappingDescriptor targetPortMap = new Operators.PortMappingDescriptor();
         Operators.describe(targetDecl, targetPortMap);
         sd.addSink(targetPortMap.inputPorts.get(portName).component);
@@ -2187,6 +2189,27 @@ public class LogicalPlanConfiguration
 
   }
 
+  private GenericOperator addOperator(LogicalPlan dag, String name, GenericOperator operator)
+  {
+    if (operator instanceof Module) {
+      dag.addModule(name, (Module)operator);
+    } else if (operator instanceof Operator) {
+      dag.addOperator(name, (Operator)operator);
+    }
+    return operator;
+  }
+
+
+  private GenericOperator addOperator(LogicalPlan dag, String name, Class<?> clazz)
+  {
+    if (Module.class.isAssignableFrom(clazz)) {
+      return dag.addModule(name, (Class<Module>)clazz);
+    } else if (Operator.class.isAssignableFrom(clazz)) {
+      return dag.addOperator(name, (Class<Operator>)clazz);
+    }
+    throw new IllegalArgumentException("Class " + clazz.getName() + " of " + name + " is neither a Module nor an Operator");
+  }
+
   /**
    * Populate the logical plan from the streaming application definition and configuration.
    * Configuration is resolved based on application alias, if any.
@@ -2323,12 +2346,7 @@ public class LogicalPlanConfiguration
 
   private PropertyArgs getPropertyArgs(OperatorMeta om)
   {
-    return new PropertyArgs(om.getName(), om.getOperator().getClass().getName());
-  }
-
-  private PropertyArgs getPropertyArgs(ModuleMeta mm)
-  {
-    return new PropertyArgs(mm.getName(), mm.getModule().getClass().getName());
+    return new PropertyArgs(om.getName(), om.getGenericOperator().getClass().getName());
   }
 
   /**
@@ -2361,7 +2379,7 @@ public class LogicalPlanConfiguration
    * @param properties
    * @return Operator
    */
-  public static Operator setOperatorProperties(Operator operator, Map<String, String> properties)
+  public static GenericOperator setOperatorProperties(GenericOperator operator, Map<String, String> properties)
   {
     try {
       // populate custom opProps
@@ -2372,26 +2390,6 @@ public class LogicalPlanConfiguration
     }
   }
 
-  /**
-   * Generic helper function to inject properties on the object.
-   *
-   * @param obj
-   * @param properties
-   * @param <T>
-   * @return
-   */
-  public static <T> T setObjectProperties(T obj, Map<String, String> properties)
-  {
-    try {
-      BeanUtils.populate(obj, properties);
-      return obj;
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException("Error setting operator properties", e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException("Error setting operator properties", e);
-    }
-  }
-
   public static StreamingApplication setApplicationProperties(StreamingApplication application, Map<String, String> properties)
   {
     try {
@@ -2421,7 +2419,7 @@ public class LogicalPlanConfiguration
     for (OperatorMeta ow : dag.getAllOperators()) {
       List<OperatorConf> opConfs = getMatchingChildConf(appConfs, ow.getName(), StramElement.OPERATOR);
       Map<String, String> opProps = getProperties(getPropertyArgs(ow), opConfs, applicationName);
-      setOperatorProperties(ow.getOperator(), opProps);
+      setOperatorProperties(ow.getGenericOperator(), opProps);
     }
   }
 
@@ -2502,7 +2500,7 @@ public class LogicalPlanConfiguration
     for (final ModuleMeta mw : dag.getAllModules()) {
       List<OperatorConf> opConfs = getMatchingChildConf(appConfs, mw.getName(), StramElement.OPERATOR);
       Map<String, String> opProps = getProperties(getPropertyArgs(mw), opConfs, appName);
-      setObjectProperties(mw.getModule(), opProps);
+      setOperatorProperties(mw.getGenericOperator(), opProps);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
index 5da7383..7bb4c39 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/Operators.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Field;
 import java.util.LinkedHashMap;
 
 import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.DAG.GenericOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
@@ -80,7 +81,7 @@ public abstract class Operators
     }
   }
 
-  public static void describe(Operator operator, OperatorDescriptor descriptor)
+  public static void describe(GenericOperator operator, OperatorDescriptor descriptor)
   {
     for (Class<?> c = operator.getClass(); c != Object.class; c = c.getSuperclass()) {
       Field[] fields = c.getDeclaredFields();

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
index 52be922..f09a53e 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/StramWebServices.java
@@ -828,7 +828,7 @@ public class StramWebServices
       if (logicalModule == null) {
         throw new NotFoundException();
       }
-      operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getModule());
+      operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalModule.getGenericOperator());
     } else {
       operatorProperties = LogicalPlanConfiguration.getObjectProperties(logicalOperator.getOperator());
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
index 97c015e..1966678 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -118,7 +118,7 @@ public class ModuleAppTest
   static class TestModule implements Module
   {
 
-    public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>();
+    public transient ProxyInputPort<Integer> moduleInput = new ProxyInputPort<Integer>();
     public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index d5af67b..97a375f 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -18,16 +18,22 @@
  */
 package com.datatorrent.stram.plan.logical.module;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.io.Serializable;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Random;
 
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.Attribute;
@@ -48,7 +54,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
 
 public class TestModuleExpansion
 {
-  static class DummyInputOperator extends BaseOperator implements InputOperator
+  public static class DummyInputOperator extends BaseOperator implements InputOperator
   {
     private int inputOperatorProp = 0;
 
@@ -72,7 +78,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class DummyOperator extends BaseOperator
+  public static class DummyOperator extends BaseOperator
   {
     private int operatorProp = 0;
 
@@ -104,7 +110,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class TestPartitioner implements Partitioner<DummyOperator>, Serializable
+  public static class TestPartitioner implements Partitioner<DummyOperator>, Serializable
   {
     @Override
     public Collection<Partition<DummyOperator>> definePartitions(Collection<Partition<DummyOperator>> partitions, PartitioningContext context)
@@ -121,7 +127,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class Level1Module implements Module
+  public static class Level1Module implements Module
   {
     private int level1ModuleProp = 0;
 
@@ -184,7 +190,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class Level2ModuleA implements Module
+  public static class Level2ModuleA implements Module
   {
     private int level2ModuleAProp1 = 0;
     private int level2ModuleAProp2 = 0;
@@ -253,7 +259,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class Level2ModuleB implements Module
+  public static class Level2ModuleB implements Module
   {
     private int level2ModuleBProp1 = 0;
     private int level2ModuleBProp2 = 0;
@@ -321,7 +327,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class Level3Module implements Module
+  public static class Level3Module implements Module
   {
 
     public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
@@ -344,7 +350,7 @@ public class TestModuleExpansion
     }
   }
 
-  static class NestedModuleApp implements StreamingApplication
+  public static class NestedModuleApp implements StreamingApplication
   {
     @Override
     public void populateDAG(DAG dag, Configuration conf)
@@ -564,10 +570,6 @@ public class TestModuleExpansion
     Assert.assertTrue(moduleNames.contains("Me"));
     Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size());
 
-    // correct module meta is returned by getMeta call.
-    LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma");
-    Assert.assertEquals("Name of module is Ma", m.getName(), "Ma");
-
   }
 
   private static String componentName(String... names)
@@ -671,4 +673,71 @@ public class TestModuleExpansion
     Assert.assertEquals("Locality is " + locality, meta.getLocality(), locality);
   }
 
+  /**
+   * Loads the topology declared in {@code /testModuleTopology.properties}, builds and validates
+   * a {@link LogicalPlan} from it, and then runs the validateTopLevelOperators,
+   * validateTopLevelStreams and validatePublicMethods checks on that plan.
+   *
+   * @throws IOException if the properties resource cannot be read
+   */
+  @Test
+  public void testLoadFromPropertiesFile() throws IOException
+  {
+    String resourcePath = "/testModuleTopology.properties";
+    Properties props = new Properties();
+    // try-with-resources closes the classpath stream once the properties are read; a missing
+    // resource yields a null stream, which is reported with an explicit error.
+    try (InputStream is = this.getClass().getResourceAsStream(resourcePath)) {
+      if (is == null) {
+        throw new RuntimeException("Could not load " + resourcePath);
+      }
+      props.load(is);
+    }
+    LogicalPlanConfiguration pb = new LogicalPlanConfiguration(new Configuration(false))
+        .addFromProperties(props, null);
+
+    LogicalPlan dag = new LogicalPlan();
+    pb.populateDAG(dag);
+    pb.prepareDAG(dag, null, "testApplication");
+    dag.validate();
+    validateTopLevelOperators(dag);
+    validateTopLevelStreams(dag);
+    validatePublicMethods(dag);
+  }
+
+  /**
+   * Loads the topology declared in {@code /testModuleTopology.json}, creates a
+   * {@link LogicalPlan} from the parsed JSON, and then runs the validateTopLevelOperators,
+   * validateTopLevelStreams and validatePublicMethods checks on that plan.
+   *
+   * @throws Exception if the resource cannot be read or parsed, or the plan cannot be built
+   */
+  @Test
+  public void testLoadFromJson() throws Exception
+  {
+    String resourcePath = "/testModuleTopology.json";
+    StringWriter writer = new StringWriter();
+    // try-with-resources closes the classpath stream once its content has been copied out.
+    try (InputStream is = this.getClass().getResourceAsStream(resourcePath)) {
+      if (is == null) {
+        throw new RuntimeException("Could not load " + resourcePath);
+      }
+      IOUtils.copy(is, writer);
+    }
+    JSONObject json = new JSONObject(writer.toString());
+
+    Configuration conf = new Configuration(false);
+    // NOTE(review): testModuleTopology.json declares no operator named "operator3", so this
+    // setting looks like a leftover with no effect on this test -- confirm before removing.
+    conf.set(StreamingApplication.DT_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
+
+    LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
+    LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
+    planConf.prepareDAG(dag, null, "testApplication");
+    dag.validate();
+    validateTopLevelOperators(dag);
+    validateTopLevelStreams(dag);
+    validatePublicMethods(dag);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/resources/testModuleTopology.json
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testModuleTopology.json b/engine/src/test/resources/testModuleTopology.json
new file mode 100644
index 0000000..8b2087a
--- /dev/null
+++ b/engine/src/test/resources/testModuleTopology.json
@@ -0,0 +1,141 @@
+{
+  "operators": [
+    {
+      "name": "O1",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator": {
+          "inputOperatorProp": "1"
+        }
+      }
+    },
+    {
+      "name": "O2",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator": {
+          "operatorProp": "2"
+        }
+      }
+    },
+    {
+      "name": "Ma",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": {
+          "level2ModuleAProp1": "11",
+          "level2ModuleAProp2": "12",
+          "level2ModuleAProp3": "13"
+        }
+      }
+    },
+    {
+      "name": "Mb",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": {
+          "level2ModuleBProp1": "21",
+          "level2ModuleBProp2": "22",
+          "level2ModuleBProp3": "23"
+        }
+      }
+    },
+    {
+      "name": "Mc",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA": {
+          "level2ModuleAProp1": "31",
+          "level2ModuleAProp2": "32",
+          "level2ModuleAProp3": "33"
+        }
+      }
+    },
+    {
+      "name": "Md",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB",
+      "properties": {
+        "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB": {
+          "level2ModuleBProp1": "41",
+          "level2ModuleBProp2": "42",
+          "level2ModuleBProp3": "43"
+        }
+      }
+    },
+    {
+      "name": "Me",
+      "class": "com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module"
+    }
+  ],
+  "streams": [
+    {
+      "name": "O1_O2",
+      "source": {
+        "operatorName": "O1",
+        "portName": "out"
+      },
+      "sinks": [
+        {
+          "operatorName": "O2",
+          "portName": "in"
+        },
+        {
+          "operatorName": "Me",
+          "portName": "mIn"
+        }
+      ]
+    },
+    {
+      "name": "O2_Ma",
+      "source": {
+        "operatorName": "O2",
+        "portName": "out1"
+      },
+      "sinks": [
+        {
+          "operatorName": "Ma",
+          "portName": "mIn"
+        }
+      ]
+    },
+    {
+      "name": "Ma_Mb",
+      "source": {
+        "operatorName": "Ma",
+        "portName": "mOut1"
+      },
+      "sinks": [
+        {
+          "operatorName": "Mb",
+          "portName": "mIn"
+        }
+      ]
+    },
+    {
+      "name": "Ma_Md",
+      "source": {
+        "operatorName": "Ma",
+        "portName": "mOut2"
+      },
+      "sinks": [
+        {
+          "operatorName": "Md",
+          "portName": "mIn"
+        }
+      ]
+    },
+    {
+      "name": "Mb_Mc",
+      "source": {
+        "operatorName": "Mb",
+        "portName": "mOut2"
+      },
+      "sinks": [
+        {
+          "operatorName": "Mc",
+          "portName": "mIn"
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d4f3a506/engine/src/test/resources/testModuleTopology.properties
----------------------------------------------------------------------
diff --git a/engine/src/test/resources/testModuleTopology.properties b/engine/src/test/resources/testModuleTopology.properties
new file mode 100644
index 0000000..0679e26
--- /dev/null
+++ b/engine/src/test/resources/testModuleTopology.properties
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test topology for TestModuleExpansion: operators, modules and streams defined in properties-file form
+dt.operator.O1.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyInputOperator
+dt.operator.O1.inputOperatorProp=1
+
+dt.operator.O2.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$DummyOperator
+dt.operator.O2.operatorProp=2
+
+dt.operator.Ma.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+dt.operator.Ma.level2ModuleAProp1=11
+dt.operator.Ma.level2ModuleAProp2=12
+dt.operator.Ma.level2ModuleAProp3=13
+
+dt.operator.Mb.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+dt.operator.Mb.level2ModuleBProp1=21
+dt.operator.Mb.level2ModuleBProp2=22
+dt.operator.Mb.level2ModuleBProp3=23
+
+dt.operator.Mc.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleA
+dt.operator.Mc.level2ModuleAProp1=31
+dt.operator.Mc.level2ModuleAProp2=32
+dt.operator.Mc.level2ModuleAProp3=33
+
+dt.operator.Md.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level2ModuleB
+dt.operator.Md.level2ModuleBProp1=41
+dt.operator.Md.level2ModuleBProp2=42
+dt.operator.Md.level2ModuleBProp3=43
+
+dt.operator.Me.classname=com.datatorrent.stram.plan.logical.module.TestModuleExpansion$Level3Module
+
+dt.stream.O1_O2.source=O1.out
+dt.stream.O1_O2.sinks=O2.in,Me.mIn
+
+dt.stream.O2_Ma.source=O2.out1
+dt.stream.O2_Ma.sinks=Ma.mIn
+
+dt.stream.Ma_Mb.source=Ma.mOut1
+dt.stream.Ma_Mb.sinks=Mb.mIn
+
+dt.stream.Ma_Md.source=Ma.mOut2
+dt.stream.Ma_Md.sinks=Md.mIn
+
+dt.stream.Mb_Mc.source=Mb.mOut2
+dt.stream.Mb_Mc.sinks=Mc.mIn



[2/2] incubator-apex-core git commit: Merge branch 'APEXCORE-107' of https://github.com/tushargosavi/incubator-apex-core

Posted by th...@apache.org.
Merge branch 'APEXCORE-107' of https://github.com/tushargosavi/incubator-apex-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/da46ec18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/da46ec18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/da46ec18

Branch: refs/heads/master
Commit: da46ec18624f66afe782714df4a4159d46c18074
Parents: c77ea11 d4f3a50
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed May 4 22:42:53 2016 -0700
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed May 4 22:42:53 2016 -0700

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |  20 +-
 .../main/java/com/datatorrent/api/Module.java   |   3 +-
 .../main/java/com/datatorrent/api/Operator.java |   3 +-
 .../stram/codec/LogicalPlanSerializer.java      |   2 +-
 .../stram/plan/logical/LogicalPlan.java         | 198 +++++++------------
 .../plan/logical/LogicalPlanConfiguration.java  |  76 ++++---
 .../stram/plan/logical/Operators.java           |   3 +-
 .../stram/webapp/StramWebServices.java          |   2 +-
 .../plan/logical/module/ModuleAppTest.java      |   2 +-
 .../logical/module/TestModuleExpansion.java     |  73 +++++--
 .../src/test/resources/testModuleTopology.json  | 141 +++++++++++++
 .../resources/testModuleTopology.properties     |  62 ++++++
 12 files changed, 386 insertions(+), 199 deletions(-)
----------------------------------------------------------------------