You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/03/21 06:27:16 UTC

[1/2] apex-core git commit: APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()

Repository: apex-core
Updated Branches:
  refs/heads/master 491e2e332 -> d90e8829f


APEXCORE-504 - Possible race condition in StreamingContainerAgent.getStreamCodec()


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/283ebe23
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/283ebe23
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/283ebe23

Branch: refs/heads/master
Commit: 283ebe232b3099b2339184b8a54dbcf6066680dc
Parents: abc836c
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Jan 30 17:24:45 2017 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Fri Feb 17 15:54:53 2017 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerAgent.java          | 53 +++++++++-----------
 .../stram/StreamingContainerManager.java        |  4 +-
 .../stram/plan/logical/LogicalPlan.java         | 39 +++++++++-----
 .../stram/plan/physical/PhysicalPlan.java       | 16 +++---
 .../stram/plan/physical/StreamMapping.java      | 13 ++---
 .../com/datatorrent/stram/StreamCodecTest.java  |  8 +--
 .../stram/plan/StreamPersistanceTests.java      |  2 +-
 7 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
index eb7aa43..0aa0b83 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java
@@ -202,7 +202,11 @@ public class StreamingContainerAgent
         if (ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
           // input attributes of the downstream operator
           for (InputPortMeta sink : streamMeta.getSinks()) {
-            portInfo.contextAttributes = sink.getAttributes();
+            try {
+              portInfo.contextAttributes = sink.getAttributes().clone();
+            } catch (CloneNotSupportedException e) {
+              throw new RuntimeException("Cannot clone attributes", e);
+            }
             break;
           }
         }
@@ -215,11 +219,11 @@ public class StreamingContainerAgent
           for (PTOperator.PTInput input : out.sinks) {
             // Create mappings for all non-inline operators
             if (input.target.getContainer() != out.source.getContainer()) {
-              InputPortMeta inputPortMeta = getIdentifyingInputPortMeta(input);
-              StreamCodec<?> streamCodecInfo = getStreamCodec(inputPortMeta);
-              Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
+              final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(input).getStreamCodec();
+              final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec);
+              // TODO: replace with portInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped.
               if (!portInfo.streamCodecs.containsKey(id)) {
-                portInfo.streamCodecs.put(id, streamCodecInfo);
+                portInfo.streamCodecs.put(id, streamCodec);
               }
             }
           }
@@ -248,11 +252,19 @@ public class StreamingContainerAgent
         InputPortMeta inputPortMeta = getInputPortMeta(oper.getOperatorMeta(), streamMeta);
 
         if (inputPortMeta != null) {
-          inputInfo.contextAttributes = inputPortMeta.getAttributes();
+          try {
+            inputInfo.contextAttributes = inputPortMeta.getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         if (inputInfo.contextAttributes == null && ndi.type == OperatorDeployInfo.OperatorType.UNIFIER) {
-          inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes();
+          try {
+            inputInfo.contextAttributes = in.source.logicalStream.getSource().getAttributes().clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Cannot clone attributes", e);
+          }
         }
 
         inputInfo.sourceNodeId = sourceOutput.source.getId();
@@ -288,10 +300,12 @@ public class StreamingContainerAgent
 
         // On the input side there is a unlikely scenario of partitions even for inline stream that is being
         // handled. Always specifying a stream codec configuration in case that scenario happens.
-        InputPortMeta idInputPortMeta = getIdentifyingInputPortMeta(in);
-        StreamCodec<?> streamCodecInfo = getStreamCodec(idInputPortMeta);
-        Integer id = physicalPlan.getStreamCodecIdentifier(streamCodecInfo);
-        inputInfo.streamCodecs.put(id, streamCodecInfo);
+        final StreamCodec<?> streamCodec = getIdentifyingInputPortMeta(in).getStreamCodec();
+        final Integer id = physicalPlan.getStreamCodecIdentifier(streamCodec);
+        // TODO: replace with inputInfo.streamCodecs.putIfAbsent() after support for JDK 1.7 is dropped.
+        if (!inputInfo.streamCodecs.containsKey(id)) {
+          inputInfo.streamCodecs.put(id, streamCodec);
+        }
         ndi.inputs.add(inputInfo);
       }
     }
