You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:31 UTC

[12/50] incubator-apex-core git commit: APEXCORE-104 Added flattening of module into parent DAG

APEXCORE-104 Added flattening of module into parent DAG


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/14a09bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/14a09bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/14a09bb5

Branch: refs/heads/master
Commit: 14a09bb51cb30585e7979ec022ec762ac2ba91e5
Parents: b0360d4
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Wed Oct 7 15:06:36 2015 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Tue Dec 22 01:42:43 2015 +0530

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/DAG.java  |   4 -
 .../stram/plan/logical/LogicalPlan.java         | 169 +++++++++++++++----
 .../plan/logical/LogicalPlanConfiguration.java  |   8 +
 3 files changed, 146 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 1dce402..7c793f9 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -165,10 +165,6 @@ public interface DAG extends DAGContext, Serializable
     String getName();
 
     Module getModule();
-
-    InputPortMeta getMeta(Operator.InputPort<?> port);
-
-    OutputPortMeta getMeta(Operator.OutputPort<?> port);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 53e81bc..5a3e167 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -80,6 +80,7 @@ public class LogicalPlan implements Serializable, DAG
   public static final String SER_FILE_NAME = "dt-conf.ser";
   public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml";
   private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger();
+  public static final String MODULE_NAMESPACE_SEPARATOR = "$";
 
   /**
    * Constant
@@ -146,6 +147,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>();
   private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>();
+  public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
   private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
   private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
   private transient int nodeIndex = 0; // used for cycle validation
@@ -733,6 +735,7 @@ public class LogicalPlan implements Serializable, DAG
     private transient Integer lowlink; // for cycle detection
     private transient Operator operator;
     private MetricAggregatorMeta metricAggregatorMeta;
+    private String moduleName;  // Name of the module which has this operator. null if this is a top level operator.
 
     /*
      * Used for  OIO validation,
@@ -819,6 +822,16 @@ public class LogicalPlan implements Serializable, DAG
       return metricAggregatorMeta;
     }
 
+    public String getModuleName()
+    {
+      return moduleName;
+    }
+
+    public void setModuleName(String moduleName)
+    {
+      this.moduleName = moduleName;
+    }
+
     protected void populateAggregatorMeta()
     {
       AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR);
@@ -1073,37 +1086,38 @@ public class LogicalPlan implements Serializable, DAG
       throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
     }
 
+    // Avoid name conflict with module.
+    if (modules.containsKey(name))
+      throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
+
     OperatorMeta decl = new OperatorMeta(name, operator);
     rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
     operators.put(name, decl);
     return operator;
   }
 
+  /**
+   * Module meta object.
+   */
   public final class ModuleMeta implements DAG.ModuleMeta, Serializable
   {
     private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
     private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
     private final Attribute.AttributeMap attributes;
-    @SuppressWarnings("unused")
-    private final int id;
     @NotNull
-    private final String name;
-    private transient Integer nindex; // for cycle detection
-    private transient Integer lowlink; // for cycle detection
+    private String name;
     private transient Module module;
+    private ModuleMeta parent;
+    private LogicalPlan dag = null;
+    private transient String fullName;
 
-    public ModuleMeta(String name, Module module)
-    {
-      this(name, module, new DefaultAttributeMap());
-    }
-
-    public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap)
+    private ModuleMeta(String name, Module module)
     {
       LOG.debug("Initializing {} as {}", name, module.getClass().getName());
       this.name = name;
       this.module = module;
-      this.id = logicalOperatorSequencer.decrementAndGet();
-      this.attributes = attributeMap;
+      this.attributes = new DefaultAttributeMap();
+      this.dag = new LogicalPlan();
     }
 
     @Override
@@ -1119,43 +1133,104 @@ public class LogicalPlan implements Serializable, DAG
     }
 
     @Override
-    public DAG.InputPortMeta getMeta(InputPort<?> port)
+    public Attribute.AttributeMap getAttributes()
     {
-      return null;
+      return attributes;
     }
 
     @Override
-    public DAG.OutputPortMeta getMeta(OutputPort<?> port)
+    public <T> T getValue(Attribute<T> key)
     {
-      return null;
+      return attributes.get(key);
     }
 
     @Override
-    public Attribute.AttributeMap getAttributes()
+    public void setCounters(Object counters)
     {
-      return null;
+
     }
 
     @Override
-    public <T> T getValue(Attribute<T> key)
+    public void sendMetrics(Collection<String> metricNames)
     {
-      return null;
+
     }
 
-    @Override
-    public void setCounters(Object counters)
+    public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
     {
+      return inputStreams;
+    }
 
+    public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
+    {
+      return outputStreams;
     }
 
-    @Override
-    public void sendMetrics(Collection<String> metricNames)
+    public LogicalPlan getDag()
     {
+      return dag;
+    }
 
+    private void writeObject(ObjectOutputStream out) throws IOException
+    {
+      out.defaultWriteObject();
+      FSStorageAgent.store(out, module);
     }
-  }
 
