You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/21 06:27:16 UTC
[1/2] apex-core git commit: APEXCORE-504 - Possible race condition in
StreamingContainerAgent.getStreamCodec()
Repository: apex-core
Updated Branches:
refs/heads/master 491e2e332 -> d90e8829f
APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/283ebe23
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/283ebe23
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/283ebe23
Branch: refs/heads/master
Commit: 283ebe232b3099b2339184b8a54dbcf6066680dc
Parents: abc836c
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Jan 30 17:24:45 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Feb 17 15:54:53 2017 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerAgent.java | 53 +++++++++-----------
.../stram/StreamingContainerManager.java | 4 +-
.../stram/plan/logical/LogicalPlan.java | 39 +++++++++-----
.../stram/plan/physical/PhysicalPlan.java | 16 +++---
.../stram/plan/physical/StreamMapping.java | 13 ++---
.../com/datatorrent/stram/StreamCodecTest.java | 8 +--
.../stram/plan/StreamPersistanceTests.java | 2 +-
7 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index eb7aa43..0aa0b83 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -202,7 +202,11 @@ public class StreamingContainerAgent
if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
// input attributes of the downstream operator
for (InputPortMeta sink : streamMeta.getSinks()) {
- portInfo.contextAttributes = sink.getAttributes();
+ try {
+ portInfo.contextAttributes = sink.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
break;
}
}
@@ -215,11 +219,11 @@ public class StreamingContainerAgent
for (PTOperator.PTInput input : out.sinks) {
// Create mappings for all non-inline operators
if (input.target.getContainer() != out.source.getContainer()) {
- InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
- StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
- Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
+ final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(input).getStreamCodec();
+ final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec);
+ // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped.
if (!portInfo.streamCodecs.containsKey(id)) {
- portInfo.streamCodecs.put(id, streamCodecInfo);
+ portInfo.streamCodecs.put(id, streamCodec);
}
}
}
@@ -248,11 +252,19 @@ public class StreamingContainerAgent
InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
if (inputPortMeta != null) {
- inputInfo.contextAttributes = inputPortMeta.getAttributes();
+ try {
+ inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
- inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+ try {
+ inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -288,10 +300,12 @@ public class StreamingContainerAgent
// On the input side there is a unlikely scenario of partitions even for inline stream that is being
// handled. Always specifying a stream codec configuration in case that scenario happens.
- InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
- StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
- Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
- inputInfo.streamCodecs.put(id, streamCodecInfo);
+ final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(in).getStreamCodec();
+ final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec);
+ // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped.
+ if (!inputInfo.streamCodecs.containsKey(id)) {
+ inputInfo.streamCodecs.put(id, streamCodec);
+ }
ndi.inputs.add(inputInfo);
}
}
@@ -344,23 +358,6 @@ public class StreamingContainerAgent
return operator;
}
- public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
- {
- if (inputPortMeta != null) {
- StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
- if (codec == null) {
- // it cannot be this object that gets returned. Depending on this value is dangerous
- codec = inputPortMeta.getPort().getStreamCodec();
- if (codec != null) {
- // don't create codec multiple times - it will assign a new identifier
- inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
- }
- }
- return codec;
- }
- return null;
- }
-
/**
* Create deploy info for operator.
* <p>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index dfbc7d1..0b4afda 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2330,8 +2330,8 @@ public class StreamingContainerManager implements PlanContext
for (PTOperator.PTOutput out : operator.getOutputs()) {
if (!out.isDownStreamInline()) {
for (InputPortMeta ipm : out.logicalStream.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
- Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
+ StreamCodec<?> streamCodec = ipm.getStreamCodec();
+ Integer codecId = plan.getStreamCodecIdentifier(streamCodec);
// following needs to match the concat logic in StreamingContainer
String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
if (operator.getContainer().getState() == PTContainer.State.ACTIVE) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index e1debbb..19b13fe 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -103,6 +103,8 @@ import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.Slider;
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
/**
* DAG contains the logical declarations of operators and streams.
* <p>
@@ -309,6 +311,20 @@ public class LogicalPlan implements Serializable, DAG
throw new UnsupportedOperationException("Not supported yet.");
}
+ public StreamCodec<?> getStreamCodec()
+ {
+ return attributes.get(STREAM_CODEC);
+ }
+
+ void setStreamCodec(StreamCodec<?> streamCodec)
+ {
+ if (streamCodec != null) {
+ StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+ if (oldStreamCodec != null && oldStreamCodec != streamCodec) { // once input port codec is set, it is not expected that it will be changed.
+ LOG.warn("Operator {} input port {} stream codec was changed from {} to {}", getOperatorMeta().getName(), getPortName(), oldStreamCodec, streamCodec);
+ }
+ }
+ }
}
public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -738,18 +754,14 @@ public class LogicalPlan implements Serializable, DAG
private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
{
- StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null
- ? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC)
- : (StreamCodec<Object>)sinkToPersistPortMeta.getPort().getStreamCodec();
+ StreamCodec<Object> inputStreamCodec = (StreamCodec<Object>)sinkToPersistPortMeta.getStreamCodec();
if (inputStreamCodec != null) {
Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>();
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
- StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null)
- ? (StreamCodec<Object>)persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC)
- : (StreamCodec<Object>)port.getStreamCodec();
+ StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
- setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+ persistOperatorPortMeta.setStreamCodec(codec);
}
}
@@ -1070,6 +1082,12 @@ public class LogicalPlan implements Serializable, DAG
metaPort.adqAnnotation = adqAnnotation;
inPortMap.put(portObject, metaPort);
markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+ if (metaPort.getStreamCodec() == null) {
+ metaPort.setStreamCodec(portObject.getStreamCodec());
+ } else if (portObject.getStreamCodec() != null) {
+ LOG.info("Operator {} input port {} attribute {} overrides codec {} with {} codec", metaPort.getOperatorMeta().getName(),
+ metaPort.getPortName(), STREAM_CODEC.getSimpleName(), portObject.getStreamCodec(), metaPort.getStreamCodec());
+ }
}
@Override
@@ -1638,14 +1656,9 @@ public class LogicalPlan implements Serializable, DAG
}
for (StreamMeta n: this.streams.values()) {
for (InputPortMeta sink : n.getSinks()) {
- StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+ StreamCodec<?> streamCodec = sink.getStreamCodec();
if (streamCodec != null) {
classNames.add(streamCodec.getClass().getName());
- } else {
- StreamCodec<?> codec = sink.getPort().getStreamCodec();
- if (codec != null) {
- classNames.add(codec.getClass().getName());
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 94f47e6..ce22bfd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -543,7 +543,7 @@ public class PhysicalPlan implements Serializable
for (StreamMeta s : n.getOutputStreams().values()) {
if (s.getPersistOperator() != null) {
InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
- StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+ StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getStreamCodec();
if (persistCodec == null) {
continue;
}
@@ -556,12 +556,9 @@ public class PhysicalPlan implements Serializable
// Check partitioning for persist operators per sink too
for (Map.Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
InputPortMeta persistInputPort = entry.getValue();
- StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
- if (codec != null) {
- if (codec instanceof StreamCodecWrapperForPersistance) {
- StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)codec;
- updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
- }
+ StreamCodec<?> streamCodec = persistInputPort.getStreamCodec();
+ if (streamCodec != null && streamCodec instanceof StreamCodecWrapperForPersistance) {
+ updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), (StreamCodecWrapperForPersistance<?>)streamCodec, entry.getKey());
}
}
}
@@ -593,8 +590,7 @@ public class PhysicalPlan implements Serializable
Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>();
// Logging is enabled for the stream
for (InputPortMeta portMeta : s.getSinksToPersist()) {
- InputPort<?> port = portMeta.getPort();
- StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec();
+ StreamCodec<?> inputStreamCodec = portMeta.getStreamCodec();
if (inputStreamCodec != null) {
boolean alreadyAdded = false;
@@ -619,7 +615,7 @@ public class PhysicalPlan implements Serializable
// Create Wrapper codec for Stream persistence using all unique
// stream codecs
// Logger should write merged or union of all input stream codecs
- StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPort().getStreamCodec();
+ StreamCodec<?> specifiedCodecForLogger = s.getPersistOperatorInputPort().getStreamCodec();
@SuppressWarnings({ "unchecked", "rawtypes" })
StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
streamMetaToCodecMap.put(s, codec);
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index c50ed79..32e90b4 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -39,7 +39,6 @@ import com.datatorrent.api.Partitioner.PartitionKeys;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StreamingContainerAgent;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -236,8 +235,7 @@ public class StreamMapping implements java.io.Serializable
boolean separateUnifiers = false;
Integer lastId = null;
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
- Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
+ Integer id = plan.getStreamCodecIdentifier(ipm.getStreamCodec());
if (lastId == null) {
lastId = id;
} else if (!id.equals(lastId)) {
@@ -255,10 +253,10 @@ public class StreamMapping implements java.io.Serializable
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
} else {
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
- if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
+ StreamCodec<?> streamCodec = ipm.getStreamCodec();
+ if (!cascadeUnifierSourcesMap.containsKey(streamCodec)) {
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
- cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
+ cascadeUnifierSourcesMap.put(streamCodec, unifierSources);
}
}
}
@@ -320,8 +318,7 @@ public class StreamMapping implements java.io.Serializable
unifier.inputs.clear();
List<PTOutput> doperUnifierSources = unifierSources;
if (separateUnifiers) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
- List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
+ List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(doperEntry.second.getStreamCodec());
if (cascadeSources != null) {
doperUnifierSources = cascadeSources;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 8cb4871..35bb363 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1159,10 +1159,10 @@ public class StreamCodecTest
Map<Integer, StreamCodec<?>> streamCodecs,
String id, PhysicalPlan plan)
{
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
- Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
- Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
- checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);
+ StreamCodec<?> streamCodec = operatorMeta.getMeta(inputPort).getStreamCodec();
+ Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodec, plan));
+ Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodec);
+ checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec);
}
private void checkPresentStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index 4472743..d40fd7b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -994,7 +994,7 @@ public class StreamPersistanceTests
assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer));
LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta)s;
- StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);
+ StreamCodec codec = s1.getPersistOperatorInputPort().getStreamCodec();
assertEquals("Codec should be instance of StreamCodecWrapper", codec instanceof StreamCodecWrapperForPersistance, true);
StreamCodecWrapperForPersistance wrapperCodec = (StreamCodecWrapperForPersistance)codec;
[2/2] apex-core git commit: Merge branch 'APEXCORE-504' of
github.com:vrozov/apex-core
Posted by pr...@apache.org.
Merge branch 'APEXCORE-504' of github.com:vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d90e8829
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d90e8829
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d90e8829
Branch: refs/heads/master
Commit: d90e8829fd89edba66ee5b79296a69fd9a5b6dd8
Parents: 491e2e3 283ebe2
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Mar 20 23:25:06 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Mar 20 23:25:06 2017 -0700
----------------------------------------------------------------------
.../stram/StreamingContainerAgent.java | 53 +++++++++-----------
.../stram/StreamingContainerManager.java | 4 +-
.../stram/plan/logical/LogicalPlan.java | 39 +++++++++-----
.../stram/plan/physical/PhysicalPlan.java | 16 +++---
.../stram/plan/physical/StreamMapping.java | 13 ++---
.../com/datatorrent/stram/StreamCodecTest.java | 8 +--
.../stram/plan/StreamPersistanceTests.java | 2 +-
7 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d90e8829/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/d90e8829/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------