You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/02/16 12:03:33 UTC
apex-core git commit: APEXCORE-604 extend DAG API to get operators
and streams from the DAG.
Repository: apex-core
Updated Branches:
refs/heads/master 74f732a79 -> abc836ca1
APEXCORE-604 extend DAG API to get operators and streams from the DAG.
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/abc836ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/abc836ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/abc836ca
Branch: refs/heads/master
Commit: abc836ca18d63a895d431dc5acf4d3a1018b235d
Parents: 74f732a
Author: Tushar R. Gosavi <tu...@apache.org>
Authored: Tue Jan 17 12:11:12 2017 +0530
Committer: Tushar R. Gosavi <tu...@apache.org>
Committed: Thu Feb 16 14:06:38 2017 +0530
----------------------------------------------------------------------
api/src/main/java/com/datatorrent/api/DAG.java | 71 ++++++++++++++++++++
.../stram/StreamingContainerAgent.java | 2 +-
.../stram/StreamingContainerManager.java | 4 +-
.../stram/codec/LogicalPlanSerializer.java | 9 ++-
.../stram/plan/logical/LogicalPlan.java | 62 +++++++++++------
.../stram/plan/physical/PTOperator.java | 2 +-
.../stram/plan/physical/PhysicalPlan.java | 20 +++---
.../stram/plan/physical/PlanModifier.java | 2 +-
.../stram/plan/physical/StreamMapping.java | 4 +-
.../com/datatorrent/stram/StreamCodecTest.java | 2 +-
.../logical/LogicalPlanConfigurationTest.java | 10 +--
.../logical/module/TestModuleExpansion.java | 2 +-
12 files changed, 142 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/api/src/main/java/com/datatorrent/api/DAG.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index b80bc93..532ff72 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -19,6 +19,8 @@
package com.datatorrent.api;
import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability;
@@ -41,11 +43,26 @@ public interface DAG extends DAGContext, Serializable
{
interface InputPortMeta extends Serializable, PortContext
{
+ /**
+ * Return port object represented by this InputPortMeta
+ * @return
+ */
+ public Operator.InputPort<?> getPort();
+
+ public <T extends OperatorMeta> T getOperatorMeta();
}
interface OutputPortMeta extends Serializable, PortContext
{
OperatorMeta getUnifierMeta();
+
+ /**
+ * Return port object represented by this OutputPortMeta
+ * @return
+ */
+ public Operator.OutputPort<?> getPort();
+
+ public <T extends OperatorMeta> T getOperatorMeta();
}
/**
@@ -143,6 +160,19 @@ public interface DAG extends DAGContext, Serializable
*/
public StreamMeta persistUsing(String name, Operator persistOperator, Operator.InputPort<?> persistOperatorInputPort, Operator.InputPort<?> sinkToPersist);
+ /**
+ * Return source of the stream.
+ * @param <T>
+ * @return
+ */
+ public <T extends OutputPortMeta> T getSource();
+
+ /**
+ * Return all sinks connected to this stream.
+ * @param <T>
+ * @return
+ */
+ public <T extends InputPortMeta> Collection<T> getSinks();
}
/**
@@ -157,6 +187,22 @@ public interface DAG extends DAGContext, Serializable
public InputPortMeta getMeta(Operator.InputPort<?> port);
public OutputPortMeta getMeta(Operator.OutputPort<?> port);
+
+ /**
+ * Return collection of stream which are connected to this operator's
+ * input ports.
+ * @param <T>
+ * @return
+ */
+ public <K extends InputPortMeta, V extends StreamMeta> Map<K, V> getInputStreams();
+
+ /**
+ * Return collection of stream which are connected to this operator's
+ * output ports.
+ * @param <T>
+ * @return
+ */
+ public <K extends OutputPortMeta, V extends StreamMeta> Map<K, V> getOutputStreams();
}
/**
@@ -281,6 +327,31 @@ public interface DAG extends DAGContext, Serializable
public abstract OperatorMeta getMeta(Operator operator);
/**
+ * Return all operators present in the DAG.
+ * @param <T>
+ * @return
+ */
+ public <T extends OperatorMeta> Collection<T> getAllOperatorsMeta();
+
+ /**
+ * Get all input operators in the DAG. This method returns operators which are
+ * not connected to any upstream operator. i.e the operators which do not have
+ * any input ports or operators which is not connected through any input ports
+ * in the DAG.
+ *
+ * @param <T>
+ * @return list of {@see OperatorMeta} for root operators in the DAG.
+ */
+ public <T extends OperatorMeta> Collection<T> getRootOperatorsMeta();
+
+ /**
+ * Returns all Streams present in the DAG.
+ * @param <T>
+ * @return
+ */
+ public <T extends StreamMeta> Collection<T> getAllStreamsMeta();
+
+ /**
* Marker interface for the Node in the DAG. Any object which can be added as a Node in the DAG
* needs to implement this interface.
*/
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 1d0897d..eb7aa43 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -350,7 +350,7 @@ public class StreamingContainerAgent
StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
if (codec == null) {
// it cannot be this object that gets returned. Depending on this value is dangerous
- codec = inputPortMeta.getPortObject().getStreamCodec();
+ codec = inputPortMeta.getPort().getStreamCodec();
if (codec != null) {
// don't create codec multiple times - it will assign a new identifier
inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 00a406c..dfbc7d1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -706,7 +706,7 @@ public class StreamingContainerManager implements PlanContext
appDataSource.setQueryOperatorName(queryOperatorName);
appDataSource.setQueryTopic(queryTopic);
appDataSource.setQueryUrl(convertAppDataUrl(queryUrl));
- List<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks();
+ Collection<LogicalPlan.InputPortMeta> sinks = entry.getValue().getSinks();
if (sinks.isEmpty()) {
LOG.warn("There is no result operator for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName());
continue;
@@ -715,7 +715,7 @@ public class StreamingContainerManager implements PlanContext
LOG.warn("There are multiple result operators for the App Data Source {}.{}. Ignoring the App Data Source.", operatorMeta.getName(), portMeta.getPortName());
continue;
}
- OperatorMeta resultOperatorMeta = sinks.get(0).getOperatorWrapper();
+ OperatorMeta resultOperatorMeta = sinks.iterator().next().getOperatorMeta();
if (resultOperatorMeta.getOperator() instanceof AppData.ConnectionInfoProvider) {
AppData.ConnectionInfoProvider resultOperator = (AppData.ConnectionInfoProvider)resultOperatorMeta.getOperator();
appDataSource.setResultOperatorName(resultOperatorMeta.getName());
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
index 6607321..b1d1fd8 100644
--- a/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
+++ b/engine/src/main/java/com/datatorrent/stram/codec/LogicalPlanSerializer.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import javax.ws.rs.Produces;
@@ -203,11 +202,11 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
sourcePortDetailMap.put("portName", sourcePortName);
streamDetailMap.put("name", streamName);
streamDetailMap.put("source", sourcePortDetailMap);
- List<InputPortMeta> sinks = streamMeta.getSinks();
+ Collection<InputPortMeta> sinks = streamMeta.getSinks();
ArrayList<HashMap<String, Object>> sinkPortList = new ArrayList<>();
for (InputPortMeta sinkPort : sinks) {
HashMap<String, Object> sinkPortDetailMap = new HashMap<>();
- sinkPortDetailMap.put("operatorName", sinkPort.getOperatorWrapper().getName());
+ sinkPortDetailMap.put("operatorName", sinkPort.getOperatorMeta().getName());
sinkPortDetailMap.put("portName", sinkPort.getPortName());
sinkPortList.add(sinkPortDetailMap);
}
@@ -257,14 +256,14 @@ public class LogicalPlanSerializer extends JsonSerializer<LogicalPlan>
for (StreamMeta streamMeta : allStreams) {
String streamKey = LogicalPlanConfiguration.STREAM_PREFIX + streamMeta.getName();
OutputPortMeta source = streamMeta.getSource();
- List<InputPortMeta> sinks = streamMeta.getSinks();
+ Collection<InputPortMeta> sinks = streamMeta.getSinks();
props.setProperty(streamKey + "." + LogicalPlanConfiguration.STREAM_SOURCE, source.getOperatorMeta().getName() + "." + source.getPortName());
String sinksValue = "";
for (InputPortMeta sink : sinks) {
if (!sinksValue.isEmpty()) {
sinksValue += ",";
}
- sinksValue += sink.getOperatorWrapper().getName() + "." + sink.getPortName();
+ sinksValue += sink.getOperatorMeta().getName() + "." + sink.getPortName();
}
props.setProperty(streamKey + "." + LogicalPlanConfiguration.STREAM_SINKS, sinksValue);
if (streamMeta.getLocality() != null) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 1371ce8..e1debbb 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -242,7 +242,8 @@ public class LogicalPlan implements Serializable, DAG
//This is null when port is not hidden
private Class<?> classDeclaringHiddenPort;
- public OperatorMeta getOperatorWrapper()
+ @Override
+ public OperatorMeta getOperatorMeta()
{
return operatorMeta;
}
@@ -252,7 +253,8 @@ public class LogicalPlan implements Serializable, DAG
return fieldName;
}
- public InputPort<?> getPortObject()
+ @Override
+ public InputPort<?> getPort()
{
for (Map.Entry<InputPort<?>, InputPortMeta> e : operatorMeta.getPortMapping().inPortMap.entrySet()) {
if (e.getValue() == this) {
@@ -328,6 +330,7 @@ public class LogicalPlan implements Serializable, DAG
this.attributes = new DefaultAttributeMap();
}
+ @Override
public OperatorMeta getOperatorMeta()
{
return operatorMeta;
@@ -363,7 +366,8 @@ public class LogicalPlan implements Serializable, DAG
return fieldName;
}
- public OutputPort<?> getPortObject()
+ @Override
+ public OutputPort<?> getPort()
{
for (Map.Entry<OutputPort<?>, OutputPortMeta> e : operatorMeta.getPortMapping().outPortMap.entrySet()) {
if (e.getValue() == this) {
@@ -504,7 +508,8 @@ public class LogicalPlan implements Serializable, DAG
return this;
}
- public List<InputPortMeta> getSinks()
+ @Override
+ public Collection<InputPortMeta> getSinks()
{
return sinks;
}
@@ -517,7 +522,7 @@ public class LogicalPlan implements Serializable, DAG
return this;
}
InputPortMeta portMeta = assertGetPortMeta(port);
- OperatorMeta om = portMeta.getOperatorWrapper();
+ OperatorMeta om = portMeta.getOperatorMeta();
String portName = portMeta.getPortName();
if (om.inputStreams.containsKey(portMeta)) {
throw new IllegalArgumentException(String.format("Port %s already connected to stream %s", portName, om.inputStreams.get(portMeta)));
@@ -539,9 +544,9 @@ public class LogicalPlan implements Serializable, DAG
public void remove()
{
for (InputPortMeta ipm : this.sinks) {
- ipm.getOperatorWrapper().inputStreams.remove(ipm);
- if (ipm.getOperatorWrapper().inputStreams.isEmpty()) {
- rootOperators.add(ipm.getOperatorWrapper());
+ ipm.getOperatorMeta().inputStreams.remove(ipm);
+ if (ipm.getOperatorMeta().inputStreams.isEmpty()) {
+ rootOperators.add(ipm.getOperatorMeta());
}
}
// Remove persist operator for at stream level if present:
@@ -697,7 +702,7 @@ public class LogicalPlan implements Serializable, DAG
private void setPersistOperatorInputPort(InputPortMeta inport)
{
- this.addSink(inport.getPortObject());
+ this.addSink(inport.getPort());
this.persistOperatorInputPort = inport;
}
@@ -714,7 +719,7 @@ public class LogicalPlan implements Serializable, DAG
private String getPersistOperatorName(InputPort<?> sinkToPersist)
{
InputPortMeta portMeta = assertGetPortMeta(sinkToPersist);
- OperatorMeta operatorMeta = portMeta.getOperatorWrapper();
+ OperatorMeta operatorMeta = portMeta.getOperatorMeta();
return id + "_" + operatorMeta.getName() + "_persister";
}
@@ -735,7 +740,7 @@ public class LogicalPlan implements Serializable, DAG
{
StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null
? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC)
- : (StreamCodec<Object>)sinkToPersistPortMeta.getPortObject().getStreamCodec();
+ : (StreamCodec<Object>)sinkToPersistPortMeta.getPort().getStreamCodec();
if (inputStreamCodec != null) {
Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>();
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
@@ -1138,6 +1143,7 @@ public class LogicalPlan implements Serializable, DAG
return getPortMapping().inPortMap.get(port);
}
+ @Override
public Map<InputPortMeta, StreamMeta> getInputStreams()
{
return this.inputStreams;
@@ -1359,7 +1365,7 @@ public class LogicalPlan implements Serializable, DAG
Map<InputPortMeta, StreamMeta> inputStreams = om.getInputStreams();
for (Map.Entry<InputPortMeta, StreamMeta> e : inputStreams.entrySet()) {
StreamMeta stream = e.getValue();
- if (e.getKey().getOperatorWrapper() == om) {
+ if (e.getKey().getOperatorMeta() == om) {
stream.sinks.remove(e.getKey());
}
// If persistStream was enabled for stream, reset stream when sink removed
@@ -1424,12 +1430,12 @@ public class LogicalPlan implements Serializable, DAG
OutputPortMeta sourceMeta = streamMeta.getSource();
List<InputPort<?>> ports = new LinkedList<>();
for (InputPortMeta inputPortMeta : streamMeta.getSinks()) {
- ports.add(inputPortMeta.getPortObject());
+ ports.add(inputPortMeta.getPort());
}
InputPort[] inputPorts = ports.toArray(new InputPort[]{});
name = subDAGName + MODULE_NAMESPACE_SEPARATOR + streamMeta.getName();
- StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPortObject(), inputPorts);
+ StreamMeta streamMetaNew = this.addStream(name, sourceMeta.getPort(), inputPorts);
streamMetaNew.setLocality(streamMeta.getLocality());
}
}
@@ -1531,6 +1537,12 @@ public class LogicalPlan implements Serializable, DAG
return Collections.unmodifiableList(this.rootOperators);
}
+ @Override
+ public List<OperatorMeta> getRootOperatorsMeta()
+ {
+ return getRootOperators();
+ }
+
public List<OperatorMeta> getLeafOperators()
{
return Collections.unmodifiableList(this.leafOperators);
@@ -1541,6 +1553,12 @@ public class LogicalPlan implements Serializable, DAG
return Collections.unmodifiableCollection(this.operators.values());
}
+ @Override
+ public Collection<OperatorMeta> getAllOperatorsMeta()
+ {
+ return getAllOperators();
+ }
+
public Collection<ModuleMeta> getAllModules()
{
return Collections.unmodifiableCollection(this.modules.values());
@@ -1552,6 +1570,12 @@ public class LogicalPlan implements Serializable, DAG
}
@Override
+ public Collection<StreamMeta> getAllStreamsMeta()
+ {
+ return getAllStreams();
+ }
+
+ @Override
public OperatorMeta getOperatorMeta(String operatorName)
{
return this.operators.get(operatorName);
@@ -1618,7 +1642,7 @@ public class LogicalPlan implements Serializable, DAG
if (streamCodec != null) {
classNames.add(streamCodec.getClass().getName());
} else {
- StreamCodec<?> codec = sink.getPortObject().getStreamCodec();
+ StreamCodec<?> codec = sink.getPort().getStreamCodec();
if (codec != null) {
classNames.add(codec.getClass().getName());
}
@@ -1927,7 +1951,7 @@ public class LogicalPlan implements Serializable, DAG
for (StreamMeta stream : getAllStreams()) {
String source = stream.source.getOperatorMeta().getName();
for (InputPortMeta sink : stream.sinks) {
- String sinkOperator = sink.getOperatorWrapper().getName();
+ String sinkOperator = sink.getOperatorMeta().getName();
OperatorPair pair = new OperatorPair(source, sinkOperator);
if (stream.getLocality() != null && stream.getLocality().ordinal() <= Locality.NODE_LOCAL.ordinal() && hostNamesMapping.containsKey(pair.first) && hostNamesMapping.containsKey(pair.second) && !hostNamesMapping.get(pair.first).equals(hostNamesMapping.get(pair.second))) {
throw new ValidationException(String.format("Host Locality for operators: %s(host: %s) & %s(host: %s) conflicts with stream locality", pair.first, hostNamesMapping.get(pair.first), pair.second, hostNamesMapping.get(pair.second)));
@@ -2188,7 +2212,7 @@ public class LogicalPlan implements Serializable, DAG
// depth first successors traversal
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink: downStream.sinks) {
- OperatorMeta successor = sink.getOperatorWrapper();
+ OperatorMeta successor = sink.getOperatorMeta();
if (successor == null) {
continue;
}
@@ -2256,7 +2280,7 @@ public class LogicalPlan implements Serializable, DAG
for (StreamMeta downStream: om.outputStreams.values()) {
for (InputPortMeta sink : downStream.sinks) {
- OperatorMeta successor = sink.getOperatorWrapper();
+ OperatorMeta successor = sink.getOperatorMeta();
if (isDelayOperator) {
sink.attributes.put(IS_CONNECTED_TO_DELAY_OPERATOR, true);
// Check whether all downstream operators are already visited in the path
@@ -2285,7 +2309,7 @@ public class LogicalPlan implements Serializable, DAG
Operator.ProcessingMode pm = om.getValue(OperatorContext.PROCESSING_MODE);
for (StreamMeta os : om.outputStreams.values()) {
for (InputPortMeta sink: os.sinks) {
- OperatorMeta sinkOm = sink.getOperatorWrapper();
+ OperatorMeta sinkOm = sink.getOperatorMeta();
Operator.ProcessingMode sinkPm = sinkOm.attributes == null ? null : sinkOm.attributes.get(OperatorContext.PROCESSING_MODE);
if (sinkPm == null) {
// If the source processing mode is AT_MOST_ONCE and a processing mode is not specified for the sink then
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index d5045b4..471dca2 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -359,7 +359,7 @@ public class PTOperator implements java.io.Serializable
if (partitionKeys != null) {
pkeys = Maps.newHashMapWithExpectedSize(partitionKeys.size());
for (Map.Entry<InputPortMeta, PartitionKeys> e : partitionKeys.entrySet()) {
- pkeys.put(e.getKey().getPortObject(), e.getValue());
+ pkeys.put(e.getKey().getPort(), e.getValue());
}
}
return pkeys;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 4181971..94f47e6 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -573,7 +573,7 @@ public class PhysicalPlan implements Serializable
private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta)
{
- Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
+ Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorMeta());
Collection<PartitionKeys> partitionKeysList = new ArrayList<>();
for (PTOperator p : ptOperators) {
PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
@@ -593,7 +593,7 @@ public class PhysicalPlan implements Serializable
Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>();
// Logging is enabled for the stream
for (InputPortMeta portMeta : s.getSinksToPersist()) {
- InputPort<?> port = portMeta.getPortObject();
+ InputPort<?> port = portMeta.getPort();
StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec();
if (inputStreamCodec != null) {
boolean alreadyAdded = false;
@@ -619,7 +619,7 @@ public class PhysicalPlan implements Serializable
// Create Wrapper codec for Stream persistence using all unique
// stream codecs
// Logger should write merged or union of all input stream codecs
- StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
+ StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPort().getStreamCodec();
@SuppressWarnings({ "unchecked", "rawtypes" })
StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
streamMetaToCodecMap.put(s, codec);
@@ -629,7 +629,7 @@ public class PhysicalPlan implements Serializable
}
for (java.util.Map.Entry<StreamMeta, StreamCodec<?>> entry : streamMetaToCodecMap.entrySet()) {
- dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPortObject(), PortContext.STREAM_CODEC, entry.getValue());
+ dag.setInputPortAttribute(entry.getKey().getPersistOperatorInputPort().getPort(), PortContext.STREAM_CODEC, entry.getValue());
}
} catch (Exception e) {
throw Throwables.propagate(e);
@@ -1326,7 +1326,7 @@ public class PhysicalPlan implements Serializable
// copy list as it is modified by recursive remove
for (PTInput in : Lists.newArrayList(out.sinks)) {
for (LogicalPlan.InputPortMeta im : in.logicalStream.getSinks()) {
- PMapping m = this.logicalToPTOperator.get(im.getOperatorWrapper());
+ PMapping m = this.logicalToPTOperator.get(im.getOperatorMeta());
if (m.parallelPartitions == operatorMapping.parallelPartitions) {
// associated operator parallel partitioned
removePartition(in.target, operatorMapping);
@@ -1439,7 +1439,7 @@ public class PhysicalPlan implements Serializable
List<InputPort<?>> inputPortList = Lists.newArrayList();
for (InputPortMeta inputPortMeta: operatorMeta.getInputStreams().keySet()) {
- inputPortList.add(inputPortMeta.getPortObject());
+ inputPortList.add(inputPortMeta.getPort());
}
return inputPortList;
@@ -1609,7 +1609,7 @@ public class PhysicalPlan implements Serializable
for (PTInput in : operator.inputs) {
if (in.logicalStream.getPersistOperator() != null) {
for (InputPortMeta inputPort : in.logicalStream.getSinksToPersist()) {
- if (inputPort.getOperatorWrapper().equals(operator.operatorMeta)) {
+ if (inputPort.getOperatorMeta().equals(operator.operatorMeta)) {
// Redeploy the stream wide persist operator only if the current sink is being persisted
persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator()));
break;
@@ -1689,7 +1689,7 @@ public class PhysicalPlan implements Serializable
{
// remove incoming connections for logical stream
for (InputPortMeta ipm : sm.getSinks()) {
- OperatorMeta om = ipm.getOperatorWrapper();
+ OperatorMeta om = ipm.getOperatorMeta();
PMapping m = this.logicalToPTOperator.get(om);
if (m == null) {
throw new AssertionError("Unknown operator " + om);
@@ -1735,7 +1735,7 @@ public class PhysicalPlan implements Serializable
*/
public void connectInput(InputPortMeta ipm)
{
- for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : ipm.getOperatorWrapper().getInputStreams().entrySet()) {
+ for (Map.Entry<LogicalPlan.InputPortMeta, StreamMeta> inputEntry : ipm.getOperatorMeta().getInputStreams().entrySet()) {
if (inputEntry.getKey() == ipm) {
// initialize outputs for existing operators
for (Map.Entry<LogicalPlan.OutputPortMeta, StreamMeta> outputEntry : inputEntry.getValue().getSource().getOperatorMeta().getOutputStreams().entrySet()) {
@@ -1746,7 +1746,7 @@ public class PhysicalPlan implements Serializable
deployOpers.add(oper);
}
}
- PMapping m = this.logicalToPTOperator.get(ipm.getOperatorWrapper());
+ PMapping m = this.logicalToPTOperator.get(ipm.getOperatorMeta());
updateStreamMappings(m);
for (PTOperator oper : m.partitions) {
undeployOpers.add(oper);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
index 165517d..3fbd6f9 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PlanModifier.java
@@ -90,7 +90,7 @@ public class PlanModifier
sm.addSink(sink);
if (physicalPlan != null) {
for (InputPortMeta ipm : sm.getSinks()) {
- if (ipm.getPortObject() == sink) {
+ if (ipm.getPort() == sink) {
physicalPlan.connectInput(ipm);
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index 81d6d44..c50ed79 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -209,8 +209,8 @@ public class StreamMapping implements java.io.Serializable
for (InputPortMeta ipm : streamMeta.getSinks()) {
// gets called prior to all logical operators mapped
// skipped for parallel partitions - those are handled elsewhere
- if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && plan.hasMapping(ipm.getOperatorWrapper())) {
- List<PTOperator> partitions = plan.getOperators(ipm.getOperatorWrapper());
+ if (!ipm.getValue(PortContext.PARTITION_PARALLEL) && plan.hasMapping(ipm.getOperatorMeta())) {
+ List<PTOperator> partitions = plan.getOperators(ipm.getOperatorMeta());
for (PTOperator doper : partitions) {
downstreamOpers.add(new Pair<>(doper, ipm));
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 4ff9e51..8cb4871 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -932,7 +932,7 @@ public class StreamCodecTest
PTOperator.PTOutput out = operator.getOutputs().get(0);
Assert.assertEquals("unifier sinks " + operator.getName(), 1, out.sinks.size());
PTOperator.PTInput idInput = out.sinks.get(0);
- LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorWrapper();
+ LogicalPlan.OperatorMeta idMeta = StreamingContainerAgent.getIdentifyingInputPortMeta(idInput).getOperatorMeta();
Operator.InputPort<?> idInputPort = null;
if (idMeta == n2meta) {
idInputPort = node2.inport1;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
index dbb8d34..caa1bf3 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/LogicalPlanConfigurationTest.java
@@ -143,7 +143,7 @@ public class LogicalPlanConfigurationTest
assertEquals("rootNode out is operator2 in", n1n2, operator1.getOutputStreams().get(operator1.getMeta(((TestGeneratorInputOperator)operator1.getOperator()).outport)));
assertEquals("n1n2 source", operator1, n1n2.getSource().getOperatorMeta());
Assert.assertEquals("n1n2 targets", 1, n1n2.getSinks().size());
- Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().get(0).getOperatorWrapper());
+ Assert.assertEquals("n1n2 target", operator2, n1n2.getSinks().iterator().next().getOperatorMeta());
assertEquals("stream name", "n1n2", n1n2.getName());
Assert.assertEquals("n1n2 not inline (default)", null, n1n2.getLocality());
@@ -154,7 +154,7 @@ public class LogicalPlanConfigurationTest
Set<OperatorMeta> targetNodes = Sets.newHashSet();
for (LogicalPlan.InputPortMeta ip : fromNode2.getSinks()) {
- targetNodes.add(ip.getOperatorWrapper());
+ targetNodes.add(ip.getOperatorMeta());
}
Assert.assertEquals("outputs " + fromNode2, Sets.newHashSet(operator3, operator4), targetNodes);
@@ -181,7 +181,7 @@ public class LogicalPlanConfigurationTest
for (StreamMeta downStream : operator.getOutputStreams().values()) {
if (!downStream.getSinks().isEmpty()) {
for (LogicalPlan.InputPortMeta targetNode : downStream.getSinks()) {
- printTopology(targetNode.getOperatorWrapper(), tplg, level + 1);
+ printTopology(targetNode.getOperatorMeta(), tplg, level + 1);
}
}
}
@@ -228,7 +228,7 @@ public class LogicalPlanConfigurationTest
Assert.assertEquals("input1 source", dag.getOperatorMeta("inputOperator"), input1.getSource().getOperatorMeta());
Set<OperatorMeta> targetNodes = Sets.newHashSet();
for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
- targetNodes.add(targetPort.getOperatorWrapper());
+ targetNodes.add(targetPort.getOperatorMeta());
}
Assert.assertEquals("input1 target ", Sets.newHashSet(dag.getOperatorMeta("operator1"), operator3, operator4), targetNodes);
@@ -296,7 +296,7 @@ public class LogicalPlanConfigurationTest
Assert.assertEquals("input1 source", inputOperator, input1.getSource().getOperatorMeta());
Set<OperatorMeta> targetNodes = Sets.newHashSet();
for (LogicalPlan.InputPortMeta targetPort : input1.getSinks()) {
- targetNodes.add(targetPort.getOperatorWrapper());
+ targetNodes.add(targetPort.getOperatorMeta());
}
Assert.assertEquals("operator attribute " + inputOperator, 64, (int)inputOperator.getValue(OperatorContext.MEMORY_MB));
Assert.assertEquals("port attribute " + inputOperator, 8, (int)input1.getSource().getValue(PortContext.UNIFIER_LIMIT));
http://git-wip-us.apache.org/repos/asf/apex-core/blob/abc836ca/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
index 97a375f..5b5583a 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/logical/module/TestModuleExpansion.java
@@ -459,7 +459,7 @@ public class TestModuleExpansion
List<String> sinksName = new ArrayList<>();
for (LogicalPlan.InputPortMeta inputPortMeta : streamMeta.getSinks()) {
- sinksName.add(inputPortMeta.getOperatorWrapper().getName());
+ sinksName.add(inputPortMeta.getOperatorMeta().getName());
}
Assert.assertTrue(inputOperatorName.equals(sourceName));