You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/23 06:15:03 UTC

apex-core git commit: APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream heartbeat is processed later than the downstream for a particular window

Repository: apex-core
Updated Branches:
  refs/heads/master cf8141846 -> a54e0b7f8


APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream heartbeat is processed later than the downstream for a particular window


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a54e0b7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a54e0b7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a54e0b7f

Branch: refs/heads/master
Commit: a54e0b7f84a4c5fe8e0f6fa3a6f6c3fe7b13b5b3
Parents: cf81418
Author: David Yan <da...@datatorrent.com>
Authored: Fri Sep 30 16:54:45 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 22 16:01:16 2016 -0800

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 156 +++++++++++++------
 .../stram/engine/WindowGenerator.java           |   8 +-
 .../java/com/datatorrent/stram/LatencyTest.java | 133 ++++++++++++++++
 .../stram/engine/WindowGeneratorTest.java       |   2 +-
 4 files changed, 250 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index d5e5475..4d193d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -276,7 +276,7 @@ public class StreamingContainerManager implements PlanContext
 
   private final long startTime = System.currentTimeMillis();
 
-  private static class EndWindowStats
+  static class EndWindowStats
   {
     long emitTimestamp = -1;
     HashMap<String, Long> dequeueTimestamps = new HashMap<>(); // input port name to end window dequeue time
@@ -816,20 +816,27 @@ public class StreamingContainerManager implements PlanContext
 
   private void calculateEndWindowStats()
   {
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+
+    UpdateOperatorLatencyContext ctx = new UpdateOperatorLatencyContext(rpcLatencies, endWindowStatsOperatorMap);
+
+    for (PTOperator operator : allOperators.values()) {
+      updateOperatorLatency(operator, ctx);
+    }
+
     if (!endWindowStatsOperatorMap.isEmpty()) {
-      Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
 
       if (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
         LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", this.vars.maxWindowsBehindForStats);
         while (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
           LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}",
               endWindowStatsOperatorMap.firstKey(),
-              endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators);
+              endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allOperators.keySet());
           endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey());
         }
       }
       //logicalMetrics.clear();
-      int numOperators = allCurrentOperators.size();
+      int numOperators = allOperators.size();
       Long windowId = endWindowStatsOperatorMap.firstKey();
       while (windowId != null) {
         Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId);
@@ -838,7 +845,7 @@ public class StreamingContainerManager implements PlanContext
         aggregateMetrics(windowId, endWindowStatsMap);
         criticalPathInfo = findCriticalPath();
 
-        if (allCurrentOperators.containsAll(endWindowStatsOperators)) {
+        if (allOperators.keySet().containsAll(endWindowStatsOperators)) {
           if (endWindowStatsMap.size() < numOperators) {
             if (windowId < completeEndWindowStatsWindowId) {
               LOG.debug("Disregarding stale end window stats for window {}", windowId);
@@ -1704,45 +1711,6 @@ public class StreamingContainerManager implements PlanContext
             }
             endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
 
-            if (!oper.getInputs().isEmpty()) {
-              long latency = Long.MAX_VALUE;
-              long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
-              MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
-              if (rpcLatency != null) {
-                adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
-              }
-              PTOperator slowestUpstream = null;
-              for (PTInput input : oper.getInputs()) {
-                PTOperator upstreamOp = input.source.source;
-                if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
-                  continue;
-                }
-                EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId());
-                long portLatency;
-                if (ews == null) {
-                  // This is when the operator is likely to be behind too many windows. We need to give an estimate for
-                  // latency at this point, by looking at the number of windows behind
-                  int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
-                  portLatency = (upstreamOp.stats.currentWindowId.get() - oper.stats.currentWindowId.get()) * widthMillis;
-                } else {
-                  MovingAverageLong upstreamRPCLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
-                  portLatency = adjustedEndWindowEmitTimestamp - ews.emitTimestamp;
-                  if (upstreamRPCLatency != null) {
-                    portLatency -= upstreamRPCLatency.getAvg();
-                  }
-                }
-                if (portLatency < 0) {
-                  portLatency = 0;
-                }
-                if (latency > portLatency) {
-                  latency = portLatency;
-                  slowestUpstream = upstreamOp;
-                }
-              }
-              status.latencyMA.add(latency);
-              slowestUpstreamOp.put(oper, slowestUpstream);
-            }
-
             Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
             int numOperators = plan.getAllOperators().size();
             if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && endWindowStatsMap.size() == numOperators) {
@@ -1828,6 +1796,106 @@ public class StreamingContainerManager implements PlanContext
     return rsp;
   }
 
