You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/09 09:37:34 UTC
incubator-apex-core git commit: APEX-98 #resolve fixed problem with precision in the window to time utility function in WindowGenerator
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 711fd0708 -> e512610ed
APEX-98 #resolve fixed problem with precision in the window to time utility function in WindowGenerator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/e512610e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/e512610e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/e512610e
Branch: refs/heads/devel-3
Commit: e512610ed6a84f7a4d0d904bd9e010ac56cf109c
Parents: 711fd07
Author: David Yan <da...@datatorrent.com>
Authored: Tue Sep 8 16:06:55 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Sep 8 18:00:54 2015 -0700
----------------------------------------------------------------------
.../stram/engine/WindowGenerator.java | 44 +++++++++++++-
.../stram/engine/WindowGeneratorTest.java | 62 +++++++++++++++++---
2 files changed, 96 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index e0ea4d0..83f4790 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -269,6 +269,34 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
return getWindowMillis(windowId, firstWindowMillis, windowWidthMillis) + windowWidthMillis;
}
+ public static long getNextWindowId(long windowId, long firstWindowMillis, long windowWidthMillis)
+ {
+ return getAheadWindowId(windowId, firstWindowMillis, windowWidthMillis, 1);
+ }
+
+ public static long getAheadWindowId(long windowId, long firstWindowMillis, long windowWidthMillis, int ahead)
+ {
+ long millis = getWindowMillis(windowId, firstWindowMillis, windowWidthMillis);
+ millis += ahead * windowWidthMillis;
+ return getWindowId(millis, firstWindowMillis, windowWidthMillis);
+ }
+
+ /**
+ * Returns the number of windows windowIdA is ahead of windowIdB.
+ *
+ * @param windowIdA
+ * @param windowIdB
+ * @param firstWindowMillis
+ * @param windowWidthMillis
+ * @return the number of windows ahead, negative if windowIdA is behind windowIdB
+ */
+ public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis)
+ {
+ long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis);
+ long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis);
+ return (millisA - millisB) / windowWidthMillis;
+ }
+
/**
* @param windowId
* @param firstWindowMillis
@@ -277,9 +305,19 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
*/
public static long getWindowMillis(long windowId, long firstWindowMillis, long windowWidthMillis)
{
- long millis = (windowId >> 32) * 1000 + windowWidthMillis * (windowId & WindowGenerator.WINDOW_MASK);
- millis = millis > firstWindowMillis ? millis : firstWindowMillis;
- return millis;
+ if (windowId == -1) {
+ return firstWindowMillis;
+ }
+ long baseMillis = (windowId >> 32) * 1000;
+ long diff = baseMillis - firstWindowMillis;
+ long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1);
+ long multiplier = diff / baseChangeInterval;
+ if (diff % baseChangeInterval > 0) {
+ multiplier++;
+ }
+ assert (multiplier >= 0);
+ windowId = windowId & WindowGenerator.WINDOW_MASK;
+ return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis;
}
private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 4665d79..1ef473d 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -321,16 +321,64 @@ public class WindowGeneratorTest
{
long first = 1431714014000L;
- long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, 500);
- long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, 500);
+ for (int windowWidthMillis : new int[]{500, 123}) {
+ long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, windowWidthMillis);
+ long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, windowWidthMillis);
- long window1 = WindowGenerator.getWindowId(time1, first, 500);
- long window2 = WindowGenerator.getWindowId(time2, first, 500);
+ long window1 = WindowGenerator.getWindowId(time1, first, windowWidthMillis);
+ long window2 = WindowGenerator.getWindowId(time2, first, windowWidthMillis);
- Assert.assertEquals("window 1", 6149164867354886271L, window1);
- Assert.assertEquals("window 2", 6149164867354886272L, window2);
+ Assert.assertEquals("window 1", 6149164867354886271L, window1);
+ Assert.assertEquals("window 2", 6149164867354886272L, window2);
- Assert.assertTrue(time2 > time1);
+ Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
+ }
+ }
+
+ @Test
+ public void testWindowToTimeBaseSecondRollover()
+ {
+ long first = 1431714014123L;
+
+ for (int windowWidthMillis : new int[]{500, 123}) {
+ long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+ window1 |= WindowGenerator.MAX_WINDOW_ID;
+ long window2 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
+ Assert.assertTrue("base seconds should be greater during an rollover", (window2 >> 32) > (window1 >> 32));
+ long time1 = WindowGenerator.getWindowMillis(window1, first, windowWidthMillis);
+ long time2 = WindowGenerator.getWindowMillis(window2, first, windowWidthMillis);
+
+ Assert.assertEquals("max window id", WindowGenerator.MAX_WINDOW_ID, window1 & WindowGenerator.WINDOW_MASK);
+ Assert.assertEquals("rollover after max", 0, window2 & WindowGenerator.WINDOW_MASK);
+ Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
+ }
+ }
+
+ @Test
+ public void testWindowIdAhead()
+ {
+ long first = 1431714014123L;
+ int ahead = 678;
+ for (int windowWidthMillis : new int[]{500, 123}) {
+ long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+ long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
+ for (int i = 0; i < ahead; i++) {
+ window1 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
+ }
+ Assert.assertEquals(window2, window1);
+ }
+ }
+
+ @Test
+ public void testWindowIdCompare()
+ {
+ long first = 1431714014123L;
+ int ahead = 341;
+ for (int windowWidthMillis : new int[]{500, 123}) {
+ long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+ long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
+ Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis));
+ }
}
public static final Logger logger = LoggerFactory.getLogger(WindowGeneratorTest.class);