You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@apex.apache.org by th...@apache.org on 2016/11/23 06:15:03 UTC
apex-core git commit: APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream heartbeat is processed later than the downstream for a particular window
Repository: apex-core
Updated Branches:
refs/heads/master cf8141846 -> a54e0b7f8
APEXCORE-544 APEXCORE-379 #resolve fixed latency calculation when the upstream heartbeat is processed later than the downstream for a particular window
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a54e0b7f
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a54e0b7f
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a54e0b7f
Branch: refs/heads/master
Commit: a54e0b7f84a4c5fe8e0f6fa3a6f6c3fe7b13b5b3
Parents: cf81418
Author: David Yan <da...@datatorrent.com>
Authored: Fri Sep 30 16:54:45 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 22 16:01:16 2016 -0800
----------------------------------------------------------------------
.../stram/StreamingContainerManager.java | 156 +++++++++++++------
.../stram/engine/WindowGenerator.java | 8 +-
.../java/com/datatorrent/stram/LatencyTest.java | 133 ++++++++++++++++
.../stram/engine/WindowGeneratorTest.java | 2 +-
4 files changed, 250 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index d5e5475..4d193d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -276,7 +276,7 @@ public class StreamingContainerManager implements PlanContext
private final long startTime = System.currentTimeMillis();
- private static class EndWindowStats
+ static class EndWindowStats
{
long emitTimestamp = -1;
HashMap<String, Long> dequeueTimestamps = new HashMap<>(); // input port name to end window dequeue time
@@ -816,20 +816,27 @@ public class StreamingContainerManager implements PlanContext
private void calculateEndWindowStats()
{
+ Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+
+ UpdateOperatorLatencyContext ctx = new UpdateOperatorLatencyContext(rpcLatencies, endWindowStatsOperatorMap);
+
+ for (PTOperator operator : allOperators.values()) {
+ updateOperatorLatency(operator, ctx);
+ }
+
if (!endWindowStatsOperatorMap.isEmpty()) {
- Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
if (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
LOG.warn("Some operators are behind for more than {} windows! Trimming the end window stats map", this.vars.maxWindowsBehindForStats);
while (endWindowStatsOperatorMap.size() > this.vars.maxWindowsBehindForStats) {
LOG.debug("Removing incomplete end window stats for window id {}. Collected operator set: {}. Complete set: {}",
endWindowStatsOperatorMap.firstKey(),
- endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allCurrentOperators);
+ endWindowStatsOperatorMap.get(endWindowStatsOperatorMap.firstKey()).keySet(), allOperators.keySet());
endWindowStatsOperatorMap.remove(endWindowStatsOperatorMap.firstKey());
}
}
//logicalMetrics.clear();
- int numOperators = allCurrentOperators.size();
+ int numOperators = allOperators.size();
Long windowId = endWindowStatsOperatorMap.firstKey();
while (windowId != null) {
Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId);
@@ -838,7 +845,7 @@ public class StreamingContainerManager implements PlanContext
aggregateMetrics(windowId, endWindowStatsMap);
criticalPathInfo = findCriticalPath();
- if (allCurrentOperators.containsAll(endWindowStatsOperators)) {
+ if (allOperators.keySet().containsAll(endWindowStatsOperators)) {
if (endWindowStatsMap.size() < numOperators) {
if (windowId < completeEndWindowStatsWindowId) {
LOG.debug("Disregarding stale end window stats for window {}", windowId);
@@ -1704,45 +1711,6 @@ public class StreamingContainerManager implements PlanContext
}
endWindowStatsMap.put(shb.getNodeId(), endWindowStats);
- if (!oper.getInputs().isEmpty()) {
- long latency = Long.MAX_VALUE;
- long adjustedEndWindowEmitTimestamp = endWindowStats.emitTimestamp;
- MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
- if (rpcLatency != null) {
- adjustedEndWindowEmitTimestamp += rpcLatency.getAvg();
- }
- PTOperator slowestUpstream = null;
- for (PTInput input : oper.getInputs()) {
- PTOperator upstreamOp = input.source.source;
- if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
- continue;
- }
- EndWindowStats ews = endWindowStatsMap.get(upstreamOp.getId());
- long portLatency;
- if (ews == null) {
- // This is when the operator is likely to be behind too many windows. We need to give an estimate for
- // latency at this point, by looking at the number of windows behind
- int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
- portLatency = (upstreamOp.stats.currentWindowId.get() - oper.stats.currentWindowId.get()) * widthMillis;
- } else {
- MovingAverageLong upstreamRPCLatency = rpcLatencies.get(upstreamOp.getContainer().getExternalId());
- portLatency = adjustedEndWindowEmitTimestamp - ews.emitTimestamp;
- if (upstreamRPCLatency != null) {
- portLatency -= upstreamRPCLatency.getAvg();
- }
- }
- if (portLatency < 0) {
- portLatency = 0;
- }
- if (latency > portLatency) {
- latency = portLatency;
- slowestUpstream = upstreamOp;
- }
- }
- status.latencyMA.add(latency);
- slowestUpstreamOp.put(oper, slowestUpstream);
- }
-
Set<Integer> allCurrentOperators = plan.getAllOperators().keySet();
int numOperators = plan.getAllOperators().size();
if (allCurrentOperators.containsAll(endWindowStatsMap.keySet()) && endWindowStatsMap.size() == numOperators) {
@@ -1828,6 +1796,106 @@ public class StreamingContainerManager implements PlanContext
return rsp;
}
+ /**
+ * Supplies the inputs that {@code updateOperatorLatency} needs: per-container RPC latencies and the collected end
+ * window stats. The accessors are package-private and overridable, so a test can subclass this and return fixed
+ * values instead of reading the maps.
+ */
+ static class UpdateOperatorLatencyContext
+ {
+ // average RPC latency per container, keyed by the container's external id
+ Map<String, MovingAverageLong> rpcLatencies;
+ // window id -> (operator id -> end window stats reported by that operator for that window)
+ Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap;
+
+ /**
+ * Creates a context with no backing maps. The default accessors would throw NullPointerException on this
+ * instance, so it is only usable by subclasses that override every accessor.
+ */
+ UpdateOperatorLatencyContext()
+ {
+ }
+
+ /**
+ * Creates a context backed by the given maps. The references are stored as-is (not copied), so later changes to
+ * the maps are visible through this context.
+ */
+ UpdateOperatorLatencyContext(Map<String, MovingAverageLong> rpcLatencies, Map<Long, Map<Integer, EndWindowStats>> endWindowStatsOperatorMap)
+ {
+ this.rpcLatencies = rpcLatencies;
+ this.endWindowStatsOperatorMap = endWindowStatsOperatorMap;
+ }
+
+ /**
+ * Returns the average RPC latency of the container hosting the operator, or 0 if none has been recorded.
+ */
+ long getRPCLatency(PTOperator oper)
+ {
+ MovingAverageLong rpcLatency = rpcLatencies.get(oper.getContainer().getExternalId());
+ return rpcLatency == null ? 0 : rpcLatency.getAvg();
+ }
+
+ /**
+ * Returns whether end window stats from at least one operator are held for the given window id.
+ */
+ boolean endWindowStatsExists(long windowId)
+ {
+ return endWindowStatsOperatorMap.containsKey(windowId);
+ }
+
+ /**
+ * Returns the end window emit timestamp the operator reported for the given window. Returns -1 if there are no
+ * stats for that window or the operator has none recorded for it.
+ */
+ long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+ {
+ Map<Integer, EndWindowStats> endWindowStatsMap = endWindowStatsOperatorMap.get(windowId);
+ if (endWindowStatsMap == null) {
+ return -1;
+ }
+ EndWindowStats ews = endWindowStatsMap.get(oper.getId());
+ if (ews == null) {
+ return -1;
+ }
+ return ews.emitTimestamp;
+ }
+ }
+
+ /**
+ * Computes the operator's latency relative to its upstream operators (DelayOperator upstreams are ignored) and, if
+ * a value could be determined, records it.
+ * <p>
+ * If end window stats are held for the operator's current window and include the operator's own emit timestamp,
+ * each input port's latency is computed as follows:
+ * <ul>
+ * <li>It is the RPC-adjusted emit timestamp of this operator minus the RPC-adjusted emit timestamp of the
+ * upstream operator for the same window, clamped at 0.</li>
+ * <li>If the upstream is so many windows ahead that the window-count estimate exceeds both that value and the
+ * heartbeat timeout, the estimate is used instead.</li>
+ * </ul>
+ * Otherwise each port's latency is estimated from how many windows the upstream operator is ahead.
+ * <p>
+ * The minimum port latency is added to the operator's latency moving average. The upstream operator that produced
+ * it is stored in {@code slowestUpstreamOp}.
+ *
+ * @param oper the operator whose latency is updated
+ * @param ctx source of RPC latencies and end window emit timestamps
+ * @return the recorded latency, or 0 if the operator has no inputs or no upstream yielded a measurement
+ */
+ public long updateOperatorLatency(PTOperator oper, UpdateOperatorLatencyContext ctx)
+ {
+ if (oper.getInputs().isEmpty()) {
+ return 0;
+ }
+ OperatorStatus status = oper.stats;
+ long latency = Long.MAX_VALUE;
+ PTOperator slowestUpstream = null;
+ int windowWidthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
+ int heartbeatTimeoutMillis = plan.getLogicalPlan().getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS);
+ // Read the window id once; every comparison below uses this snapshot even if a heartbeat updates it meanwhile.
+ long currentWindowId = status.currentWindowId.get();
+ long endWindowEmitTime = ctx.endWindowStatsExists(currentWindowId) ? ctx.getEndWindowEmitTimestamp(currentWindowId, oper) : -1;
+ if (endWindowEmitTime < 0) {
+ // No usable end window stats for this operator's current window (none for the window, or this operator's own
+ // entry is missing): estimate latency from how far ahead each upstream operator is. Using a missing (-1) own
+ // timestamp would produce a negative difference and record a spurious 0 latency.
+ for (PTInput input : oper.getInputs()) {
+ PTOperator upstreamOp = input.source.source;
+ if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+ continue;
+ }
+ long upstreamWindowId = upstreamOp.stats.currentWindowId.get();
+ if (upstreamWindowId > currentWindowId) {
+ long portLatency = WindowGenerator.compareWindowId(upstreamWindowId, currentWindowId, windowWidthMillis) * windowWidthMillis;
+ if (latency > portLatency) {
+ latency = portLatency;
+ slowestUpstream = upstreamOp;
+ }
+ }
+ }
+ } else {
+ long adjustedEndWindowEmitTimestamp = endWindowEmitTime + ctx.getRPCLatency(oper);
+ for (PTInput input : oper.getInputs()) {
+ PTOperator upstreamOp = input.source.source;
+ if (upstreamOp.getOperatorMeta().getOperator() instanceof Operator.DelayOperator) {
+ continue;
+ }
+ long upstreamEndWindowEmitTime = ctx.getEndWindowEmitTimestamp(currentWindowId, upstreamOp);
+ if (upstreamEndWindowEmitTime < 0) {
+ // this upstream has not reported stats for the window yet; it cannot be measured
+ continue;
+ }
+ long portLatency = adjustedEndWindowEmitTimestamp - (upstreamEndWindowEmitTime + ctx.getRPCLatency(upstreamOp));
+ if (portLatency < 0) {
+ portLatency = 0;
+ }
+ long upstreamWindowId = upstreamOp.stats.currentWindowId.get();
+ long latencyFromWindowsBehind = WindowGenerator.compareWindowId(upstreamWindowId, currentWindowId, windowWidthMillis) * windowWidthMillis;
+ // When the upstream is far enough ahead that the window-count estimate exceeds the heartbeat timeout, the
+ // timestamp difference understates the latency, so use the estimate instead.
+ if (latencyFromWindowsBehind > portLatency && latencyFromWindowsBehind > heartbeatTimeoutMillis) {
+ portLatency = latencyFromWindowsBehind;
+ }
+ if (latency > portLatency) {
+ latency = portLatency;
+ slowestUpstream = upstreamOp;
+ }
+ }
+ }
+ if (slowestUpstream == null) {
+ // no upstream operator produced a latency value; record nothing
+ return 0;
+ }
+ status.latencyMA.add(latency);
+ slowestUpstreamOp.put(oper, slowestUpstream);
+ return latency;
+ }
+
private ContainerHeartbeatResponse getHeartbeatResponse(StreamingContainerAgent sca)
{
ContainerHeartbeatResponse rsp = new ContainerHeartbeatResponse();
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index 4793924..3a8438d 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -282,14 +282,14 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
*
* @param windowIdA
* @param windowIdB
- * @param firstWindowMillis
* @param windowWidthMillis
* @return the number of windows ahead, negative if windowIdA is behind windowIdB
*/
- public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis)
+ public static long compareWindowId(long windowIdA, long windowIdB, long windowWidthMillis)
{
- long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis);
- long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis);
+ // 0 is passed as firstWindowMillis because only the difference millisA - millisB is used.
+ // NOTE(review): this relies on getWindowMillis applying firstWindowMillis identically to both ids so that it
+ // cancels out in the subtraction -- confirm against getWindowMillis.
+ long millisA = getWindowMillis(windowIdA, 0, windowWidthMillis);
+ long millisB = getWindowMillis(windowIdB, 0, windowWidthMillis);
+ // Java integer division truncates toward zero, so a partial window difference is dropped.
return (millisA - millisB) / windowWidthMillis;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/LatencyTest.java b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
new file mode 100644
index 0000000..29f525a
--- /dev/null
+++ b/engine/src/test/java/com/datatorrent/stram/LatencyTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.stram;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.engine.GenericTestOperator;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.plan.physical.PTOperator;
+import com.datatorrent.stram.plan.physical.PhysicalPlan;
+import com.datatorrent.stram.support.StramTestSupport;
+
+/**
+ * Tests StreamingContainerManager#updateOperatorLatency on a DAG in which o1 and o2 both feed o3.
+ * The latency of o3 is computed against a stubbed UpdateOperatorLatencyContext.
+ */
+public class LatencyTest
+{
+ // NOTE(review): LOG is not used by any test in this class.
+ private static final Logger LOG = LoggerFactory.getLogger(LatencyTest.class);
+ @Rule
+ public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
+
+ private LogicalPlan dag;
+ private StreamingContainerManager scm;
+ private PTOperator o1p1;
+ private PTOperator o2p1;
+ private PTOperator o3p1;
+
+ private static final int windowWidthMillis = 600;
+ private static final int heartbeatTimeoutMillis = 30000;
+
+ /**
+ * Builds the DAG o1 -> o3 <- o2 with a 600 ms streaming window and a 30 s heartbeat timeout.
+ * Keeps the single physical partition of each operator.
+ */
+ @Before
+ public void setup()
+ {
+ dag = StramTestSupport.createDAG(testMeta);
+ dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, windowWidthMillis);
+ dag.setAttribute(Context.DAGContext.HEARTBEAT_TIMEOUT_MILLIS, heartbeatTimeoutMillis);
+ dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new StramTestSupport
+ .MemoryStorageAgent());
+
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+ GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+
+ dag.addStream("o1.output1", o1.outport1, o3.inport1);
+ dag.addStream("o2.output1", o2.outport1, o3.inport2);
+ scm = new StreamingContainerManager(dag);
+ PhysicalPlan plan = scm.getPhysicalPlan();
+ o1p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+ o2p1 = plan.getOperators(dag.getMeta(o2)).get(0);
+ o3p1 = plan.getOperators(dag.getMeta(o3)).get(0);
+ }
+
+ /**
+ * Sets the current window ids of o1, o2 and o3, then returns the latency computed for o3.
+ * The stubbed context reports:
+ * - zero RPC latency;
+ * - the given endWindowStatsExists flag for any window;
+ * - ewt1/ewt2/ewt3 as the end window emit timestamps of o1/o2/o3, regardless of window id.
+ */
+ private long getLatency(long windowId1, long windowId2, long windowId3, final boolean endWindowStatsExists, final long ewt1, final long ewt2, final long ewt3)
+ {
+ o1p1.stats.statsRevs.checkout();
+ o1p1.stats.currentWindowId.set(windowId1);
+ o1p1.stats.statsRevs.commit();
+
+ o2p1.stats.statsRevs.checkout();
+ o2p1.stats.currentWindowId.set(windowId2);
+ o2p1.stats.statsRevs.commit();
+
+ o3p1.stats.statsRevs.checkout();
+ o3p1.stats.currentWindowId.set(windowId3);
+ o3p1.stats.statsRevs.commit();
+
+ return scm.updateOperatorLatency(o3p1, new StreamingContainerManager.UpdateOperatorLatencyContext()
+ {
+ @Override
+ long getRPCLatency(PTOperator oper)
+ {
+ return 0;
+ }
+
+ @Override
+ boolean endWindowStatsExists(long windowId)
+ {
+ return endWindowStatsExists;
+ }
+
+ @Override
+ long getEndWindowEmitTimestamp(long windowId, PTOperator oper)
+ {
+ if (oper == o1p1) {
+ return ewt1;
+ } else if (oper == o2p1) {
+ return ewt2;
+ } else if (oper == o3p1) {
+ return ewt3;
+ } else {
+ // only o1, o2 and o3 exist in this DAG
+ Assert.fail();
+ return 0;
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testLatency()
+ {
+ // When all end window stats are available and latency within heartbeatTimeout
+ // (o1: 1600 - 1000 = 600, o2: 1600 - 1500 = 100; the minimum, 100, is reported)
+ Assert.assertEquals(100, getLatency(1000, 1000, 1000, true, 1000, 1500, 1600));
+
+ // When all end window stats are available and calculated latency is more than heartbeatTimeout
+ // (o3 is 9900 windows behind; 9900 * 600 ms exceeds the 30 s timeout and overrides the timestamp difference)
+ Assert.assertEquals((10000 - 100) * windowWidthMillis, getLatency(10000, 10000, 100, true, 1000, 1500, 1600));
+
+ // When end window stats are not available
+ // (latency is estimated from the 3 windows the upstream operators are ahead)
+ Assert.assertEquals((1000 - 997) * windowWidthMillis, getLatency(1000, 1000, 997, false, 1000, 1500, 1600));
+
+ // When the current window is larger than upstream's current window
+ // (upstream emit timestamps are unavailable, so no latency is recorded and 0 is returned)
+ Assert.assertEquals(0, getLatency(1000, 1000, 1001, true, -1, -1, 1600));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/a54e0b7f/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index e302f98..0c95b75 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -383,7 +383,7 @@ public class WindowGeneratorTest
for (int windowWidthMillis : new int[]{500, 123}) {
long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
- Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis));
+ Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, windowWidthMillis));
}
}