You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/09 09:37:34 UTC

incubator-apex-core git commit: APEX-98 #resolve fixed problem with precision in the window to time utility function in WindowGenerator

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 711fd0708 -> e512610ed


APEX-98 #resolve fixed problem with precision in the window to time utility function in WindowGenerator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/e512610e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/e512610e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/e512610e

Branch: refs/heads/devel-3
Commit: e512610ed6a84f7a4d0d904bd9e010ac56cf109c
Parents: 711fd07
Author: David Yan <da...@datatorrent.com>
Authored: Tue Sep 8 16:06:55 2015 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Sep 8 18:00:54 2015 -0700

----------------------------------------------------------------------
 .../stram/engine/WindowGenerator.java           | 44 +++++++++++++-
 .../stram/engine/WindowGeneratorTest.java       | 62 +++++++++++++++++---
 2 files changed, 96 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
index e0ea4d0..83f4790 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/WindowGenerator.java
@@ -269,6 +269,34 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
     return getWindowMillis(windowId, firstWindowMillis, windowWidthMillis) + windowWidthMillis;
   }
 
+  public static long getNextWindowId(long windowId, long firstWindowMillis, long windowWidthMillis)
+  {
+    return getAheadWindowId(windowId, firstWindowMillis, windowWidthMillis, 1);
+  }
+
+  public static long getAheadWindowId(long windowId, long firstWindowMillis, long windowWidthMillis, int ahead)
+  {
+    long millis = getWindowMillis(windowId, firstWindowMillis, windowWidthMillis);
+    millis += ahead * windowWidthMillis;
+    return getWindowId(millis, firstWindowMillis, windowWidthMillis);
+  }
+
+  /**
+   * Returns the number of windows windowIdA is ahead of windowIdB.
+   *
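+   * @param windowIdA the window id whose position is being measured
+   * @param windowIdB the window id used as the reference point
+   * @param firstWindowMillis passed to getWindowMillis when converting both window ids to milliseconds
+   * @param windowWidthMillis passed to getWindowMillis; the millisecond difference is divided by this value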
+   * @return the number of windows ahead, negative if windowIdA is behind windowIdB
+   */
+  public static long compareWindowId(long windowIdA, long windowIdB, long firstWindowMillis, long windowWidthMillis)
+  {
+    long millisA = getWindowMillis(windowIdA, firstWindowMillis, windowWidthMillis);
+    long millisB = getWindowMillis(windowIdB, firstWindowMillis, windowWidthMillis);
+    return (millisA - millisB) / windowWidthMillis;
+  }
+
   /**
    * @param windowId
    * @param firstWindowMillis
@@ -277,9 +305,19 @@ public class WindowGenerator extends MuxReservoir implements Stream, Runnable
    */
   public static long getWindowMillis(long windowId, long firstWindowMillis, long windowWidthMillis)
   {
-    long millis = (windowId >> 32) * 1000 + windowWidthMillis * (windowId & WindowGenerator.WINDOW_MASK);
-    millis = millis > firstWindowMillis ? millis : firstWindowMillis;
-    return millis;
+    if (windowId == -1) {
+      return firstWindowMillis;
+    }
+    long baseMillis = (windowId >> 32) * 1000;
+    long diff = baseMillis - firstWindowMillis;
+    long baseChangeInterval = windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1);
+    long multiplier = diff / baseChangeInterval;
+    if (diff % baseChangeInterval > 0) {
+      multiplier++;
+    }
+    assert (multiplier >= 0);
+    windowId = windowId & WindowGenerator.WINDOW_MASK;
+    return firstWindowMillis + (multiplier * windowWidthMillis * (WindowGenerator.MAX_WINDOW_ID + 1)) + windowId * windowWidthMillis;
   }
 
   private class MasterReservoir extends CircularBuffer<Tuple> implements Reservoir

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e512610e/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
index 4665d79..1ef473d 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java
@@ -321,16 +321,64 @@ public class WindowGeneratorTest
   {
     long first = 1431714014000L;
 
-    long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, 500);
-    long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, 500);
+    for (int windowWidthMillis : new int[]{500, 123}) {
+      long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, windowWidthMillis);
+      long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, windowWidthMillis);
 
-    long window1 = WindowGenerator.getWindowId(time1, first, 500);
-    long window2 = WindowGenerator.getWindowId(time2, first, 500);
+      long window1 = WindowGenerator.getWindowId(time1, first, windowWidthMillis);
+      long window2 = WindowGenerator.getWindowId(time2, first, windowWidthMillis);
 
-    Assert.assertEquals("window 1", 6149164867354886271L, window1);
-    Assert.assertEquals("window 2", 6149164867354886272L, window2);
+      Assert.assertEquals("window 1", 6149164867354886271L, window1);
+      Assert.assertEquals("window 2", 6149164867354886272L, window2);
 
-    Assert.assertTrue(time2 > time1);
+      Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
+    }
+  }
+
+  @Test
+  public void testWindowToTimeBaseSecondRollover()
+  {
+    long first = 1431714014123L;
+
+    for (int windowWidthMillis : new int[]{500, 123}) {
+      long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+      window1 |= WindowGenerator.MAX_WINDOW_ID;
+      long window2 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
+      Assert.assertTrue("base seconds should be greater during an rollover", (window2 >> 32) > (window1 >> 32));
+      long time1 = WindowGenerator.getWindowMillis(window1, first, windowWidthMillis);
+      long time2 = WindowGenerator.getWindowMillis(window2, first, windowWidthMillis);
+
+      Assert.assertEquals("max window id", WindowGenerator.MAX_WINDOW_ID, window1 & WindowGenerator.WINDOW_MASK);
+      Assert.assertEquals("rollover after max", 0, window2 & WindowGenerator.WINDOW_MASK);
+      Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
+    }
+  }
+
+  @Test
+  public void testWindowIdAhead()
+  {
+    long first = 1431714014123L;
+    int ahead = 678;
+    for (int windowWidthMillis : new int[]{500, 123}) {
+      long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+      long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
+      for (int i = 0; i < ahead; i++) {
+        window1 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
+      }
+      Assert.assertEquals(window2, window1);
+    }
+  }
+
+  @Test
+  public void testWindowIdCompare()
+  {
+    long first = 1431714014123L;
+    int ahead = 341;
+    for (int windowWidthMillis : new int[]{500, 123}) {
+      long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
+      long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
+      Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, first, windowWidthMillis));
+    }
   }
 
   public static final Logger logger = LoggerFactory.getLogger(WindowGeneratorTest.class);