You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:21:32 UTC
[13/50] incubator-apex-core git commit: APEXCORE-194 Added support
for proxy ports Added test cases.
APEXCORE-194 Added support for proxy ports
Added test cases.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/c1314eaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/c1314eaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/c1314eaf
Branch: refs/heads/master
Commit: c1314eafaac239b420d085a4584d5c5acaf3e69b
Parents: 14a09bb
Author: bhupeshchawda <bh...@gmail.com>
Authored: Tue Oct 6 12:34:24 2015 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Tue Dec 22 02:04:18 2015 +0530
----------------------------------------------------------------------
.../main/java/com/datatorrent/api/Module.java | 120 ++++
.../stram/plan/logical/LogicalPlan.java | 51 +-
.../plan/logical/LogicalPlanConfiguration.java | 1 +
.../plan/logical/module/ModuleAppTest.java | 168 ++++++
.../logical/module/TestModuleExpansion.java | 552 +++++++++++++++++++
5 files changed, 888 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/api/src/main/java/com/datatorrent/api/Module.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Module.java b/api/src/main/java/com/datatorrent/api/Module.java
index 1220fc1..67682e7 100644
--- a/api/src/main/java/com/datatorrent/api/Module.java
+++ b/api/src/main/java/com/datatorrent/api/Module.java
@@ -21,8 +21,128 @@ package com.datatorrent.api;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import com.datatorrent.api.Context.PortContext;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.Operator.Unifier;
+
+/**
+ * A Module is a component which can be added to the DAG similar to the operator,
+ * using addModule API. The module should implement populateDAG method, which
+ * will be called by the platform, and DAG populated by the module will be
+ * replace in place of the module.
+ *
+ */
@InterfaceStability.Evolving
public interface Module
{
void populateDAG(DAG dag, Configuration conf);
+
+ /**
+ * These ports allow platform to short circuit module port to the operator port. i.e When a module is expanded, it can
+ * specify which operator's port is used to replaced the module port in the final DAG.
+ *
+ * @param <T> data type accepted at the input port.
+ */
+ final class ProxyInputPort<T> implements InputPort<T>
+ {
+ InputPort<T> inputPort;
+
+ public void set(InputPort<T> port)
+ {
+ inputPort = port;
+ }
+
+ public InputPort<T> get()
+ {
+ return inputPort;
+ }
+
+ @Override
+ public void setup(PortContext context)
+ {
+ if (inputPort != null) {
+ inputPort.setup(context);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ if (inputPort != null) {
+ inputPort.teardown();
+ }
+ }
+
+ @Override
+ public Sink<T> getSink()
+ {
+ return inputPort == null ? null : inputPort.getSink();
+ }
+
+ @Override
+ public void setConnected(boolean connected)
+ {
+ if (inputPort != null) {
+ inputPort.setConnected(connected);
+ }
+ }
+
+ @Override
+ public StreamCodec<T> getStreamCodec()
+ {
+ return inputPort == null ? null : inputPort.getStreamCodec();
+ }
+ }
+
+ /**
+ * Similar to ProxyInputPort, but on output side.
+ *
+ * @param <T> datatype emitted on the port.
+ */
+ final class ProxyOutputPort<T> implements OutputPort<T>
+ {
+ OutputPort<T> outputPort;
+
+ public void set(OutputPort<T> port)
+ {
+ outputPort = port;
+ }
+
+ public OutputPort<T> get()
+ {
+ return outputPort;
+ }
+
+ @Override
+ public void setup(PortContext context)
+ {
+ if (outputPort != null) {
+ outputPort.setup(context);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ if (outputPort != null) {
+ outputPort.teardown();
+ }
+ }
+
+ @Override
+ public void setSink(Sink<Object> s)
+ {
+ if (outputPort != null) {
+ outputPort.setSink(s);
+ }
+ }
+
+ @Override
+ public Unifier<T> getUnifier()
+ {
+ return outputPort == null ? null : outputPort.getUnifier();
+ }
+ }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 5a3e167..21039cc 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -37,12 +37,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Sets;
import com.datatorrent.api.*;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Module.ProxyInputPort;
+import com.datatorrent.api.Module.ProxyOutputPort;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Operator.Unifier;
@@ -152,6 +154,7 @@ public class LogicalPlan implements Serializable, DAG
private final Attribute.AttributeMap attributes = new DefaultAttributeMap();
private transient int nodeIndex = 0; // used for cycle validation
private transient Stack<OperatorMeta> stack = new Stack<OperatorMeta>(); // used for cycle validation
+ private transient Map<String, ArrayListMultimap<OutputPort<?>, InputPort<?>>> streamLinks = new HashMap<>();
@Override
public Attribute.AttributeMap getAttributes()
@@ -1197,6 +1200,7 @@ public class LogicalPlan implements Serializable, DAG
subModuleMeta.setParent(this);
subModuleMeta.flattenModule(dag, conf);
}
+ dag.applyStreamLinks();
parentDAG.addDAGToCurrentDAG(this);
}
@@ -1300,13 +1304,52 @@ public class LogicalPlan implements Serializable, DAG
public <T> StreamMeta addStream(String id, Operator.OutputPort<? extends T> source, Operator.InputPort<? super T>... sinks)
{
StreamMeta s = addStream(id);
- s.setSource(source);
- for (Operator.InputPort<?> sink: sinks) {
- s.addSink(sink);
+ id = s.id;
+ ArrayListMultimap<OutputPort<?>, InputPort<?>> streamMap = ArrayListMultimap.create();
+ if (!(source instanceof ProxyOutputPort)) {
+ s.setSource(source);
+ }
+ for (Operator.InputPort<?> sink : sinks) {
+ if (source instanceof ProxyOutputPort || sink instanceof ProxyInputPort) {
+ streamMap.put(source, sink);
+ streamLinks.put(id, streamMap);
+ } else {
+ if (s.getSource() == null) {
+ s.setSource(source);
+ }
+ s.addSink(sink);
+ }
}
return s;
}
+ /**
+ * This will be called once the Logical Dag is expanded, and the proxy input and proxy output ports are populated with
+ * the actual ports that they refer to This method adds sources and sinks for the StreamMeta objects which were left
+ * empty in the addStream call.
+ */
+ public void applyStreamLinks()
+ {
+ for (String id : streamLinks.keySet()) {
+ StreamMeta s = getStream(id);
+ for (Map.Entry<Operator.OutputPort<?>, Operator.InputPort<?>> pair : streamLinks.get(id).entries()) {
+ if (s.getSource() == null) {
+ Operator.OutputPort<?> outputPort = pair.getKey();
+ while (outputPort instanceof ProxyOutputPort) {
+ outputPort = ((ProxyOutputPort<?>)outputPort).get();
+ }
+ s.setSource(outputPort);
+ }
+
+ Operator.InputPort<?> inputPort = pair.getValue();
+ while (inputPort instanceof ProxyInputPort) {
+ inputPort = ((ProxyInputPort<?>)inputPort).get();
+ }
+ s.addSink(inputPort);
+ }
+ }
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
private void addDAGToCurrentDAG(ModuleMeta moduleMeta)
{
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 6dc4c0c..483576a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -2128,6 +2128,7 @@ public class LogicalPlanConfiguration {
for (ModuleMeta moduleMeta : dag.getAllModules()) {
moduleMeta.flattenModule(dag, conf);
}
+ dag.applyStreamLinks();
}
public static Properties readProperties(String filePath) throws IOException
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
new file mode 100644
index 0000000..97c015e
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/ModuleAppTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+/**
+ * Unit tests for testing Dag expansion with modules and proxy port substitution
+ */
+public class ModuleAppTest
+{
+
+ /*
+ * Input Operator - 1
+ */
+ static class DummyInputOperator extends BaseOperator implements InputOperator
+ {
+
+ Random r = new Random();
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+
+ @Override
+ public void emitTuples()
+ {
+ output.emit(r.nextInt());
+ }
+ }
+
+ /*
+ * Input Operator - 1.1
+ */
+ static class DummyOperatorAfterInput extends BaseOperator
+ {
+
+ public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ output.emit(tuple);
+ }
+ };
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ }
+
+ /*
+ * Operator - 2
+ */
+ static class DummyOperator extends BaseOperator
+ {
+ int prop;
+
+ public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ LOG.debug(tuple.intValue() + " processed");
+ output.emit(tuple);
+ }
+ };
+ public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+ }
+
+ /*
+ * Output Operator - 3
+ */
+ static class DummyOutputOperator extends BaseOperator
+ {
+ int prop;
+
+ public transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ LOG.debug(tuple.intValue() + " processed");
+ }
+ };
+ }
+
+ /*
+ * Module Definition
+ */
+ static class TestModule implements Module
+ {
+
+ public transient ProxyInputPort<Integer> moduleInput = new Module.ProxyInputPort<Integer>();
+ public transient ProxyOutputPort<Integer> moduleOutput = new Module.ProxyOutputPort<Integer>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ LOG.debug("Module - PopulateDAG");
+ DummyOperator dummyOperator = dag.addOperator("DummyOperator", new DummyOperator());
+ moduleInput.set(dummyOperator.input);
+ moduleOutput.set(dummyOperator.output);
+ }
+ }
+
+ static class Application implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ LOG.debug("Application - PopulateDAG");
+ DummyInputOperator dummyInputOperator = dag.addOperator("DummyInputOperator", new DummyInputOperator());
+ DummyOperatorAfterInput dummyOperatorAfterInput = dag.addOperator("DummyOperatorAfterInput",
+ new DummyOperatorAfterInput());
+ Module m1 = dag.addModule("TestModule1", new TestModule());
+ Module m2 = dag.addModule("TestModule2", new TestModule());
+ DummyOutputOperator dummyOutputOperator = dag.addOperator("DummyOutputOperator", new DummyOutputOperator());
+ dag.addStream("Operator To Operator", dummyInputOperator.output, dummyOperatorAfterInput.input);
+ dag.addStream("Operator To Module", dummyOperatorAfterInput.output, ((TestModule)m1).moduleInput);
+ dag.addStream("Module To Module", ((TestModule)m1).moduleOutput, ((TestModule)m2).moduleInput);
+ dag.addStream("Module To Operator", ((TestModule)m2).moduleOutput, dummyOutputOperator.input);
+ }
+ }
+
+ @Test
+ public void validateTestApplication()
+ {
+ Configuration conf = new Configuration(false);
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = new LogicalPlan();
+ lpc.prepareDAG(dag, new Application(), "TestApp");
+
+ Assert.assertEquals(2, dag.getAllModules().size(), 2);
+ Assert.assertEquals(5, dag.getAllOperators().size());
+ Assert.assertEquals(4, dag.getAllStreams().size());
+ dag.validate();
+ }
+
+ private static Logger LOG = LoggerFactory.getLogger(ModuleAppTest.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/c1314eaf/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
new file mode 100644
index 0000000..5bfd8f1
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -0,0 +1,552 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram.plan.logical.module;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Module;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
+
+public class TestModuleExpansion
+{
+ static class DummyInputOperator extends BaseOperator implements InputOperator
+ {
+ private int inputOperatorProp = 0;
+
+ Random r = new Random();
+ public transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();
+
+ @Override
+ public void emitTuples()
+ {
+ out.emit(r.nextInt());
+ }
+
+ public int getInputOperatorProp()
+ {
+ return inputOperatorProp;
+ }
+
+ public void setInputOperatorProp(int inputOperatorProp)
+ {
+ this.inputOperatorProp = inputOperatorProp;
+ }
+ }
+
+ static class DummyOperator extends BaseOperator
+ {
+ private int operatorProp = 0;
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> out1 = new DefaultOutputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient DefaultOutputPort<Integer> out2 = new DefaultOutputPort<>();
+
+ @InputPortFieldAnnotation(optional = true)
+ public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>()
+ {
+ @Override
+ public void process(Integer tuple)
+ {
+ out1.emit(tuple);
+ out2.emit(tuple);
+ }
+ };
+
+ public int getOperatorProp()
+ {
+ return operatorProp;
+ }
+
+ public void setOperatorProp(int operatorProp)
+ {
+ this.operatorProp = operatorProp;
+ }
+ }
+
+ static class Level1Module implements Module
+ {
+ private int level1ModuleProp = 0;
+
+ @InputPortFieldAnnotation(optional = true)
+ public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient ProxyOutputPort<Integer> mOut = new ProxyOutputPort<>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+ o1.setOperatorProp(level1ModuleProp);
+ mIn.set(o1.in);
+ mOut.set(o1.out1);
+ }
+
+ public int getLevel1ModuleProp()
+ {
+ return level1ModuleProp;
+ }
+
+ public void setLevel1ModuleProp(int level1ModuleProp)
+ {
+ this.level1ModuleProp = level1ModuleProp;
+ }
+ }
+
+ static class Level2ModuleA implements Module
+ {
+ private int level2ModuleAProp1 = 0;
+ private int level2ModuleAProp2 = 0;
+ private int level2ModuleAProp3 = 0;
+
+ @InputPortFieldAnnotation(optional = true)
+ public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ Level1Module m1 = dag.addModule("M1", new Level1Module());
+ m1.setLevel1ModuleProp(level2ModuleAProp1);
+
+ Level1Module m2 = dag.addModule("M2", new Level1Module());
+ m2.setLevel1ModuleProp(level2ModuleAProp2);
+
+ DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+ o1.setOperatorProp(level2ModuleAProp3);
+
+ dag.addStream("M1_M2&O1", m1.mOut, m2.mIn, o1.in);
+
+ mIn.set(m1.mIn);
+ mOut1.set(m2.mOut);
+ mOut2.set(o1.out1);
+ }
+
+ public int getLevel2ModuleAProp1()
+ {
+ return level2ModuleAProp1;
+ }
+
+ public void setLevel2ModuleAProp1(int level2ModuleAProp1)
+ {
+ this.level2ModuleAProp1 = level2ModuleAProp1;
+ }
+
+ public int getLevel2ModuleAProp2()
+ {
+ return level2ModuleAProp2;
+ }
+
+ public void setLevel2ModuleAProp2(int level2ModuleAProp2)
+ {
+ this.level2ModuleAProp2 = level2ModuleAProp2;
+ }
+
+ public int getLevel2ModuleAProp3()
+ {
+ return level2ModuleAProp3;
+ }
+
+ public void setLevel2ModuleAProp3(int level2ModuleAProp3)
+ {
+ this.level2ModuleAProp3 = level2ModuleAProp3;
+ }
+ }
+
+ static class Level2ModuleB implements Module
+ {
+ private int level2ModuleBProp1 = 0;
+ private int level2ModuleBProp2 = 0;
+ private int level2ModuleBProp3 = 0;
+
+ @InputPortFieldAnnotation(optional = true)
+ public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+
+ @OutputPortFieldAnnotation(optional = true)
+ public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyOperator o1 = dag.addOperator("O1", new DummyOperator());
+ o1.setOperatorProp(level2ModuleBProp1);
+
+ Level1Module m1 = dag.addModule("M1", new Level1Module());
+ m1.setLevel1ModuleProp(level2ModuleBProp2);
+
+ DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
+ o2.setOperatorProp(level2ModuleBProp3);
+
+ dag.addStream("O1_M1", o1.out1, m1.mIn);
+ dag.addStream("O1_O2", o1.out2, o2.in);
+
+ mIn.set(o1.in);
+ mOut1.set(m1.mOut);
+ mOut2.set(o2.out1);
+ }
+
+ public int getLevel2ModuleBProp1()
+ {
+ return level2ModuleBProp1;
+ }
+
+ public void setLevel2ModuleBProp1(int level2ModuleBProp1)
+ {
+ this.level2ModuleBProp1 = level2ModuleBProp1;
+ }
+
+ public int getLevel2ModuleBProp2()
+ {
+ return level2ModuleBProp2;
+ }
+
+ public void setLevel2ModuleBProp2(int level2ModuleBProp2)
+ {
+ this.level2ModuleBProp2 = level2ModuleBProp2;
+ }
+
+ public int getLevel2ModuleBProp3()
+ {
+ return level2ModuleBProp3;
+ }
+
+ public void setLevel2ModuleBProp3(int level2ModuleBProp3)
+ {
+ this.level2ModuleBProp3 = level2ModuleBProp3;
+ }
+ }
+
+ static class Level3Module implements Module
+ {
+
+ public final transient ProxyInputPort<Integer> mIn = new ProxyInputPort<>();
+ public final transient ProxyOutputPort<Integer> mOut1 = new ProxyOutputPort<>();
+ public final transient ProxyOutputPort<Integer> mOut2 = new ProxyOutputPort<>();
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyOperator op = dag.addOperator("O1", new DummyOperator());
+ Level2ModuleB m1 = dag.addModule("M1", new Level2ModuleB());
+ Level1Module m2 = dag.addModule("M2", new Level1Module());
+
+ dag.addStream("s1", op.out1, m1.mIn);
+ dag.addStream("s2", op.out2, m2.mIn);
+
+ mIn.set(op.in);
+ mOut1.set(m1.mOut1);
+ mOut2.set(m2.mOut);
+ }
+ }
+
+ static class NestedModuleApp implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyInputOperator o1 = dag.addOperator("O1", new DummyInputOperator());
+ o1.setInputOperatorProp(1);
+
+ DummyOperator o2 = dag.addOperator("O2", new DummyOperator());
+ o2.setOperatorProp(2);
+
+ Level2ModuleA ma = dag.addModule("Ma", new Level2ModuleA());
+ ma.setLevel2ModuleAProp1(11);
+ ma.setLevel2ModuleAProp2(12);
+ ma.setLevel2ModuleAProp3(13);
+
+ Level2ModuleB mb = dag.addModule("Mb", new Level2ModuleB());
+ mb.setLevel2ModuleBProp1(21);
+ mb.setLevel2ModuleBProp2(22);
+ mb.setLevel2ModuleBProp3(23);
+
+ Level2ModuleA mc = dag.addModule("Mc", new Level2ModuleA());
+ mc.setLevel2ModuleAProp1(31);
+ mc.setLevel2ModuleAProp2(32);
+ mc.setLevel2ModuleAProp3(33);
+
+ Level2ModuleB md = dag.addModule("Md", new Level2ModuleB());
+ md.setLevel2ModuleBProp1(41);
+ md.setLevel2ModuleBProp2(42);
+ md.setLevel2ModuleBProp3(43);
+
+ Level3Module me = dag.addModule("Me", new Level3Module());
+
+ dag.addStream("O1_O2", o1.out, o2.in, me.mIn);
+ dag.addStream("O2_Ma", o2.out1, ma.mIn);
+ dag.addStream("Ma_Mb", ma.mOut1, mb.mIn);
+ dag.addStream("Ma_Md", ma.mOut2, md.mIn);
+ dag.addStream("Mb_Mc", mb.mOut2, mc.mIn);
+ }
+ }
+
+ @Test
+ public void testModuleExtreme()
+ {
+ StreamingApplication app = new NestedModuleApp();
+ Configuration conf = new Configuration(false);
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = new LogicalPlan();
+ lpc.prepareDAG(dag, app, "ModuleApp");
+
+ dag.validate();
+ validateTopLevelOperators(dag);
+ validateTopLevelStreams(dag);
+ validatePublicMethods(dag);
+ }
+
+ private void validateTopLevelStreams(LogicalPlan dag)
+ {
+ List<String> streamNames = new ArrayList<>();
+ for (LogicalPlan.StreamMeta streamMeta : dag.getAllStreams()) {
+ streamNames.add(streamMeta.getName());
+ }
+
+ Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_M1")));
+ Assert.assertTrue(streamNames.contains("O2_Ma"));
+ Assert.assertTrue(streamNames.contains("Mb_Mc"));
+ Assert.assertTrue(streamNames.contains(componentName("Mb", "O1_O2")));
+ Assert.assertTrue(streamNames.contains(componentName("Ma", "M1_M2&O1")));
+ Assert.assertTrue(streamNames.contains(componentName("Md", "O1_M1")));
+ Assert.assertTrue(streamNames.contains(componentName("Ma_Md")));
+ Assert.assertTrue(streamNames.contains(componentName("Mc", "M1_M2&O1")));
+ Assert.assertTrue(streamNames.contains(componentName("Md", "O1_O2")));
+ Assert.assertTrue(streamNames.contains("Ma_Mb"));
+ Assert.assertTrue(streamNames.contains("O1_O2"));
+
+ validateSeperateStream(dag, componentName("Mb", "O1_M1"), componentName("Mb", "O1"),
+ componentName("Mb", "M1", "O1"));
+ validateSeperateStream(dag, "O2_Ma", "O2", componentName("Ma", "M1", "O1"));
+ validateSeperateStream(dag, "Mb_Mc", componentName("Mb", "O2"), componentName("Mc", "M1", "O1"));
+ validateSeperateStream(dag, componentName("Mb", "O1_O2"), componentName("Mb", "O1"), componentName("Mb", "O2"));
+ validateSeperateStream(dag, componentName("Ma", "M1_M2&O1"), componentName("Ma", "M1", "O1"),
+ componentName("Ma", "O1"), componentName("Ma", "M2", "O1"));
+ validateSeperateStream(dag, componentName("Md", "O1_M1"), componentName("Md", "O1"),
+ componentName("Md", "M1", "O1"));
+ validateSeperateStream(dag, "Ma_Md", componentName("Ma", "O1"), componentName("Md", "O1"));
+ validateSeperateStream(dag, componentName("Mc", "M1_M2&O1"), componentName("Mc", "M1", "O1"),
+ componentName("Mc", "O1"), componentName("Mc", "M2", "O1"));
+ validateSeperateStream(dag, componentName("Md", "O1_O2"), componentName("Md", "O1"), componentName("Md", "O2"));
+ validateSeperateStream(dag, "Ma_Mb", componentName("Ma", "M2", "O1"), componentName("Mb", "O1"));
+ validateSeperateStream(dag, "O1_O2", "O1", "O2", componentName("Me", "O1"));
+ }
+
+ private void validateSeperateStream(LogicalPlan dag, String streamName, String inputOperatorName,
+ String... outputOperatorNames)
+ {
+ LogicalPlan.StreamMeta streamMeta = dag.getStream(streamName);
+ String sourceName = streamMeta.getSource().getOperatorMeta().getName();
+
+ List<String> sinksName = new ArrayList<>();
+ for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) {
+ sinksName.add(inputPortMeta.getOperatorWrapper().getName());
+ }
+
+ Assert.assertTrue(inputOperatorName.equals(sourceName));
+ Assert.assertEquals(outputOperatorNames.length, sinksName.size());
+
+ for (String outputOperatorName : outputOperatorNames) {
+ Assert.assertTrue(sinksName.contains(outputOperatorName));
+ }
+ }
+
+ private void validateTopLevelOperators(LogicalPlan dag)
+ {
+ List<String> operatorNames = new ArrayList<>();
+ for (LogicalPlan.OperatorMeta operatorMeta : dag.getAllOperators()) {
+ operatorNames.add(operatorMeta.getName());
+ }
+ Assert.assertTrue(operatorNames.contains("O1"));
+ Assert.assertTrue(operatorNames.contains("O2"));
+ Assert.assertTrue(operatorNames.contains(componentName("Ma", "M1", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Ma", "M2", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Ma", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mb", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mb", "M1", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mb", "O2")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mc", "M1", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mc", "M2", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Mc", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Md", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Md", "M1", "O1")));
+ Assert.assertTrue(operatorNames.contains(componentName("Md", "O2")));
+
+ validateOperatorPropertyValue(dag, "O1", 1);
+ validateOperatorPropertyValue(dag, "O2", 2);
+ validateOperatorPropertyValue(dag, componentName("Ma", "M1", "O1"), 11);
+ validateOperatorPropertyValue(dag, componentName("Ma", "M2", "O1"), 12);
+ validateOperatorPropertyValue(dag, componentName("Ma", "O1"), 13);
+ validateOperatorPropertyValue(dag, componentName("Mb", "O1"), 21);
+ validateOperatorPropertyValue(dag, componentName("Mb", "M1", "O1"), 22);
+ validateOperatorPropertyValue(dag, componentName("Mb", "O2"), 23);
+ validateOperatorPropertyValue(dag, componentName("Mc", "M1", "O1"), 31);
+ validateOperatorPropertyValue(dag, componentName("Mc", "M2", "O1"), 32);
+ validateOperatorPropertyValue(dag, componentName("Mc", "O1"), 33);
+ validateOperatorPropertyValue(dag, componentName("Md", "O1"), 41);
+ validateOperatorPropertyValue(dag, componentName("Md", "M1", "O1"), 42);
+ validateOperatorPropertyValue(dag, componentName("Md", "O2"), 43);
+
+ validateOperatorParent(dag, "O1", null);
+ validateOperatorParent(dag, "O2", null);
+ validateOperatorParent(dag, componentName("Ma", "M1", "O1"), componentName("Ma", "M1"));
+ validateOperatorParent(dag, componentName("Ma", "M2", "O1"), componentName("Ma", "M2"));
+ validateOperatorParent(dag, componentName("Ma", "O1"), "Ma");
+ validateOperatorParent(dag, componentName("Mb", "O1"), "Mb");
+ validateOperatorParent(dag, componentName("Mb", "M1", "O1"), componentName("Mb", "M1"));
+ validateOperatorParent(dag, componentName("Mb", "O2"), "Mb");
+ validateOperatorParent(dag, componentName("Mc", "M1", "O1"), componentName("Mc", "M1"));
+ validateOperatorParent(dag, componentName("Mc", "M2", "O1"), componentName("Mc", "M2"));
+ validateOperatorParent(dag, componentName("Mc", "O1"), "Mc");
+ validateOperatorParent(dag, componentName("Md", "O1"), "Md");
+ validateOperatorParent(dag, componentName("Md", "M1", "O1"), componentName("Md", "M1"));
+ validateOperatorParent(dag, componentName("Md", "O2"), "Md");
+ }
+
+ private void validateOperatorParent(LogicalPlan dag, String operatorName, String parentModuleName)
+ {
+ LogicalPlan.OperatorMeta operatorMeta = dag.getOperatorMeta(operatorName);
+ if (parentModuleName == null) {
+ Assert.assertNull(operatorMeta.getModuleName());
+ } else {
+ Assert.assertTrue(parentModuleName.equals(operatorMeta.getModuleName()));
+ }
+ }
+
+ private void validateOperatorPropertyValue(LogicalPlan dag, String operatorName, int expectedValue)
+ {
+ LogicalPlan.OperatorMeta oMeta = dag.getOperatorMeta(operatorName);
+ if (operatorName.equals("O1")) {
+ DummyInputOperator operator = (DummyInputOperator)oMeta.getOperator();
+ Assert.assertEquals(expectedValue, operator.getInputOperatorProp());
+ } else {
+ DummyOperator operator = (DummyOperator)oMeta.getOperator();
+ Assert.assertEquals(expectedValue, operator.getOperatorProp());
+ }
+ }
+
+ private void validatePublicMethods(LogicalPlan dag)
+ {
+ // Logical dag contains 4 modules added on top level.
+ List<String> moduleNames = new ArrayList<>();
+ for (LogicalPlan.ModuleMeta moduleMeta : dag.getAllModules()) {
+ moduleNames.add(moduleMeta.getName());
+ }
+ Assert.assertTrue(moduleNames.contains("Ma"));
+ Assert.assertTrue(moduleNames.contains("Mb"));
+ Assert.assertTrue(moduleNames.contains("Mc"));
+ Assert.assertTrue(moduleNames.contains("Md"));
+ Assert.assertTrue(moduleNames.contains("Me"));
+ Assert.assertEquals("Number of modules are 5", 5, dag.getAllModules().size());
+
+ // correct module meta is returned by getMeta call.
+ LogicalPlan.ModuleMeta m = dag.getModuleMeta("Ma");
+ Assert.assertEquals("Name of module is Ma", m.getName(), "Ma");
+
+ }
+
+ private static String componentName(String... names)
+ {
+ if (names.length == 0) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder(names[0]);
+ for (int i = 1; i < names.length; i++) {
+ sb.append(LogicalPlan.MODULE_NAMESPACE_SEPARATOR);
+ sb.append(names[i]);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Generate a conflict, Add a top level operator with name "m1_O1",
+ * and add a module "m1" which will populate operator "O1", causing name conflict with
+ * top level operator.
+ */
+ @Test(expected = java.lang.IllegalArgumentException.class)
+ public void conflictingNamesWithExpandedModule()
+ {
+ Configuration conf = new Configuration(false);
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = new LogicalPlan();
+ DummyInputOperator in = dag.addOperator(componentName("m1", "O1"), new DummyInputOperator());
+ Level2ModuleA module = dag.addModule("m1", new Level2ModuleA());
+ dag.addStream("s1", in.out, module.mIn);
+ lpc.prepareDAG(dag, null, "ModuleApp");
+ dag.validate();
+ }
+
+ /**
+ * Module and Operator with same name is not allowed in a DAG, to prevent properties
+ * conflict.
+ */
+ @Test(expected = java.lang.IllegalArgumentException.class)
+ public void conflictingNamesWithOperator1()
+ {
+ Configuration conf = new Configuration(false);
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = new LogicalPlan();
+ DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator());
+ Level2ModuleA module = dag.addModule("M1", new Level2ModuleA());
+ dag.addStream("s1", in.out, module.mIn);
+ lpc.prepareDAG(dag, null, "ModuleApp");
+ dag.validate();
+ }
+
+ /**
+ * Module and Operator with same name is not allowed in a DAG, to prevent properties
+ * conflict.
+ */
+ @Test(expected = java.lang.IllegalArgumentException.class)
+ public void conflictingNamesWithOperator2()
+ {
+ Configuration conf = new Configuration(false);
+ LogicalPlanConfiguration lpc = new LogicalPlanConfiguration(conf);
+ LogicalPlan dag = new LogicalPlan();
+ Level2ModuleA module = dag.addModule("M1", new Level2ModuleA());
+ DummyInputOperator in = dag.addOperator("M1", new DummyInputOperator());
+ dag.addStream("s1", in.out, module.mIn);
+ lpc.prepareDAG(dag, null, "ModuleApp");
+ dag.validate();
+ }
+}