You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/01/25 15:49:26 UTC

apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()

Repository: apex-core
Updated Branches:
  refs/heads/release-3.2 50db668f8 -> 332810722


APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/33281072
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/33281072
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/33281072

Branch: refs/heads/release-3.2
Commit: 332810722671b4892a441d174bd19f07ab8efe5c
Parents: 50db668
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Dec 30 12:48:52 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Tue Jan 24 18:54:13 2017 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerAgent.java          | 39 +++++++++-----------
 .../stram/StreamingContainerManager.java        |  4 +-
 .../stram/plan/logical/LogicalPlan.java         | 30 +++++++++++++--
 .../stram/plan/physical/StreamMapping.java      |  6 +--
 .../com/datatorrent/stram/StreamCodecTest.java  |  2 +-
 5 files changed, 49 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index 81dc96e..a5fc3a5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -185,7 +185,11 @@ public class StreamingContainerAgent {
         if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
           // input attributes of the downstream operator
           for (InputPortMeta sink : streamMeta.getSinks()) {
-            portInfo.contextAttributes = sink.getAttributes();
+            try {
+              portInfo.contextAttributes = sink.getAttributes().clone();
+            } catch (CloneNotSupportedException e) {
+              throw new RuntimeException("Cannot clone attributes", e);
+            }
             break;
           }
         }
@@ -199,7 +203,7 @@ public class StreamingContainerAgent {
             // Create mappings for all non-inline operators
             if (input.target.getContainer() != out.source.getContainer()) {
               InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
-              StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
+              StreamCodec<?> streamCodecInfo = inputPortMeta.getStreamCodec();
               Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
               if (!portInfo.streamCodecs.containsKey(id)) {
                 portInfo.streamCodecs.put(id, streamCodecInfo);
@@ -231,11 +235,19 @@ public class StreamingContainerAgent {
         InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
 
         if (inputPortMeta != null) {
-          inputInfo.contextAttributes = inputPortMeta.getAttributes();
+          try {
+            inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
-          inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+          try {
+            inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -272,7 +284,7 @@ public class StreamingContainerAgent {
         // On the input side there is a unlikely scenario of partitions even for inline stream that is being
         // handled. Always specifying a stream codec configuration in case that scenario happens.
         InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
-        StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
+        StreamCodec<?> streamCodecInfo = idInputPortMeta.getStreamCodec();
         Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
         inputInfo.streamCodecs.put(id, streamCodecInfo);
         ndi.inputs.add(inputInfo);
@@ -327,23 +339,6 @@ public class StreamingContainerAgent {
     return operator;
   }
 
-  public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
-  {
-    if (inputPortMeta != null) {
-      StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
-      if (codec == null) {
-        // it cannot be this object that gets returned. Depending on this value is dangerous 
-        codec = inputPortMeta.getPortObject().getStreamCodec();
-        if (codec != null) {
-          // don't create codec multiple times - it will assign a new identifier
-          inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
-        }
-      }
-      return codec;
-    }
-    return null;
-  }
-
   /**
    * Create deploy info for operator.
    * <p>

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 7eb4c96..6d49fa6 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2066,7 +2066,7 @@ public class StreamingContainerManager implements PlanContext
           }
 
           for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+            StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
             Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
             // following needs to match the concat logic in StreamingContainer
             String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
@@ -2163,7 +2163,7 @@ public class StreamingContainerManager implements PlanContext
             for (PTOperator.PTOutput out : operator.getOutputs()) {
               if (!out.isDownStreamInline()) {
                 for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-                  StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+                  StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
                   Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
                   // following needs to match the concat logic in StreamingContainer
                   String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 6405644..4ed2b80 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -58,6 +58,8 @@ import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.engine.DefaultUnifier;
 import com.datatorrent.stram.engine.Slider;
 
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
 /**
  * DAG contains the logical declarations of operators and streams.
  * <p>
@@ -260,6 +262,20 @@ public class LogicalPlan implements Serializable, DAG
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    public StreamCodec<?> getStreamCodec()
+    {
+      return attributes.get(STREAM_CODEC);
+    }
+
+    void setStreamCodec(StreamCodec<?> streamCodec)
+    {
+      if (streamCodec != null) {
+        StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+        if (oldStreamCodec != null && oldStreamCodec != streamCodec) {
+          LOG.warn("Input port {} stream codec was changed from {} to {}", this.getPortName(), oldStreamCodec, streamCodec);
+        }
+      }
+    }
   }
 
   public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -671,14 +687,14 @@ public class LogicalPlan implements Serializable, DAG
 
     private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
     {
-      StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
+      StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(STREAM_CODEC) != null ? (StreamCodec<Object>) sinkToPersistPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) sinkToPersistPortMeta.getPortObject().getStreamCodec();
       if (inputStreamCodec != null) {
         Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
-        StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
+        StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(STREAM_CODEC) != null) ? (StreamCodec<Object>) persistOperatorPortMeta.getValue(STREAM_CODEC) : (StreamCodec<Object>) port.getStreamCodec();
         StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
-        setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+        setInputPortAttribute(port, STREAM_CODEC, codec);
       }
     }
 
@@ -917,6 +933,12 @@ public class LogicalPlan implements Serializable, DAG
         metaPort.adqAnnotation = adqAnnotation;
         inPortMap.put(portObject, metaPort);
         markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+        if (metaPort.getStreamCodec() == null) {
+          metaPort.setStreamCodec(portObject.getStreamCodec());
+        } else if (portObject.getStreamCodec() != null) {
+          LOG.info("Input port {} attribute {} overrides codec {} with {} codec", metaPort.getPortName(), STREAM_CODEC.getSimpleName(),
+              portObject.getStreamCodec(), metaPort.getStreamCodec());
+        }
       }
 
       @Override
@@ -1287,7 +1309,7 @@ public class LogicalPlan implements Serializable, DAG
     }
     for (StreamMeta n: this.streams.values()) {
       for (InputPortMeta sink : n.getSinks()) {
-        StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+        StreamCodec<?> streamCodec = sink.getValue(STREAM_CODEC);
         if (streamCodec != null) {
           classNames.add(streamCodec.getClass().getName());
         } else {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index d42c327..529435c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -230,7 +230,7 @@ public class StreamMapping implements java.io.Serializable
       boolean separateUnifiers = false;
       Integer lastId = null;
       for (InputPortMeta ipm : streamMeta.getSinks()) {
-        StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+        StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
         Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
         if (lastId == null) {
           lastId = id;
@@ -249,7 +249,7 @@ public class StreamMapping implements java.io.Serializable
           unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
         } else {
           for (InputPortMeta ipm : streamMeta.getSinks()) {
-            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
+            StreamCodec<?> streamCodecInfo = ipm.getStreamCodec();
             if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
               unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
               cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
@@ -303,7 +303,7 @@ public class StreamMapping implements java.io.Serializable
             unifier.inputs.clear();
             List<PTOutput> doperUnifierSources = unifierSources;
             if (separateUnifiers) {
-              StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
+              StreamCodec<?> streamCodecInfo = doperEntry.second.getStreamCodec();
               List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
               if (cascadeSources != null) {
                 doperUnifierSources = cascadeSources;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/33281072/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 6bfa591..da7d4dd 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1185,7 +1185,7 @@ public class StreamCodecTest
                                        Map<Integer, StreamCodec<?>> streamCodecs,
                                        String id, PhysicalPlan plan )
   {
-    StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
+    StreamCodec<?> streamCodecInfo = operatorMeta.getMeta(inputPort).getStreamCodec();
     Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
     Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
     checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);