You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:31 UTC
[12/50] incubator-apex-core git commit: APEXCORE-104 Added flattening of module into parent DAG
APEXCORE-104 Added flattening of module into parent DAG
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/14a09bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/14a09bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/14a09bb5
Branch: refs/heads/master
Commit: 14a09bb51cb30585e7979ec022ec762ac2ba91e5
Parents: b0360d4
Author: chinmaykolhatkar <ch...@datatorrent.com>
Authored: Wed Oct 7 15:06:36 2015 +0530
Committer: chinmaykolhatkar <ch...@datatorrent.com>
Committed: Tue Dec 22 01:42:43 2015 +0530
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/DAG.java | 4 -
.../stram/plan/logical/LogicalPlan.java | 169 +++++++++++++++----
.../plan/logical/LogicalPlanConfiguration.java | 8 +
3 files changed, 146 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 1dce402..7c793f9 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -165,10 +165,6 @@ public interface DAG extends DAGContext, Serializable
String getName();
Module getModule();
+ // getMeta(InputPort)/getMeta(OutputPort) were removed: the only implementation returned null for both.
-
- InputPortMeta getMeta(Operator.InputPort<?> port);
-
- OutputPortMeta getMeta(Operator.OutputPort<?> port);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 53e81bc..5a3e167 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -80,6 +80,7 @@ public class LogicalPlan implements Serializable, DAG
public static final String SER_FILE_NAME = "dt-conf.ser";
public static final String LAUNCH_CONFIG_FILE_NAME = "dt-launch-config.xml";
private static final transient AtomicInteger logicalOperatorSequencer = new AtomicInteger();
+ /**
+ * Separator inserted between a module name and the name of an operator, stream or inner module when the
+ * module is flattened into its parent DAG (e.g. "M1$op").
+ */
+ public static final String MODULE_NAMESPACE_SEPARATOR = "$";
/**
* Constant
@@ -146,6 +147,7 @@ public class LogicalPlan implements Serializable, DAG
private final Map<String, StreamMeta> streams = new HashMap<String, StreamMeta>();
private final Map<String, OperatorMeta> operators = new HashMap<String, OperatorMeta>();
+ // Modules added to this DAG, keyed by name. A LinkedHashMap keeps insertion order, which is the order in
+ // which modules are flattened. Module names must not clash with operator names (checked in addOperator/addModule).
+ public final Map<String, ModuleMeta> modules = new LinkedHashMap<>();
private final List<OperatorMeta> rootOperators = new ArrayList<OperatorMeta>();
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
private transient int nodeIndex = 0; // used for cycle validation
@@ -733,6 +735,7 @@ public class LogicalPlan implements Serializable, DAG
private transient Integer lowlink; // for cycle detection
private transient Operator operator;
private MetricAggregatorMeta metricAggregatorMeta;
+ // Name of the module this operator was flattened from; for nested modules the names are joined with
+ // MODULE_NAMESPACE_SEPARATOR (e.g. "M1$M2"). null if this is a top-level operator.
+ private String moduleName;
/*
* Used for OIO validation,
@@ -819,6 +822,16 @@ public class LogicalPlan implements Serializable, DAG
return metricAggregatorMeta;
}
+ /**
+ * @return name of the module this operator came from (nested module names joined with
+ * MODULE_NAMESPACE_SEPARATOR), or null for a top-level operator.
+ */
+ public String getModuleName()
+ {
+ return moduleName;
+ }
+
+ /**
+ * @param moduleName name of the module this operator came from; null for a top-level operator.
+ */
+ public void setModuleName(String moduleName)
+ {
+ this.moduleName = moduleName;
+ }
+
protected void populateAggregatorMeta()
{
AutoMetric.Aggregator aggregator = getValue(OperatorContext.METRICS_AGGREGATOR);
@@ -1073,37 +1086,38 @@ public class LogicalPlan implements Serializable, DAG
throw new IllegalArgumentException("duplicate operator id: " + operators.get(name));
}
+ // Operators and modules share one namespace: reject a name already taken by a module.
+ // (operators.get(name) is always null here, so report the name itself.)
+ if (modules.containsKey(name)) {
+ throw new IllegalArgumentException("duplicate operator id: " + name + " is already used by a module");
+ }
+
OperatorMeta decl = new OperatorMeta(name, operator);
rootOperators.add(decl); // will be removed when a sink is added to an input port for this operator
operators.put(name, decl);
return operator;
}
+ /**
+ * Metadata for a module added to a DAG: its name, the module instance, its attributes, the parent module
+ * (when nested inside another module) and the private DAG the module populates before being flattened.
+ */
public final class ModuleMeta implements DAG.ModuleMeta, Serializable
{
private final LinkedHashMap<InputPortMeta, StreamMeta> inputStreams = new LinkedHashMap<>();
private final LinkedHashMap<OutputPortMeta, StreamMeta> outputStreams = new LinkedHashMap<>();
private final Attribute.AttributeMap attributes;
- @SuppressWarnings("unused")
- private final int id;
@NotNull
- private final String name;
- private transient Integer nindex; // for cycle detection
- private transient Integer lowlink; // for cycle detection
+ private String name;
private transient Module module;
+ private ModuleMeta parent; // enclosing module, null for a top-level module
+ private LogicalPlan dag = null; // DAG populated by module.populateDAG()
+ private transient String fullName; // cached result of getFullName()
- public ModuleMeta(String name, Module module)
- {
- this(name, module, new DefaultAttributeMap());
- }
-
- public ModuleMeta(String name, Module module, DefaultAttributeMap attributeMap)
+ /**
+ * Create module meta with an empty attribute map and an empty DAG for the module to populate.
+ *
+ * @param name   name of the module in its parent DAG.
+ * @param module module instance.
+ */
+ private ModuleMeta(String name, Module module)
{
LOG.debug("Initializing {} as {}", name, module.getClass().getName());
this.name = name;
this.module = module;
- this.id = logicalOperatorSequencer.decrementAndGet();
- this.attributes = attributeMap;
+ this.attributes = new DefaultAttributeMap();
+ this.dag = new LogicalPlan();
}
@Override
@@ -1119,43 +1133,104 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
- public DAG.InputPortMeta getMeta(InputPort<?> port)
+ public Attribute.AttributeMap getAttributes()
{
- return null;
+ return attributes;
}
@Override
- public DAG.OutputPortMeta getMeta(OutputPort<?> port)
+ public <T> T getValue(Attribute<T> key)
{
- return null;
+ return attributes.get(key);
}
@Override
- public Attribute.AttributeMap getAttributes()
+ public void setCounters(Object counters)
{
- return null;
+ // No-op: counters are ignored for modules.
}
@Override
- public <T> T getValue(Attribute<T> key)
+ public void sendMetrics(Collection<String> metricNames)
{
- return null;
+ // No-op: metrics are ignored for modules.
}
- @Override
- public void setCounters(Object counters)
+ /**
+ * @return the module's input-stream map (the internal map itself, not a copy).
+ */
+ public LinkedHashMap<InputPortMeta, StreamMeta> getInputStreams()
{
+ return inputStreams;
+ }
+ /**
+ * @return the module's output-stream map (the internal map itself, not a copy).
+ */
+ public LinkedHashMap<OutputPortMeta, StreamMeta> getOutputStreams()
+ {
+ return outputStreams;
}
- @Override
- public void sendMetrics(Collection<String> metricNames)
+ /**
+ * @return the DAG this module populates in flattenModule() before it is merged into the parent DAG.
+ */
+ public LogicalPlan getDag()
{
+ return dag;
+ }
+ // The module field is transient, so it is serialized separately through FSStorageAgent.
+ private void writeObject(ObjectOutputStream out) throws IOException
+ {
+ out.defaultWriteObject();
+ FSStorageAgent.store(out, module);
}
- }
- public transient Map<String, ModuleMeta> modules = Maps.newHashMap();
+ // Counterpart of writeObject: restores the transient module field via FSStorageAgent.
+ private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
+ {
+ input.defaultReadObject();
+ module = (Module)FSStorageAgent.retrieve(input);
+ }
+
+ /**
+ * Expand the module and add its operators to the parentDAG. The module first populates its own DAG, and every
+ * submodule found there is expanded recursively into that DAG. The resulting operators and streams are then
+ * copied into the parentDAG, with their names prefixed by this module's name.
+ *
+ * @param parentDAG parent DAG to populate with operators from this module and its inner modules.
+ * @param conf configuration object passed to the module's populateDAG.
+ */
+ public void flattenModule(LogicalPlan parentDAG, org.apache.hadoop.conf.Configuration conf)
+ {
+ module.populateDAG(dag, conf);
+ for (ModuleMeta subModuleMeta : dag.getAllModules()) {
+ subModuleMeta.setParent(this);
+ subModuleMeta.flattenModule(dag, conf);
+ }
+ parentDAG.addDAGToCurrentDAG(this);
+ }
+
+ /**
+ * Return the full name of the module. If this is an inner module (a module inside another module), this method
+ * walks up the parent chain to the top-level module. It builds the name by concatenating the module names from
+ * outermost to innermost, separated by MODULE_NAMESPACE_SEPARATOR. The result is cached after the first call.
+ *
+ * For example, if module M1 adds another module M2 to its DAG, the full name of M2 is
+ * ("M1" + MODULE_NAMESPACE_SEPARATOR + "M2").
+ *
+ * @return full name of the module.
+ */
+ public String getFullName()
+ {
+ if (fullName != null) {
+ return fullName;
+ }
+
+ if (parent == null) {
+ fullName = name;
+ } else {
+ fullName = parent.getFullName() + MODULE_NAMESPACE_SEPARATOR + name;
+ }
+ return fullName;
+ }
+
+ // Records the enclosing module; called by flattenModule() on each submodule before expanding it.
+ private void setParent(ModuleMeta meta)
+ {
+ this.parent = meta;
+ }
+
+ private static final long serialVersionUID = 7562277769188329223L;
+ }
@Override
public <T extends Module> T addModule(String name, T module)
@@ -1166,6 +1241,10 @@ public class LogicalPlan implements Serializable, DAG
}
throw new IllegalArgumentException("duplicate module is: " + modules.get(name));
}
+ // Modules and operators share one namespace: reject a name already taken by an operator.
+ // (modules.get(name) is always null here, so report the conflicting operator instead.)
+ if (operators.containsKey(name)) {
+ throw new IllegalArgumentException("duplicate module id: " + name + " is already used by operator " + operators.get(name));
+ }
+
ModuleMeta meta = new ModuleMeta(name, module);
modules.put(name, meta);
return module;
@@ -1228,6 +1307,33 @@ public class LogicalPlan implements Serializable, DAG
return s;
}
+ /**
+ * Copy the operators and streams of a module's (already flattened) DAG into this DAG.
+ * Each copied operator and stream is renamed to moduleName + MODULE_NAMESPACE_SEPARATOR + originalName so that
+ * names stay unique. Each copied operator records the module it came from, with nested module names joined
+ * by the same separator.
+ *
+ * @param moduleMeta module whose DAG is merged into this DAG.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
+ {
+ LogicalPlan subDag = moduleMeta.getDag();
+ String subDAGName = moduleMeta.getName();
+ for (OperatorMeta operatorMeta : subDag.getAllOperators()) {
+ String name = subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getName();
+ this.addOperator(name, operatorMeta.getOperator());
+ OperatorMeta operatorMetaNew = this.getOperatorMeta(name);
+ // Operators from a nested module already carry that module's name; prefix it with this module's name.
+ operatorMetaNew.setModuleName(operatorMeta.getModuleName() == null ? subDAGName :
+ subDAGName + MODULE_NAMESPACE_SEPARATOR + operatorMeta.getModuleName());
+ // NOTE(review): attributes set on the operator inside the module DAG are not copied here - confirm intended.
+ }
+
+ for (StreamMeta streamMeta : subDag.getAllStreams()) {
+ OutputPortMeta sourceMeta = streamMeta.getSource();
+ List<InputPort<?>> ports = new ArrayList<>();
+ for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+ ports.add(inputPortMeta.getPortObject());
+ }
+ InputPort[] inputPorts = ports.toArray(new InputPort[ports.size()]);
+
+ String name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
+ StreamMeta newStreamMeta = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
+ // Keep the locality requested for the stream inside the module; the new StreamMeta created by
+ // addStream does not inherit it.
+ newStreamMeta.setLocality(streamMeta.getLocality());
+ }
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T> sink1)
@@ -1276,7 +1382,7 @@ public class LogicalPlan implements Serializable, DAG
private OutputPortMeta assertGetPortMeta(Operator.OutputPort<?> port)
{
+ // Linear search over every operator's output-port mapping for the given port object.
- for (OperatorMeta o: getAllOperators()) {
+ for (OperatorMeta o : getAllOperators()) {
OutputPortMeta opm = o.getPortMapping().outPortMap.get(port);
if (opm != null) {
return opm;
@@ -1287,7 +1393,7 @@ public class LogicalPlan implements Serializable, DAG
private InputPortMeta assertGetPortMeta(Operator.InputPort<?> port)
{
+ // Linear search over every operator's input-port mapping for the given port object.
- for (OperatorMeta o: getAllOperators()) {
+ for (OperatorMeta o : getAllOperators()) {
InputPortMeta opm = o.getPortMapping().inPortMap.get(port);
if (opm != null) {
return opm;
@@ -1324,7 +1430,8 @@ public class LogicalPlan implements Serializable, DAG
return Collections.unmodifiableCollection(this.operators.values());
}
+ /**
+ * @return unmodifiable view of the modules added to this DAG, in insertion order.
+ */
- public Collection<ModuleMeta> getAllModules() {
+ public Collection<ModuleMeta> getAllModules()
+ {
return Collections.unmodifiableCollection(this.modules.values());
}
@@ -1341,7 +1448,7 @@ public class LogicalPlan implements Serializable, DAG
+ /**
+ * @return meta of the module with the given name, or null if no such module was added.
+ */
public ModuleMeta getModuleMeta(String moduleName)
{
- return null;
+ return this.modules.get(moduleName);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/14a09bb5/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 9bbe85c..6dc4c0c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -2116,12 +2116,20 @@ public class LogicalPlanConfiguration {
// Expand the modules within the dag recursively
setModuleProperties(dag, appName);
+ flattenDAG(dag, conf);
// inject external operator configuration
setOperatorConfiguration(dag, appConfs, appName);
setStreamConfiguration(dag, appConfs, appName);
}
+ /**
+ * Flatten every top-level module of the given DAG into it. Each module populates its own DAG (expanding inner
+ * modules recursively), and its operators and streams are then added to this DAG under module-prefixed names.
+ *
+ * @param dag DAG whose modules are flattened.
+ * @param conf configuration passed to each module's populateDAG.
+ */
+ private void flattenDAG(LogicalPlan dag, Configuration conf)
+ {
+ for (ModuleMeta moduleMeta : dag.getAllModules()) {
+ moduleMeta.flattenModule(dag, conf);
+ }
+ }
+
public static Properties readProperties(String filePath) throws IOException
{
InputStream is = new FileInputStream(filePath);