+  static class UpdateOperatorLatencyContext
+  {
+    Map<String, MovingAverageLong> rpcLatencies;
+    Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;
+
+    UpdateOperatorLatencyContext()
+    {
+    }
+
+    UpdateOperatorLatencyContext(Map<String, MovingAverageLong> rpcLatencies, Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap)
+    {
+      this.rpcLatencies = rpcLatencies;
+      this.endWindowStatsOperatorMap = endWindowStatsOperatorMap;
+    }
+
+    long getRPCLatency(PTOperator oper)
+    {
+      MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
+      return rpcLatency == null ? 0 : rpcLatency.getAvg();
+    }
+
+    boolean endWindowStatsExists(long windowId)
+    {
+      return endWindowStatsOperatorMap.containsKey(windowId);
+    }
+
+    long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+    {
+      Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId);
+      if (endWindowStatsMap == null) {
+        return -1;
+      }
+      EndWindowStats ews = endWindowStatsMap.get(oper.getId());
+      if (ews == null) {
+        return -1;
+      }
+      return ews.emitTimestamp;
+    }
+  }
+
+  public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx)
+  {
+    if (!oper.getInputs().isEmpty()) {
+      OperatorStatus status = oper.stats;
+      long latency = Long.MAX_VALUE;
+      PTOperator slowestUpstream = null;
+      int windowWidthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
+      int heartbeatTimeoutMillis = plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS);
+      long currentWindowId = status.currentWindowId.get();
+      if (!ctx.endWindowStatsExists(currentWindowId)) {
+        // the end window stats for the current window id are not available; estimate latency from how many windows each upstream operator is ahead
+        for (PTInput input : oper.getInputs()) {
+          PTOperator upstreamOp = input.source.source;
+          if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+            continue;
+          }
+          if (upstreamOp.stats.currentWindowId.get() > oper.stats.currentWindowId.get()) {
+            long portLatency = WindowGenerator
+                .compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis;
+            if (latency > portLatency) {
+              latency = portLatency;
+              slowestUpstream = upstreamOp;
+            }
+          }
+        }
+      } else {
+        long endWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, oper);
+        long adjustedEndWindowEmitTimestamp = endWindowEmitTime + ctx.getRPCLatency(oper);
+        for (PTInput input : oper.getInputs()) {
+          PTOperator upstreamOp = input.source.source;
+          if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+            continue;
+          }
+          long upstreamEndWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, upstreamOp);
+          if (upstreamEndWindowEmitTime < 0) {
+            continue;
+          }
+          long portLatency = adjustedEndWindowEmitTimestamp - (upstreamEndWindowEmitTime + ctx.getRPCLatency(upstreamOp));
+          if (portLatency < 0) {
+            portLatency = 0;
+          }
+          long latencyFromWindowsBehind = WindowGenerator.compareWindowId(upstreamOp.stats.currentWindowId.get(), oper.stats.currentWindowId.get(), windowWidthMillis) * windowWidthMillis;
+          if (latencyFromWindowsBehind > portLatency && latencyFromWindowsBehind > heartbeatTimeoutMillis) {
+            portLatency = latencyFromWindowsBehind;
+          }
+          if (latency > portLatency) {
+            latency = portLatency;
+            slowestUpstream = upstreamOp;
+          }
+        }
+      }
+      if (slowestUpstream != null) {
+        status.latencyMA.add(latency);
+        slowestUpstreamOp.put(oper, slowestUpstream);
+        return latency;
+      }
+    }
+    return 0;
+  }
+
   private ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent sca)
   {
     ContainerHeartbeatResponse rsp = new ContainerHeartbeatResponse();

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 4793924..3a8438d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -282,14 +282,14 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
    *
    * @param windowIdA
    * @param windowIdB
-   * @param firstWindowMillis
    * @param windowWidthMillis
    * @return the number of windows ahead, negative if windowIdA is behind windowIdB
    */
-  public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis)
+  public static long compareWindowId(long windowIdA, long windowIdB, long windowWidthMillis)
   {
-    long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis);
-    long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis);
+    // firstWindowMillis does not matter here since it cancels out when the two values are subtracted.
+    long millisA = getWindowMillis(windowIdA, 0, windowWidthMillis);
+    long millisB = getWindowMillis(windowIdB, 0, windowWidthMillis);
     return (millisA - millisB) / windowWidthMillis;
   }
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
new file mode 100644
index 0000000..29f525a
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+
+public class LatencyTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(LatencyTest.class);
+  @Rule
+  public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+
+  private LogicalPlan dag;
+  private StreamingContainerManager scm;
+  private PTOperator o1p1;
+  private PTOperator o2p1;
+  private PTOperator o3p1;
+
+  private static final int windowWidthMillis = 600;
+  private static final int heartbeatTimeoutMillis = 30000;
+
+  @Before
+  public void setup()
+  {
+    dag = StramTestSupport.createDAG(testMeta);
+    dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, windowWidthMillis);
+    dag.setAttribute(Context.DAGContext.HEARTBEAT_TIMEOUT_MILLIS, heartbeatTimeoutMillis);
+    dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new StramTestSupport
+        .MemoryStorageAgent());
+
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
+    dag.addStream("o1.output1", o1.outport1, o3.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport2);
+    scm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = scm.getPhysicalPlan();
+    o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+    o2p1 = plan.getOperators(dag.getMeta(o2)).get(0);
+    o3p1 = plan.getOperators(dag.getMeta(o3)).get(0);
+  }
+
+  private long getLatency(long windowId1, long windowId2, long windowId3, final boolean endWindowStatsExists, final long ewt1, final long ewt2, final long ewt3)
+  {
+    o1p1.stats.statsRevs.checkout();
+    o1p1.stats.currentWindowId.set(windowId1);
+    o1p1.stats.statsRevs.commit();
+
+    o2p1.stats.statsRevs.checkout();
+    o2p1.stats.currentWindowId.set(windowId2);
+    o2p1.stats.statsRevs.commit();
+
+    o3p1.stats.statsRevs.checkout();
+    o3p1.stats.currentWindowId.set(windowId3);
+    o3p1.stats.statsRevs.commit();
+
+    return scm.updateOperatorLatency(o3p1, new StreamingContainerManager.UpdateOperatorLatencyContext()
+    {
+      @Override
+      long getRPCLatency(PTOperator oper)
+      {
+        return 0;
+      }
+
+      @Override
+      boolean endWindowStatsExists(long windowId)
+      {
+        return endWindowStatsExists;
+      }
+
+      @Override
+      long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+      {
+        if (oper == o1p1) {
+          return ewt1;
+        } else if (oper == o2p1) {
+          return ewt2;
+        } else if (oper == o3p1) {
+          return ewt3;
+        } else {
+          Assert.fail();
+          return 0;
+        }
+      }
+    });
+  }
+
+  @Test
+  public void testLatency()
+  {
+    // When all end window stats are available and latency within heartbeatTimeout
+    Assert.assertEquals(100, getLatency(1000, 1000, 1000, true, 1000, 1500, 1600));
+
+    // When all end window stats are available and calculated latency is more than heartbeatTimeout
+    Assert.assertEquals((10000 - 100) * windowWidthMillis, getLatency(10000, 10000, 100, true, 1000, 1500, 1600));
+
+    // When end window stats are not available
+    Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 1000, 997, false, 1000, 1500, 1600));
+
+    // When the current window is larger than upstream's current window
+    Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index e302f98..0c95b75 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -383,7 +383,7 @@ public class WindowGeneratorTest
     for (int windowWidthMillis : new int[]{500, 123}) {
       long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
       long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
-      Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis));
+      Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, windowWidthMillis));
     }
   }