You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:29 UTC

[23/50] [abbrv] incubator-beam git commit: Added getSplitPointsConsumed() to ByteKeyRangeTracker

Added getSplitPointsConsumed() to ByteKeyRangeTracker


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef9d1955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef9d1955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef9d1955

Branch: refs/heads/runners-spark2
Commit: ef9d1955fcdaab28633ea2d081ad342471a23359
Parents: 53c8bff
Author: Ian Zhou <ia...@google.com>
Authored: Tue Jun 21 17:34:20 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:51 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/range/ByteKeyRangeTracker.java  | 63 +++++++++++++++++++-
 .../sdk/io/range/ByteKeyRangeTrackerTest.java   | 58 +++++++++++++++++-
 2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef9d1955/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index b165924..7dd9a2c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -18,6 +18,9 @@
 package org.apache.beam.sdk.io.range;
 
 import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +41,10 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
     return new ByteKeyRangeTracker(range);
   }
 
+  public synchronized boolean isDone() {
+    return done;
+  }
+
   @Override
   public synchronized ByteKey getStartPosition() {
     return range.getStartKey();
@@ -55,13 +62,28 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
 
   @Override
   public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
-    if (isAtSplitPoint && !range.containsKey(recordStart)) {
+    if (done) {
       return false;
     }
+
+    checkState(!(position == null && !isAtSplitPoint), "The first record must be at a split point");
+    checkState(!(recordStart.compareTo(range.getStartKey()) < 0),
+        "Trying to return record which is before the start key");
+    checkState(!(position != null && recordStart.compareTo(position) < 0),
+        "Trying to return record which is before the last-returned record");
+
     if (position == null) {
       range = range.withStartKey(recordStart);
     }
     position = recordStart;
+
+    if (isAtSplitPoint) {
+      if (!range.containsKey(recordStart)) {
+        done = true;
+        return false;
+      }
+      ++splitPointsSeen;
+    }
     return true;
   }
 
@@ -107,13 +129,50 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
     return range.estimateFractionForKey(position);
   }
 
+  public synchronized long getSplitPointsConsumed() {
+    if (position == null) {
+      return 0;
+    } else if (isDone()) {
+      return splitPointsSeen;
+    } else {
+      // The current split point is still being processed, so it does not count as consumed yet.
+      checkState(
+          splitPointsSeen > 0,
+          "A started rangeTracker should have seen > 0 split points (is %s)",
+          splitPointsSeen);
+      return splitPointsSeen - 1;
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////
   private ByteKeyRange range;
   @Nullable private ByteKey position;
+  private long splitPointsSeen;
+  private boolean done;
 
   private ByteKeyRangeTracker(ByteKeyRange range) {
     this.range = range;
-    this.position = null;
+    position = null;
+    splitPointsSeen = 0L;
+    done = false;
+  }
+
+  /**
+   * Marks this range tracker as being done. Specifically, this will mark the current split point,
+   * if one exists, as being finished.
+   *
+   * <p>Always returns false, so that it can be used in an implementation of
+   * {@link BoundedReader#start()} or {@link BoundedReader#advance()} as follows:
+   *
+   * <pre> {@code
+   * public boolean start() {
+   *   return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
+   *       || rangeTracker.markDone();
+   * }} </pre>
+   */
+  public synchronized boolean markDone() {
+    done = true;
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef9d1955/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
index 4404414..8deaf44 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
@@ -21,14 +21,18 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /** Tests for {@link ByteKeyRangeTracker}. */
 @RunWith(JUnit4.class)
 public class ByteKeyRangeTrackerTest {
+  private static final ByteKey BEFORE_START_KEY = ByteKey.of(0x11);
   private static final ByteKey INITIAL_START_KEY = ByteKey.of(0x12);
+  private static final ByteKey AFTER_START_KEY = ByteKey.of(0x13);
   private static final ByteKey INITIAL_MIDDLE_KEY = ByteKey.of(0x23);
   private static final ByteKey NEW_START_KEY = ByteKey.of(0x14);
   private static final ByteKey NEW_MIDDLE_KEY = ByteKey.of(0x24);
@@ -39,6 +43,8 @@ public class ByteKeyRangeTrackerTest {
   private static final double NEW_RANGE_SIZE = 0x34 - 0x14;
   private static final ByteKeyRange NEW_RANGE = ByteKeyRange.of(NEW_START_KEY, END_KEY);
 
+  @Rule public final ExpectedException expected = ExpectedException.none();
+
   /** Tests for {@link ByteKeyRangeTracker#toString}. */
   @Test
   public void testToString() {
@@ -127,7 +133,30 @@ public class ByteKeyRangeTrackerTest {
 
     assertFalse(tracker.tryReturnRecordAt(true, END_KEY)); // after end
 
-    assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY)); // still succeeds
+    assertFalse(tracker.tryReturnRecordAt(true, BEFORE_END_KEY)); // false because done
+  }
+
+  @Test
+  public void testTryReturnFirstRecordNotSplitPoint() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    expected.expect(IllegalStateException.class);
+    tracker.tryReturnRecordAt(false, INITIAL_START_KEY);
+  }
+
+  @Test
+  public void testTryReturnBeforeStartKey() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    expected.expect(IllegalStateException.class);
+    tracker.tryReturnRecordAt(true, BEFORE_START_KEY);
+  }
+
+  @Test
+  public void testTryReturnBeforeLastReturnedRecord() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+    expected.expect(IllegalStateException.class);
+    tracker.tryReturnRecordAt(true, AFTER_START_KEY);
   }
 
   /** Tests for {@link ByteKeyRangeTracker#trySplitAtPosition}. */
@@ -151,4 +180,31 @@ public class ByteKeyRangeTrackerTest {
     assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
     assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
   }
+
+  /** Tests for {@link ByteKeyRangeTracker#getSplitPointsConsumed()}. */
+  @Test
+  public void testGetSplitPointsConsumed() {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+    assertEquals(0, tracker.getSplitPointsConsumed());
+
+    // Started, 0 split points consumed
+    assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+    assertEquals(0, tracker.getSplitPointsConsumed());
+
+    // Processing new split point, 1 split point consumed
+    assertTrue(tracker.tryReturnRecordAt(true, AFTER_START_KEY));
+    assertEquals(1, tracker.getSplitPointsConsumed());
+
+    // Processing new non-split point, 1 split point consumed
+    assertTrue(tracker.tryReturnRecordAt(false, INITIAL_MIDDLE_KEY));
+    assertEquals(1, tracker.getSplitPointsConsumed());
+
+    // Processing new split point, 2 split points consumed
+    assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY));
+    assertEquals(2, tracker.getSplitPointsConsumed());
+
+    // Mark tracker as done, 3 split points consumed
+    tracker.markDone();
+    assertEquals(3, tracker.getSplitPointsConsumed());
+  }
 }