You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/22 05:29:38 UTC

[2/2] incubator-beam git commit: Update OffsetRangeTracker progress tracking and start offset

Update OffsetRangeTracker progress tracking and start offset


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee0a3bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee0a3bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee0a3bf2

Branch: refs/heads/master
Commit: ee0a3bf2acc0213e27d3b3d1353c27b977046577
Parents: 741ef26
Author: Ian Zhou <ia...@google.com>
Authored: Tue Jun 21 17:23:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 22:29:25 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/range/OffsetRangeTracker.java   | 13 +--
 .../beam/sdk/io/OffsetBasedSourceTest.java      |  5 +-
 .../sdk/io/range/OffsetRangeTrackerTest.java    | 91 +++++++++++++++-----
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     |  3 -
 4 files changed, 82 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 76790af..a8d00ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 public class OffsetRangeTracker implements RangeTracker<Long> {
   private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
 
-  private final long startOffset;
+  private long startOffset;
   private long stopOffset;
   private long lastRecordStart = -1L;
   private long offsetOfLastSplitPoint = -1L;
@@ -101,6 +101,9 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
               lastRecordStart));
     }
 
+    if (lastRecordStart == -1) {
+      startOffset = recordStart;
+    }
     lastRecordStart = recordStart;
 
     if (isAtSplitPoint) {
@@ -165,7 +168,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
       throw new IllegalArgumentException(
           "getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
     }
-    return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
+    return (long) Math.floor(startOffset + fraction * (stopOffset - startOffset));
   }
 
   @Override
@@ -179,11 +182,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
     } else if (lastRecordStart >= stopOffset) {
       return 1.0;
     } else {
-      // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
-      // which is (4 - 3 + 1) / (6 - 3) = 67%.
+      // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3 of 3,4,5
+      // which is (4 - 3) / (6 - 3) = 33%.
       // Also, clamp to at most 1.0 because the last consumed position can extend past the
       // stop position.
-      return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
+      return Math.min(1.0, 1.0 * (lastRecordStart - startOffset) / (stopOffset - startOffset));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 66abd33..7009023 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -218,13 +218,14 @@ public class OffsetBasedSourceTest {
 
       assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
       assertTrue(reader.start());
-      do {
+      items.add(reader.getCurrent());
+      while (reader.advance()) {
         Double fraction = reader.getFractionConsumed();
         assertNotNull(fraction);
         assertTrue(fraction.toString(), fraction > 0.0);
         assertTrue(fraction.toString(), fraction <= 1.0);
         items.add(reader.getCurrent());
-      } while (reader.advance());
+      }
       assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
 
       assertEquals(20, items.size());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
index edd4c4f..0c2c639 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
@@ -36,6 +36,17 @@ public class OffsetRangeTrackerTest {
   @Rule public final ExpectedException expected = ExpectedException.none();
 
   @Test
+  public void testUpdateStartOffset() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
+    assertEquals(100, tracker.getStartPosition().longValue());
+    // Update start offset to first record returned
+    assertTrue(tracker.tryReturnRecordAt(true, 150));
+    assertEquals(150, tracker.getStartPosition().longValue());
+    assertTrue(tracker.tryReturnRecordAt(true, 180));
+    assertEquals(150, tracker.getStartPosition().longValue());
+  }
+
+  @Test
   public void testTryReturnRecordSimpleSparse() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
     assertTrue(tracker.tryReturnRecordAt(true, 110));
@@ -113,28 +124,48 @@ public class OffsetRangeTrackerTest {
   public void testGetPositionForFractionDense() throws Exception {
     // Represents positions 3, 4, 5.
     OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
-    // [3, 3) represents 0.0 of [3, 6)
+    // [3, 3) represents from [0, 1/3) fraction of [3, 6)
     assertEquals(3, tracker.getPositionForFractionConsumed(0.0));
-    // [3, 4) represents up to 1/3 of [3, 6)
-    assertEquals(4, tracker.getPositionForFractionConsumed(1.0 / 6));
-    assertEquals(4, tracker.getPositionForFractionConsumed(0.333));
-    // [3, 5) represents up to 2/3 of [3, 6)
-    assertEquals(5, tracker.getPositionForFractionConsumed(0.334));
-    assertEquals(5, tracker.getPositionForFractionConsumed(0.666));
-    // any fraction consumed over 2/3 means the whole [3, 6) has been consumed.
-    assertEquals(6, tracker.getPositionForFractionConsumed(0.667));
+    assertEquals(3, tracker.getPositionForFractionConsumed(1.0 / 6));
+    assertEquals(3, tracker.getPositionForFractionConsumed(0.333));
+    // [3, 4) represents from [0, 2/3) fraction of [3, 6)
+    assertEquals(4, tracker.getPositionForFractionConsumed(0.334));
+    assertEquals(4, tracker.getPositionForFractionConsumed(0.666));
+    // [3, 5) represents from [0, 1) fraction of [3, 6)
+    assertEquals(5, tracker.getPositionForFractionConsumed(0.667));
+    assertEquals(5, tracker.getPositionForFractionConsumed(0.999));
+    // The whole [3, 6) is consumed for fraction 1
+    assertEquals(6, tracker.getPositionForFractionConsumed(1.0));
+  }
+
+  @Test
+  public void testGetPositionForFractionDenseUpdateStartOffset() throws Exception {
+    // Represents positions 3, 4, 5.
+    OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
+    // [3, 3) represents from [0, 1/3) fraction of [3, 6)
+    assertEquals(3, tracker.getPositionForFractionConsumed(0.333));
+    // Update start offset to 4
+    assertTrue(tracker.tryReturnRecordAt(true, 4));
+    // [4, 4) represents from [0, 1/2) fraction of [4, 6)
+    assertEquals(4, tracker.getPositionForFractionConsumed(0.0));
+    assertEquals(4, tracker.getPositionForFractionConsumed(0.499));
+    // [4, 5) represents from [0, 1) fraction of [4, 6)
+    assertEquals(5, tracker.getPositionForFractionConsumed(0.5));
+    assertEquals(5, tracker.getPositionForFractionConsumed(0.999));
+    // The whole [4, 6) is consumed for fraction 1
+    assertEquals(6, tracker.getPositionForFractionConsumed(1.0));
   }
 
   @Test
   public void testGetFractionConsumedDense() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
-    assertEquals(0, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(true, 3));
-    assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(true, 4));
-    assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(true, 5));
-    assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 6));
     assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 7));