-  public transient Map<String, ModuleMeta> modules = Maps.newHashMap();
+    private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
+    {
+      input.defaultReadObject();
+      module = (Module)FSStorageAgent.retrieve(input);
+    }
+
+    /**
+     * Expand the module and add its operators to the parentDAG. After this method finishes, the module is fully expanded,
+     * with all its submodules also expanded. The parentDAG then contains the operators added by all the modules.
+     *
+     * @param parentDAG parent dag to populate with operators from this and inner modules.
+     * @param conf      configuration object.
+     */
+    public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf)
+    {
+      module.populateDAG(dag, conf);
+      for (ModuleMeta subModuleMeta : dag.getAllModules()) {
+        subModuleMeta.setParent(this);
+        subModuleMeta.flattenModule(dag, conf);
+      }
+      parentDAG.addDAGToCurrentDAG(this);
+    }
+
+    /**
+     * Return the full name of the module. If this is an inner module, i.e. a module added inside another module, this method
+     * traverses up to the top-level module and constructs the name by concatenating the names of the modules in the chain,
+     * from the top-level module down to this one, separated by MODULE_NAMESPACE_SEPARATOR.
+     *
+     * For example, if module M1 adds another module M2 to its DAG, then the full name of module M2
+     * is ("M1" + MODULE_NAMESPACE_SEPARATOR + "M2").
+     *
+     * @return full name of the module.
+     */
+    public String getFullName()
+    {
+      if (fullName != null) {
+        return fullName;
+      }
+
+      if (parent == null) {
+        fullName = name;
+      } else {
+        fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
+      }
+      return fullName;
+    }
+
+    private void setParent(ModuleMeta meta)
+    {
+      this.parent = meta;
+    }
+
+    private static final long serialVersionUID = 7562277769188329223L;
+  }
 
   @Override
   public <T extends Module> T addModule(String name, T module)
@@ -1166,6 +1241,10 @@ public class LogicalPlan implements Serializable, DAG
       }
       throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
     }
+    if (operators.containsKey(name)) {
+      throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
+    }
+
     ModuleMeta meta = new ModuleMeta(name, module);
     modules.put(name, meta);
     return module;
@@ -1228,6 +1307,33 @@ public class LogicalPlan implements Serializable, DAG
     return s;
   }
 
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
+  {
+    LogicalPlan subDag = moduleMeta.getDag();
+    String subDAGName = moduleMeta.getName();
+    String name;
+    for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
+      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+      this.addOperator(name, operatorMeta.getOperator());
+      OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
+      operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName :
+          subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
+    }
+
+    for (StreamMeta streamMeta : subDag.getAllStreams()) {
+      OutputPortMeta sourceMeta = streamMeta.getSource();
+      List<InputPort<?>> ports = new LinkedList<>();
+      for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+        ports.add(inputPortMeta.getPortObject());
+      }
+      InputPort[] inputPorts = ports.toArray(new InputPort[]{});
+
+      name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
+      this.addStream(name, sourceMeta.getPortObject(), inputPorts);
+    }
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1)
@@ -1276,7 +1382,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port)
   {
-    for (OperatorMeta o: getAllOperators()) {
+    for (OperatorMeta o : getAllOperators()) {
       OutputPortMeta opm = o.getPortMapping().outPortMap.get(port);
       if (opm != null) {
         return opm;
@@ -1287,7 +1393,7 @@ public class LogicalPlan implements Serializable, DAG
 
   private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port)
   {
-    for (OperatorMeta o: getAllOperators()) {
+    for (OperatorMeta o : getAllOperators()) {
       InputPortMeta opm = o.getPortMapping().inPortMap.get(port);
       if (opm != null) {
         return opm;
@@ -1324,7 +1430,8 @@ public class LogicalPlan implements Serializable, DAG
     return Collections.unmodifiableCollection(this.operators.values());
   }
 
-  public Collection<ModuleMeta> getAllModules() {
+  public Collection<ModuleMeta> getAllModules()
+  {
     return Collections.unmodifiableCollection(this.modules.values());
   }
 
@@ -1341,7 +1448,7 @@ public class LogicalPlan implements Serializable, DAG
 
   public ModuleMeta getModuleMeta(String moduleName)
   {
-    return null;
+    return this.modules.get(moduleName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 9bbe85c..6dc4c0c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -2116,12 +2116,20 @@ public class LogicalPlanConfiguration {
 
     // Expand the modules within the dag recursively
     setModuleProperties(dag, appName);
+    flattenDAG(dag, conf);
 
     // inject external operator configuration
     setOperatorConfiguration(dag, appConfs, appName);
     setStreamConfiguration(dag, appConfs, appName);
   }
 
+  private void flattenDAG(LogicalPlan dag, Configuration conf)
+  {
+    for (ModuleMeta moduleMeta : dag.getAllModules()) {
+      moduleMeta.flattenModule(dag, conf);
+    }
+  }
+
   public static Properties readProperties(String filePath) throws IOException
   {
     InputStream is = new FileInputStream(filePath);