You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:13:08 UTC

[37/50] incubator-apex-core git commit: APEX-56 SPOI-4380 #resolve Remove terminated operators from plan after window is committed.

APEX-56 SPOI-4380 #resolve Remove terminated operators from plan after window is committed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/76faf869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/76faf869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/76faf869

Branch: refs/heads/master
Commit: 76faf869506d004fc2f7d470f5bb89d681b470df
Parents: 3c5b88c
Author: thomas <th...@datatorrent.com>
Authored: Thu Aug 20 11:06:33 2015 -0700
Committer: thomas <th...@datatorrent.com>
Committed: Thu Aug 20 11:06:33 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 46 +++++++----
 .../stram/plan/physical/PhysicalPlan.java       | 32 +++++---
 .../com/datatorrent/stram/MockContainer.java    |  2 +-
 .../com/datatorrent/stram/StreamCodecTest.java  | 35 +--------
 .../stram/StreamingContainerManagerTest.java    | 81 +++++++++++++++++++-
 5 files changed, 134 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6e0f3f5..eed2948 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -154,6 +154,7 @@ public class StreamingContainerManager implements PlanContext
   private long lastResourceRequest = 0;
   private final Map<String, StreamingContainerAgent> containers = new ConcurrentHashMap<String, StreamingContainerAgent>();
   private final List<Pair<PTOperator, Long>> purgeCheckpoints = new ArrayList<Pair<PTOperator, Long>>();
+  private final Map<Long, Set<PTOperator>> shutdownOperators = new HashMap<>();
   private CriticalPathInfo criticalPathInfo;
   private final ConcurrentMap<PTOperator, PTOperator> reportStats = Maps.newConcurrentMap();
   private final AtomicBoolean deployChangeInProgress = new AtomicBoolean();
@@ -1003,6 +1004,26 @@ public class StreamingContainerManager implements PlanContext
       }
       reportStats.remove(o);
     }
+    
+    if (!this.shutdownOperators.isEmpty()) {
+      synchronized (this.shutdownOperators) {
+        Iterator<Map.Entry<Long, Set<PTOperator>>> it = shutdownOperators.entrySet().iterator();
+        while (it.hasNext()) {
+          Map.Entry<Long, Set<PTOperator>> windowAndOpers = it.next();
+          if (windowAndOpers.getKey().longValue() > this.committedWindowId) {
+            // wait until window is committed
+            continue;
+          } else {
+            LOG.info("Removing inactive operators at window {} {}", Codec.getStringWindowId(windowAndOpers.getKey()), windowAndOpers.getValue());
+            for (PTOperator oper : windowAndOpers.getValue()) {
+              plan.removeTerminatedPartition(oper);
+            }
+            it.remove();
+          }
+        }
+      }
+    }
+    
     if (!eventQueue.isEmpty()) {
       for (PTOperator oper : plan.getAllOperators().values()) {
         if (oper.getState() != PTOperator.State.ACTIVE) {
@@ -1274,20 +1295,19 @@ public class StreamingContainerManager implements PlanContext
         else {
           switch (ds) {
             case SHUTDOWN:
-              // remove the operator from the plan
-              Runnable r = new Runnable()
-              {
-                @Override
-                public void run()
-                {
-                  if (oper.getInputs().isEmpty()) {
-                    LOG.info("Removing IDLE operator from plan {}", oper);
-                    plan.removeIdlePartition(oper);
-                  }
+              // schedule operator deactivation against the windowId (every operator stopping in that window is added to its set)
+              // will be processed once window is committed and all dependent operators completed processing
+              long windowId = oper.stats.currentWindowId.get();
+              if (ohb.windowStats != null && !ohb.windowStats.isEmpty()) {
+                windowId = ohb.windowStats.get(ohb.windowStats.size() - 1).windowId;
+              }
+              LOG.debug("Operator {} deactivated at window {}", oper, windowId);
+              synchronized (this.shutdownOperators) {
+                if (!this.shutdownOperators.containsKey(windowId)) {
+                  this.shutdownOperators.put(windowId, Sets.<PTOperator>newHashSet());
                 }
-
-              };
-              dispatch(r);
+                this.shutdownOperators.get(windowId).add(oper);
+              }
               sca.undeployOpers.add(oper.getId());
               // record operator stop event
               recordEventAsync(new StramEvent.StopOperatorEvent(oper.getName(), oper.getId(), oper.getContainer().getExternalId()));

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 5b90c04..a57a248 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -780,7 +780,8 @@ public class PhysicalPlan implements Serializable
     partitioner.partitioned(mainPC.operatorIdToPartition);
   }
 
-  private void updateStreamMappings(PMapping m) {
+  private void updateStreamMappings(PMapping m)
+  {
     for (Map.Entry<OutputPortMeta, StreamMeta> opm : m.logicalOperator.getOutputStreams().entrySet()) {
       StreamMapping ug = m.outputStreams.get(opm.getKey());
       if (ug == null) {
@@ -789,7 +790,6 @@ public class PhysicalPlan implements Serializable
       }
       LOG.debug("update stream mapping for {} {}", opm.getKey().getOperatorMeta(), opm.getKey().getPortName());
       ug.setSources(m.partitions);
-      //ug.redoMapping();
     }
 
     for (Map.Entry<InputPortMeta, StreamMeta> ipm : m.logicalOperator.getInputStreams().entrySet()) {
@@ -847,7 +847,6 @@ public class PhysicalPlan implements Serializable
         }
         LOG.debug("update upstream stream mapping for {} {}", sourceMapping.logicalOperator, ipm.getValue().getSource().getPortName());
         ug.setSources(sourceMapping.partitions);
-        //ug.redoMapping();
       }
     }
 
@@ -990,18 +989,30 @@ public class PhysicalPlan implements Serializable
   }
 
   /**
-   * Remove a partition that was reported as idle by the execution layer.
-   * Since the end stream tuple is propagated to the downstream operators,
-   * there is no need to undeploy/redeploy them as part of this operation.
+   * Remove a partition that was reported as terminated by the execution layer.
+   * Recursively removes all downstream operators with no remaining input.
    * @param p
    */
-  public void removeIdlePartition(PTOperator p)
+  public void removeTerminatedPartition(PTOperator p)
   {
+    // keep track of downstream operators for cascading remove
+    Set<PTOperator> downstreamOpers = new HashSet<>(p.outputs.size());
+    for (PTOutput out : p.outputs) {
+      for (PTInput sinkIn : out.sinks) {
+        downstreamOpers.add(sinkIn.target);
+      }
+    }
     PMapping currentMapping = this.logicalToPTOperator.get(p.operatorMeta);
     List<PTOperator> copyPartitions = Lists.newArrayList(currentMapping.partitions);
     copyPartitions.remove(p);
     removePartition(p, currentMapping);
     currentMapping.partitions = copyPartitions;
+    // remove orphaned downstream operators
+    for (PTOperator dop : downstreamOpers) {
+      if (dop.inputs.isEmpty()) {
+        removeTerminatedPartition(dop);
+      }
+    }
     deployChanges();
   }
 
@@ -1012,8 +1023,8 @@ public class PhysicalPlan implements Serializable
    * @param oper
    * @return
    */
-  private void removePartition(PTOperator oper, PMapping operatorMapping) {
-
+  private void removePartition(PTOperator oper, PMapping operatorMapping)
+  {
     // remove any parallel partition
     for (PTOutput out : oper.outputs) {
       // copy list as it is modified by recursive remove
@@ -1137,7 +1148,8 @@ public class PhysicalPlan implements Serializable
     return inputPortList;
   }
 
-  void removePTOperator(PTOperator oper) {
+  void removePTOperator(PTOperator oper)
+  {
     LOG.debug("Removing operator " + oper);
 
     // per partition merge operators

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/MockContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/MockContainer.java b/engine/src/test/java/com/datatorrent/stram/MockContainer.java
index 7a6ba64..c0b704f 100644
--- a/engine/src/test/java/com/datatorrent/stram/MockContainer.java
+++ b/engine/src/test/java/com/datatorrent/stram/MockContainer.java
@@ -91,7 +91,7 @@ public class MockContainer
     for (Map.Entry<Integer, MockOperatorStats> oe : this.stats.entrySet()) {
       OperatorHeartbeat ohb = new OperatorHeartbeat();
       ohb.setNodeId(oe.getKey());
-      ohb.setState(OperatorHeartbeat.DeployState.ACTIVE);
+      ohb.setState(oe.getValue().deployState);
       OperatorStats lstats = new OperatorStats();
       lstats.checkpoint = new Checkpoint(oe.getValue().checkpointWindowId, 0, 0);
       lstats.windowId = oe.getValue().currentWindowId;

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
index 9726e65..d7a7fff 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamCodecTest.java
@@ -1178,28 +1178,6 @@ public class StreamCodecTest
     return unifiers;
   }
 
-  private void checkNotSetStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
-                                          Integer streamCodecIdentifier)
-  {
-    StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier);
-    Assert.assertNotNull("stream codec null " + id, streamCodecInfo);
-    Assert.assertNull("stream codec object not null " + id, streamCodecInfo);
-  }
-
-  private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
-                                    Integer streamCodecIdentifier, StreamCodec<?> streamCodec)
-  {
-    checkStreamCodecInfo(streamCodecs, id, streamCodecIdentifier, streamCodec, null);
-  }
-
-  private void checkStreamCodecInfo(Map<Integer, StreamCodec<?>> streamCodecs, String id,
-                                    Integer streamCodecIdentifier, StreamCodec<?> streamCodec, String className)
-  {
-    StreamCodec<?> streamCodecInfo = streamCodecs.get(streamCodecIdentifier);
-    Assert.assertNotNull("stream codec info null " + id, streamCodecInfo);
-    Assert.assertEquals("stream codec object " + id, streamCodec, streamCodecInfo);
-  }
-
   private void checkPresentStreamCodec(LogicalPlan.OperatorMeta operatorMeta, Operator.InputPort<?> inputPort,
                                        Map<Integer, StreamCodec<?>> streamCodecs,
                                        String id, PhysicalPlan plan )
@@ -1277,17 +1255,6 @@ public class StreamCodecTest
     return otdi;
   }
 
-  private LogicalPlan.InputPortMeta getInputPortMeta(LogicalPlan.StreamMeta streamMeta, LogicalPlan.OperatorMeta operatorMeta)
-  {
-    LogicalPlan.InputPortMeta portMeta = null;
-    for (Map.Entry<LogicalPlan.InputPortMeta, LogicalPlan.StreamMeta> entry : operatorMeta.getInputStreams().entrySet()) {
-      if (entry.getValue() == streamMeta) {
-        portMeta = entry.getKey();
-      }
-    }
-    return portMeta;
-  }
-
   // For tests so that it doesn't trigger assignment of a new id
   public boolean isStrCodecPresent(StreamCodec<?> streamCodecInfo, PhysicalPlan plan)
   {
@@ -1316,7 +1283,7 @@ public class StreamCodecTest
 
   public static class DefaultTestStreamCodec  extends DefaultStatefulStreamCodec<Object> implements Serializable
   {
-
+    private static final long serialVersionUID = 1L;
   }
 
   public static class DefaultCodecOperator extends GenericTestOperator

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/76faf869/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index a238e3e..89f2878 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -31,7 +31,6 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
@@ -40,10 +39,10 @@ import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.Stats.OperatorStats.PortStats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
 import com.datatorrent.stram.StreamingContainerManager.ContainerResource;
 import com.datatorrent.stram.api.AppDataSource;
@@ -56,6 +55,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHe
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
 import com.datatorrent.stram.appdata.AppDataPushAgent;
 import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
 import com.datatorrent.stram.engine.*;
@@ -72,12 +72,14 @@ import com.datatorrent.stram.support.StramTestSupport.EmbeddedWebSocketServer;
 import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
 import com.datatorrent.stram.support.StramTestSupport.TestMeta;
 import com.datatorrent.stram.tuple.Tuple;
+
 import org.apache.commons.lang.StringUtils;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.eclipse.jetty.websocket.WebSocket;
 
-public class StreamingContainerManagerTest {
+public class StreamingContainerManagerTest
+{
   @Rule public TestMeta testMeta = new TestMeta();
 
   @Test
@@ -703,6 +705,74 @@ public class StreamingContainerManagerTest {
     Assert.assertEquals("type " + o1DeployInfo, OperatorDeployInfo.OperatorType.INPUT, o1DeployInfo.type);
   }
 
+  @Test
+  public void testOperatorShutdown()
+  {
+    LogicalPlan dag = new LogicalPlan();
+    dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir);
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    dag.addStream("stream1", o1.outport1, o2.inport1);
+
+    StreamingContainerManager scm = new StreamingContainerManager(dag);
+
+    PhysicalPlan physicalPlan = scm.getPhysicalPlan();
+    Map<PTContainer, MockContainer> mockContainers = new HashMap<>();
+    for (PTContainer c : physicalPlan.getContainers()) {
+      MockContainer mc = new MockContainer(scm, c);
+      mockContainers.put(c, mc);
+    }
+    // deploy all containers
+    for (Map.Entry<PTContainer, MockContainer> ce : mockContainers.entrySet()) {
+      ce.getValue().deploy();
+      // skip buffer server purge in monitorHeartbeat
+      ce.getKey().bufferServerAddress = null;
+    }
+
+    PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0);
+    MockContainer mc1 = mockContainers.get(o1p1.getContainer());
+    MockOperatorStats o1p1mos = mc1.stats(o1p1.getId());
+    o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc1.sendHeartbeat();
+
+    PTOperator o2p1 = physicalPlan.getOperators(dag.getMeta(o2)).get(0);
+    MockContainer mc2 = mockContainers.get(o2p1.getContainer());
+    MockOperatorStats o2p1mos = mc2.stats(o2p1.getId());
+    o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
+    mc2.sendHeartbeat();
+
+    o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
+    mc1.sendHeartbeat();
+    scm.monitorHeartbeat();
+    Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
+    scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+    Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+    scm.processEvents();
+    Assert.assertEquals("containers at committedWindowId=1", 2, physicalPlan.getContainers().size());
+
+    // checkpoint window 2
+    o1p1mos.checkpointWindowId(2);
+    mc1.sendHeartbeat();
+    scm.monitorHeartbeat();
+
+    o2p1mos.currentWindowId(2).checkpointWindowId(2);
+    mc2.sendHeartbeat();
+    scm.monitorHeartbeat();
+    Assert.assertEquals("committedWindowId", 1, scm.getCommittedWindowId());
+    scm.monitorHeartbeat(); // committedWindowId updated in next cycle
+    Assert.assertEquals("committedWindowId", 2, scm.getCommittedWindowId());
+    Assert.assertEquals(1, o1p1.getContainer().getOperators().size());
+    Assert.assertEquals(1, o2p1.getContainer().getOperators().size());
+    Assert.assertEquals(2, physicalPlan.getContainers().size());
+
+    // call again as events are processed after committed window was updated
+    scm.processEvents();
+    Assert.assertEquals(0, o1p1.getContainer().getOperators().size());
+    Assert.assertEquals(0, o2p1.getContainer().getOperators().size());
+    Assert.assertEquals(0, physicalPlan.getContainers().size());
+  }
 
   private void testDownStreamPartition(Locality locality) throws Exception
   {
@@ -738,7 +808,8 @@ public class StreamingContainerManagerTest {
   }
 
   @Test
-  public void testPhysicalPropertyUpdate() throws Exception{
+  public void testPhysicalPropertyUpdate() throws Exception
+  {
     LogicalPlan dag = new LogicalPlan();
     dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null));
     TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
@@ -755,6 +826,7 @@ public class StreamingContainerManagerTest {
     Future<?> future = dnmgr.getPhysicalOperatorProperty(lc.getPlanOperators(dag.getMeta(o1)).get(0).getId(), "maxTuples", 10000);
     Object object = future.get(10000, TimeUnit.MILLISECONDS);
     Assert.assertNotNull(object);
+    @SuppressWarnings("unchecked")
     Map<String, Object> propertyValue = (Map<String, Object>)object;
     Assert.assertEquals(2,propertyValue.get("maxTuples"));
     lc.shutdown();
@@ -873,6 +945,7 @@ public class StreamingContainerManagerTest {
       pushAgent.pushData();
       Thread.sleep(1000);
       Assert.assertTrue(messages.size() > 0);
+      pushAgent.close();
       JSONObject message = messages.get(0);
       System.out.println("Got this message: " + message.toString(2));
       Assert.assertEquals(topic, message.getString("topic"));