@@ -145,14 +176,33 @@ public class OffsetRangeTrackerTest {
   @Test
   public void testGetFractionConsumedSparse() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
-    assertEquals(0, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
+    assertTrue(tracker.tryReturnRecordAt(true, 100));
+    assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(true, 110));
-    // Consumed positions through 110 = total 11 positions of 100.
-    assertEquals(0.11, tracker.getFractionConsumed(), 1e-6);
+    // Consumed positions through 109 = total 10 positions of 100.
+    assertEquals(0.1, tracker.getFractionConsumed(), 1e-6);
+    assertTrue(tracker.tryReturnRecordAt(true, 150));
+    assertEquals(0.5, tracker.getFractionConsumed(), 1e-6);
+    assertTrue(tracker.tryReturnRecordAt(true, 195));
+    assertEquals(0.95, tracker.getFractionConsumed(), 1e-6);
+    assertFalse(tracker.tryReturnRecordAt(true, 200));
+    assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
+  }
+
+  @Test
+  public void testGetFractionConsumedUpdateStartOffset() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
     assertTrue(tracker.tryReturnRecordAt(true, 150));
-    assertEquals(0.51, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
+    assertTrue(tracker.tryReturnRecordAt(true, 160));
+    assertEquals(0.2, tracker.getFractionConsumed(), 1e-6);
+    assertTrue(tracker.tryReturnRecordAt(true, 180));
+    assertEquals(0.6, tracker.getFractionConsumed(), 1e-6);
     assertTrue(tracker.tryReturnRecordAt(true, 195));
-    assertEquals(0.96, tracker.getFractionConsumed(), 1e-6);
+    assertEquals(0.9, tracker.getFractionConsumed(), 1e-6);
+    assertFalse(tracker.tryReturnRecordAt(true, 200));
+    assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
   }
 
   @Test
@@ -172,15 +222,16 @@ public class OffsetRangeTrackerTest {
 
   @Test
   public void testTryReturnFirstRecordNotSplitPoint() throws Exception {
+    OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
     expected.expect(IllegalStateException.class);
-    new OffsetRangeTracker(100, 200).tryReturnRecordAt(false, 120);
+    tracker.tryReturnRecordAt(false, 120);
   }
 
   @Test
   public void testTryReturnRecordNonMonotonic() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
-    expected.expect(IllegalStateException.class);
     tracker.tryReturnRecordAt(true, 120);
+    expected.expect(IllegalStateException.class);
     tracker.tryReturnRecordAt(true, 110);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 4cb30b4..c09943b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -289,9 +289,6 @@ public class BigtableIOTest {
 
   /**
    * Tests dynamic work rebalancing exhaustively.
-   *
-   * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
-   * {@link BigtableIO.Read} implementation.
    */
   @Test
   public void testReadingSplitAtFractionExhaustive() throws Exception {