You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/09/24 04:37:22 UTC

[03/50] [abbrv] incubator-apex-core git commit: APEX-93 #resolve #comment Fixing dynamic partitioning issue with persist stream Added flow to redeploy persist operators as well when sink operators are dynamically repartitioned Modified dynamic repartitio

APEX-93 #resolve #comment Fixed the dynamic partitioning issue with the persist stream.
Added a flow that also redeploys the persist operators when sink operators are dynamically repartitioned.
Modified the dynamic repartitioning test case to validate that the persist operator is among the dependent operators redeployed after partitioning.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/3178f13f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/3178f13f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/3178f13f

Branch: refs/heads/feature-module
Commit: 3178f13f49695aa4f6910006ecd4efbca8dad6a9
Parents: 55a068f
Author: ishark <is...@datatorrent.com>
Authored: Thu Sep 3 19:02:02 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Wed Sep 9 16:02:43 2015 -0700

----------------------------------------------------------------------
 .../StreamCodecWrapperForPersistance.java       |  2 +-
 .../stram/plan/physical/PhysicalPlan.java       | 28 +++++++++++++++++++-
 .../stram/plan/StreamPersistanceTests.java      | 13 +++++++++
 3 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
index 97fd75f..81be56a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/StreamCodecWrapperForPersistance.java
@@ -52,7 +52,7 @@ public class StreamCodecWrapperForPersistance<T> implements StreamCodec<T>, Seri
       Collection<PartitionKeys> partitionKeysList = entry.getValue();
 
       for (PartitionKeys keys : partitionKeysList) {
-        if (keys.partitions.contains(keys.mask & codec.getPartition(o))) {
+        if ( keys.partitions != null && keys.partitions.contains(keys.mask & codec.getPartition(o))) {
           // Then at least one of the partitions is getting this event
           // So send the event to persist operator
           return true;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 2176035..fb429a9 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -420,7 +420,7 @@ public class PhysicalPlan implements Serializable
     Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
     Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>();
     for (PTOperator p : ptOperators) {
-      PartitionKeys keys = (PartitionKeys) p.getPartitionKeys().get(sinkPortMeta.getPortObject());
+      PartitionKeys keys = p.partitionKeys.get(sinkPortMeta);
       partitionKeysList.add(keys);
     }
 
@@ -1390,9 +1390,35 @@ public class PhysicalPlan implements Serializable
         getDeps(operator, visited);
       }
     }
+    visited.addAll(getDependentPersistOperators(operators));
     return visited;
   }
 
+  private Set<PTOperator> getDependentPersistOperators(Collection<PTOperator> operators)
+  { // Gathers the PTOperators of persist operators attached to the input streams of the given operators.
+    Set<PTOperator> persistOperators = new LinkedHashSet<PTOperator>();
+    if (operators != null) { // a null collection yields an empty result
+      for (PTOperator operator : operators) {
+        for (PTInput in : operator.inputs) {
+          if (in.logicalStream.getPersistOperator() != null) {
+            for (InputPortMeta inputPort : in.logicalStream.getSinksToPersist()) {
+              if (inputPort.getOperatorWrapper().equals(operator.operatorMeta)) {
+                // Redeploy the stream-wide persist operator only when this operator is one of the persisted sinks
+                persistOperators.addAll(getOperators(in.logicalStream.getPersistOperator()));
+                break; // one matching sink is enough for this stream
+              }
+            }
+          }
+          for (Entry<InputPortMeta, OperatorMeta> entry : in.logicalStream.sinkSpecificPersistOperatorMap.entrySet()) {
+            // Redeploy every sink-specific persist operator registered on this stream (no per-sink filtering)
+            persistOperators.addAll(getOperators(entry.getValue()));
+          }
+        }
+      }
+    }
+    return persistOperators;
+  }
+
   /**
    * Add logical operator to the plan. Assumes that upstream operators have been added before.
    * @param om

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/3178f13f/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
index c82f3a9..1cd4311 100644
--- a/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
+++ b/engine/src/test/java/com/datatorrent/stram/plan/StreamPersistanceTests.java
@@ -1,12 +1,14 @@
 package com.datatorrent.stram.plan;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -956,12 +958,23 @@ public class StreamPersistanceTests
 
     List<PTOperator> ptos = plan.getOperators(passThruMeta);
 
+    PTOperator persistOperatorContainer = null;
+
     for (PTContainer container : plan.getContainers()) {
       for (PTOperator operator : container.getOperators()) {
         operator.setState(PTOperator.State.ACTIVE);
+        if (operator.getName().equals("persister")) {
+          persistOperatorContainer = operator;
+        }
       }
     }
 
+    // Check that persist operator is part of dependents redeployed
+    Set<PTOperator> operators = plan.getDependents(ptos);
+    logger.debug("Operators to be re-deployed = {}", operators);
+    // Validate that persist operator is part of dependents
+    assertTrue("persist operator should be part of the operators to be redeployed", operators.contains(persistOperatorContainer));
+
     LogicalPlan.StreamMeta s1 = (LogicalPlan.StreamMeta) s;
     StreamCodec codec = s1.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC);