You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/01/25 15:49:26 UTC
apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Repository: apex-core
Updated Branches:
refs/heads/release-3.2 50db668f8 -> 332810722
APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/33281072
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/33281072
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/33281072
Branch: refs/heads/release-3.2
Commit: 332810722671b4892a441d174bd19f07ab8efe5c
Parents: 50db668
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Dec 30 12:48:52 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Jan 24 18:54:13 2017 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerAgent.java | 39 +++++++++-----------
.../stram/StreamingContainerManager.java | 4 +-
.../stram/plan/logical/LogicalPlan.java | 30 +++++++++++++--
.../stram/plan/physical/StreamMapping.java | 6 +--
.../com/datatorrent/stram/StreamCodecTest.java | 2 +-
5 files changed, 49 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 81dc96e..a5fc3a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -185,7 +185,11 @@ public class StreamingContainerAgent {
if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
// input attributes of the downstream operator
for (InputPortMeta sink : streamMeta.getSinks()) {
- portInfo.contextAttributes = sink.getAttributes();
+ try {
+ portInfo.contextAttributes = sink.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
break;
}
}
@@ -199,7 +203,7 @@ public class StreamingContainerAgent {
// Create mappings for all non-inline operators
if (input.target.getContainer() != out.source.getContainer()) {
InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
- StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
+ StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec();
Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
if (!portInfo.streamCodecs.containsKey(id)) {
portInfo.streamCodecs.put(id, streamCodecInfo);
@@ -231,11 +235,19 @@ public class StreamingContainerAgent {
InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
if (inputPortMeta != null) {
- inputInfo.contextAttributes = inputPortMeta.getAttributes();
+ try {
+ inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
- inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+ try {
+ inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Cannot clone attributes", e);
+ }
}
inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -272,7 +284,7 @@ public class StreamingContainerAgent {
// On the input side there is a unlikely scenario of partitions even for inline stream that is being
// handled. Always specifying a stream codec configuration in case that scenario happens.
InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
- StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
+ StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec();
Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
inputInfo.streamCodecs.put(id, streamCodecInfo);
ndi.inputs.add(inputInfo);
@@ -327,23 +339,6 @@ public class StreamingContainerAgent {
return operator;
}
- public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
- {
- if (inputPortMeta != null) {
- StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
- if (codec == null) {
- // it cannot be this object that gets returned. Depending on this value is dangerous
- codec = inputPortMeta.getPortObject().getStreamCodec();
- if (codec != null) {
- // don't create codec multiple times - it will assign a new identifier
- inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
- }
- }
- return codec;
- }
- return null;
- }
-
/**
* Create deploy info for operator.
* <p>
http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7eb4c96..6d49fa6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2066,7 +2066,7 @@ public class StreamingContainerManager implements PlanContext
}
for (InputPortMeta ipm : out.logicalStream.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
// following needs to match the concat logic in StreamingContainer
String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
@@ -2163,7 +2163,7 @@ public class StreamingContainerManager implements PlanContext
for (PTOperator.PTOutput out : operator.getOutputs()) {
if (!out.isDownStreamInline()) {
for (InputPortMeta ipm : out.logicalStream.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
// following needs to match the concat logic in StreamingContainer
String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 6405644..4ed2b80 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -58,6 +58,8 @@ import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.engine.DefaultUnifier;
import com.datatorrent.stram.engine.Slider;
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
/**
* DAG contains the logical declarations of operators and streams.
* <p>
@@ -260,6 +262,20 @@ public class LogicalPlan implements Serializable, DAG
throw new UnsupportedOperationException("Not supported yet.");
}
+ public StreamCodec<?> getStreamCodec()
+ {
+ return attributes.get(STREAM_CODEC);
+ }
+
+ void setStreamCodec(StreamCodec<?> streamCodec)
+ {
+ if (streamCodec != null) {
+ StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+ if (oldStreamCodec != null && oldStreamCodec != streamCodec) {
+ LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec);
+ }
+ }
+ }
}
public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -671,14 +687,14 @@ public class LogicalPlan implements Serializable, DAG
private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
{
- StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
+ StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
if (inputStreamCodec != null) {
Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
codecs.put(sinkToPersistPortMeta, inputStreamCodec);
InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
- StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
+ StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
- setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+ setInputPortAttribute(port, STREAM_CODEC, codec);
}
}
@@ -917,6 +933,12 @@ public class LogicalPlan implements Serializable, DAG
metaPort.adqAnnotation = adqAnnotation;
inPortMap.put(portObject, metaPort);
markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+ if (metaPort.getStreamCodec() == null) {
+ metaPort.setStreamCodec(portObject.getStreamCodec());
+ } else if (portObject.getStreamCodec() != null) {
+ LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(),
+ portObject.getStreamCodec(), metaPort.getStreamCodec());
+ }
}
@Override
@@ -1287,7 +1309,7 @@ public class LogicalPlan implements Serializable, DAG
}
for (StreamMeta n: this.streams.values()) {
for (InputPortMeta sink : n.getSinks()) {
- StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+ StreamCodec<?> streamCodec = sink.getValue(STREAM_CODEC);
if (streamCodec != null) {
classNames.add(streamCodec.getClass().getName());
} else {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index d42c327..529435c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -230,7 +230,7 @@ public class StreamMapping implements java.io.Serializable
boolean separateUnifiers = false;
Integer lastId = null;
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
if (lastId == null) {
lastId = id;
@@ -249,7 +249,7 @@ public class StreamMapping implements java.io.Serializable
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
} else {
for (InputPortMeta ipm : streamMeta.getSinks()) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+ StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
@@ -303,7 +303,7 @@ public class StreamMapping implements java.io.Serializable
unifier.inputs.clear();
List<PTOutput> doperUnifierSources = unifierSources;
if (separateUnifiers) {
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
+ StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec();
List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
if (cascadeSources != null) {
doperUnifierSources = cascadeSources;
http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 6bfa591..da7d4dd 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1185,7 +1185,7 @@ public class StreamCodecTest
Map<Integer, StreamCodec<?>> streamCodecs,
String id, PhysicalPlan plan )
{
- StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
+ StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec();
Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);