@@ -344,23 +358,6 @@ public class StreamingContainerAgent
     return operator;
   }
 
-  public static StreamCodec<?> getStreamCodec(InputPortMeta inputPortMeta)
-  {
-    if (inputPortMeta != null) {
-      StreamCodec<?> codec = inputPortMeta.getValue(PortContext.STREAM_CODEC);
-      if (codec == null) {
-        // it cannot be this object that gets returned. Depending on this value is dangerous
-        codec = inputPortMeta.getPort().getStreamCodec();
-        if (codec != null) {
-          // don't create codec multiple times - it will assign a new identifier
-          inputPortMeta.getAttributes().put(PortContext.STREAM_CODEC, codec);
-        }
-      }
-      return codec;
-    }
-    return null;
-  }
-
   /**
    * Create deploy info for operator.
    * <p>

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index dfbc7d1..0b4afda 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2330,8 +2330,8 @@ public class StreamingContainerManager implements PlanContext
             for (PTOperator.PTOutput out : operator.getOutputs()) {
               if (!out.isDownStreamInline()) {
                 for (InputPortMeta ipm : out.logicalStream.getSinks()) {
-                  StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
-                  Integer codecId = plan.getStreamCodecIdentifier(streamCodecInfo);
+                  StreamCodec<?> streamCodec = ipm.getStreamCodec();
+                  Integer codecId = plan.getStreamCodecIdentifier(streamCodec);
                   // following needs to match the concat logic in StreamingContainer
                   String sourceIdentifier = Integer.toString(operator.getId()).concat(Component.CONCAT_SEPARATOR).concat(out.portName).concat(Component.CONCAT_SEPARATOR).concat(codecId.toString());
                   if (operator.getContainer().getState() == PTContainer.State.ACTIVE) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index e1debbb..19b13fe 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -103,6 +103,8 @@ import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.engine.DefaultUnifier;
 import com.datatorrent.stram.engine.Slider;
 
+import static com.datatorrent.api.Context.PortContext.STREAM_CODEC;
+
 /**
  * DAG contains the logical declarations of operators and streams.
  * <p>
@@ -309,6 +311,20 @@ public class LogicalPlan implements Serializable, DAG
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    public StreamCodec<?> getStreamCodec()
+    {
+      return attributes.get(STREAM_CODEC);
+    }
+
+    void setStreamCodec(StreamCodec<?> streamCodec)
+    {
+      if (streamCodec != null) {
+        StreamCodec<?> oldStreamCodec = attributes.put(STREAM_CODEC, streamCodec);
+        if (oldStreamCodec != null && oldStreamCodec != streamCodec) { // once input port codec is set, it is not expected that it will be changed.
+          LOG.warn("Operator {} input port {} stream codec was changed from {} to {}", getOperatorMeta().getName(), getPortName(), oldStreamCodec, streamCodec);
+        }
+      }
+    }
   }
 
   public final class OutputPortMeta implements DAG.OutputPortMeta, Serializable
@@ -738,18 +754,14 @@ public class LogicalPlan implements Serializable, DAG
 
     private void addStreamCodec(InputPortMeta sinkToPersistPortMeta, InputPort<?> port)
     {
-      StreamCodec<Object> inputStreamCodec = sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC) != null
-          ? (StreamCodec<Object>)sinkToPersistPortMeta.getValue(PortContext.STREAM_CODEC)
-          : (StreamCodec<Object>)sinkToPersistPortMeta.getPort().getStreamCodec();
+      StreamCodec<Object> inputStreamCodec = (StreamCodec<Object>)sinkToPersistPortMeta.getStreamCodec();
       if (inputStreamCodec != null) {
         Map<InputPortMeta, StreamCodec<Object>> codecs = new HashMap<>();
         codecs.put(sinkToPersistPortMeta, inputStreamCodec);
         InputPortMeta persistOperatorPortMeta = assertGetPortMeta(port);
-        StreamCodec<Object> specifiedCodecForPersistOperator = (persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC) != null)
-            ? (StreamCodec<Object>)persistOperatorPortMeta.getValue(PortContext.STREAM_CODEC)
-            : (StreamCodec<Object>)port.getStreamCodec();
+        StreamCodec<Object> specifiedCodecForPersistOperator = (StreamCodec<Object>)persistOperatorPortMeta.getStreamCodec();
         StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(codecs, specifiedCodecForPersistOperator);
-        setInputPortAttribute(port, PortContext.STREAM_CODEC, codec);
+        persistOperatorPortMeta.setStreamCodec(codec);
       }
     }
 
@@ -1070,6 +1082,12 @@ public class LogicalPlan implements Serializable, DAG
         metaPort.adqAnnotation = adqAnnotation;
         inPortMap.put(portObject, metaPort);
         markInputPortIfHidden(metaPort.getPortName(), metaPort, field.getDeclaringClass());
+        if (metaPort.getStreamCodec() == null) {
+          metaPort.setStreamCodec(portObject.getStreamCodec());
+        } else if (portObject.getStreamCodec() != null) {
+          LOG.info("Operator {} input port {} attribute {} overrides codec {} with {} codec", metaPort.getOperatorMeta().getName(),
+              metaPort.getPortName(), STREAM_CODEC.getSimpleName(), portObject.getStreamCodec(), metaPort.getStreamCodec());
+        }
       }
 
       @Override
@@ -1638,14 +1656,9 @@ public class LogicalPlan implements Serializable, DAG
     }
     for (StreamMeta n: this.streams.values()) {
       for (InputPortMeta sink : n.getSinks()) {
-        StreamCodec<?> streamCodec = sink.getValue(PortContext.STREAM_CODEC);
+        StreamCodec<?> streamCodec = sink.getStreamCodec();
         if (streamCodec != null) {
           classNames.add(streamCodec.getClass().getName());
-        } else {
-          StreamCodec<?> codec = sink.getPort().getStreamCodec();
-          if (codec != null) {
-            classNames.add(codec.getClass().getName());
-          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 94f47e6..ce22bfd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -543,7 +543,7 @@ public class PhysicalPlan implements Serializable
         for (StreamMeta s : n.getOutputStreams().values()) {
           if (s.getPersistOperator() != null) {
             InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
-            StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+            StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)persistInputPort.getStreamCodec();
             if (persistCodec == null) {
               continue;
             }
@@ -556,12 +556,9 @@ public class PhysicalPlan implements Serializable
           // Check partitioning for persist operators per sink too
           for (Map.Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
             InputPortMeta persistInputPort = entry.getValue();
-            StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
-            if (codec != null) {
-              if (codec instanceof StreamCodecWrapperForPersistance) {
-                StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>)codec;
-                updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
-              }
+            StreamCodec<?> streamCodec = persistInputPort.getStreamCodec();
+            if (streamCodec != null && streamCodec instanceof StreamCodecWrapperForPersistance) {
+              updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), (StreamCodecWrapperForPersistance<?>)streamCodec, entry.getKey());
             }
           }
         }
@@ -593,8 +590,7 @@ public class PhysicalPlan implements Serializable
             Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>();
             // Logging is enabled for the stream
             for (InputPortMeta portMeta : s.getSinksToPersist()) {
-              InputPort<?> port = portMeta.getPort();
-              StreamCodec<?> inputStreamCodec = (portMeta.getValue(PortContext.STREAM_CODEC) != null) ? portMeta.getValue(PortContext.STREAM_CODEC) : port.getStreamCodec();
+              StreamCodec<?> inputStreamCodec = portMeta.getStreamCodec();
               if (inputStreamCodec != null) {
                 boolean alreadyAdded = false;
 
@@ -619,7 +615,7 @@ public class PhysicalPlan implements Serializable
               // Create Wrapper codec for Stream persistence using all unique
               // stream codecs
               // Logger should write merged or union of all input stream codecs
-              StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPort().getStreamCodec();
+              StreamCodec<?> specifiedCodecForLogger = s.getPersistOperatorInputPort().getStreamCodec();
               @SuppressWarnings({ "unchecked", "rawtypes" })
               StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
               streamMetaToCodecMap.put(s, codec);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
index c50ed79..32e90b4 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/StreamMapping.java
@@ -39,7 +39,6 @@ import com.datatorrent.api.Partitioner.PartitionKeys;
 import com.datatorrent.api.StreamCodec;
 
 import com.datatorrent.common.util.Pair;
-import com.datatorrent.stram.StreamingContainerAgent;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
@@ -236,8 +235,7 @@ public class StreamMapping implements java.io.Serializable
       boolean separateUnifiers = false;
       Integer lastId = null;
       for (InputPortMeta ipm : streamMeta.getSinks()) {
-        StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
-        Integer id = plan.getStreamCodecIdentifier(streamCodecInfo);
+        Integer id = plan.getStreamCodecIdentifier(ipm.getStreamCodec());
         if (lastId == null) {
           lastId = id;
         } else if (!id.equals(lastId)) {
@@ -255,10 +253,10 @@ public class StreamMapping implements java.io.Serializable
           unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
         } else {
           for (InputPortMeta ipm : streamMeta.getSinks()) {
-            StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(ipm);
-            if (!cascadeUnifierSourcesMap.containsKey(streamCodecInfo)) {
+            StreamCodec<?> streamCodec = ipm.getStreamCodec();
+            if (!cascadeUnifierSourcesMap.containsKey(streamCodec)) {
               unifierSources = setupCascadingUnifiers(this.upstream, currentUnifiers, limit, 0);
-              cascadeUnifierSourcesMap.put(streamCodecInfo, unifierSources);
+              cascadeUnifierSourcesMap.put(streamCodec, unifierSources);
             }
           }
         }
@@ -320,8 +318,7 @@ public class StreamMapping implements java.io.Serializable
             unifier.inputs.clear();
             List<PTOutput> doperUnifierSources = unifierSources;
             if (separateUnifiers) {
-              StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(doperEntry.second);
-              List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(streamCodecInfo);
+              List<PTOutput> cascadeSources = cascadeUnifierSourcesMap.get(doperEntry.second.getStreamCodec());
               if (cascadeSources != null) {
                 doperUnifierSources = cascadeSources;
               }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 8cb4871..35bb363 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1159,10 +1159,10 @@ public class StreamCodecTest
       Map<Integer, StreamCodec<?>> streamCodecs,
       String id, PhysicalPlan plan)
   {
-    StreamCodec<?> streamCodecInfo = StreamingContainerAgent.getStreamCodec(operatorMeta.getMeta(inputPort));
-    Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodecInfo, plan));
-    Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodecInfo);
-    checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodecInfo);
+    StreamCodec<?> streamCodec = operatorMeta.getMeta(inputPort).getStreamCodec();
+    Assert.assertTrue("stream codec identifier not present" + id, isStrCodecPresent(streamCodec, plan));
+    Integer streamCodecIdentifier = plan.getStreamCodecIdentifier(streamCodec);
+    checkPresentStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec);
   }
 
   private void checkPresentStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,

http://git-wip-us.apache.org/repos/asf/apex-core/blob/283ebe23/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index 4472743..d40fd7b 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -994,7 +994,7 @@ public class StreamPersistanceTests
     assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer));
 
     LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta)s;
-    StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);
+    StreamCodec codec = s1.getPersistOperatorInputPort().getStreamCodec();
 
     assertEquals("Codec should be instance of StreamCodecWrapper", codec instanceof StreamCodecWrapperForPersistance, true);
     StreamCodecWrapperForPersistance wrapperCodec = (StreamCodecWrapperForPersistance)codec;


[2/2] apex-core git commit: Merge branch 'APEXCORE-504' of github.com:vrozov/apex-core

Posted by pr...@apache.org.
Merge branch 'APEXCORE-504' of github.com:vrozov/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/d90e8829
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/d90e8829
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/d90e8829

Branch: refs/heads/master
Commit: d90e8829fd89edba66ee5b79296a69fd9a5b6dd8
Parents: 491e2e3 283ebe2
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Mar 20 23:25:06 2017 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Mar 20 23:25:06 2017 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerAgent.java          | 53 +++++++++-----------
 .../stram/StreamingContainerManager.java        |  4 +-
 .../stram/plan/logical/LogicalPlan.java         | 39 +++++++++-----
 .../stram/plan/physical/PhysicalPlan.java       | 16 +++---
 .../stram/plan/physical/StreamMapping.java      | 13 ++---
 .../com/datatorrent/stram/StreamCodecTest.java  |  8 +--
 .../stram/plan/StreamPersistanceTests.java      |  2 +-
 7 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/d90e8829/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-core/blob/d90e8829/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
----------------------------------------------------------------------