You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text version.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:29 UTC
[23/50] [abbrv] incubator-beam git commit: Added getSplitPointsConsumed() to ByteKeyRangeTracker
Added getSplitPointsConsumed() to ByteKeyRangeTracker
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef9d1955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef9d1955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef9d1955
Branch: refs/heads/runners-spark2
Commit: ef9d1955fcdaab28633ea2d081ad342471a23359
Parents: 53c8bff
Author: Ian Zhou <ia...@google.com>
Authored: Tue Jun 21 17:34:20 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:51 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/range/ByteKeyRangeTracker.java | 63 +++++++++++++++++++-
.../sdk/io/range/ByteKeyRangeTrackerTest.java | 58 +++++++++++++++++-
2 files changed, 118 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef9d1955/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index b165924..7dd9a2c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -18,6 +18,9 @@
package org.apache.beam.sdk.io.range;
import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +41,10 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
return new ByteKeyRangeTracker(range);
}
+ /**
+ * Returns whether this tracker is done. It becomes done when either {@link #tryReturnRecordAt}
+ * rejects a split-point record outside the range, or {@link #markDone} is called. Once done,
+ * {@link #tryReturnRecordAt} always returns false.
+ */
+ public synchronized boolean isDone() {
+ return done;
+ }
+
@Override
public synchronized ByteKey getStartPosition() {
return range.getStartKey();
@@ -55,13 +62,28 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
@Override
public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
- if (isAtSplitPoint && !range.containsKey(recordStart)) {
+ // Once done (a split point was rejected or markDone() was called), every record is rejected.
+ if (done) {
return false;
}
+
+ // Ordering checks; each throws IllegalStateException on violation. The first record must be a
+ // split point, and no record may precede the range start key or the last recorded position.
+ checkState(!(position == null && !isAtSplitPoint), "The first record must be at a split point");
+ checkState(!(recordStart.compareTo(range.getStartKey()) < 0),
+ "Trying to return record which is before the start key");
+ checkState(!(position != null && recordStart.compareTo(position) < 0),
+ "Trying to return record which is before the last-returned record");
+
+ // The first record moves the start of the tracked range to its own key.
if (position == null) {
range = range.withStartKey(recordStart);
}
+ // Note: the position is recorded even when the record is rejected below.
position = recordStart;
+
+ // Only split points are checked against the range. A split point outside it marks this tracker
+ // done. Records that are not split points are accepted without a range check.
+ if (isAtSplitPoint) {
+ if (!range.containsKey(recordStart)) {
+ done = true;
+ return false;
+ }
+ ++splitPointsSeen;
+ }
return true;
}
@@ -107,13 +129,50 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
return range.estimateFractionForKey(position);
}
+ /**
+ * Returns the number of split points whose processing has finished.
+ *
+ * <p>Returns 0 until {@link #tryReturnRecordAt} has recorded a position. Once the tracker is
+ * done, every split point accepted by {@link #tryReturnRecordAt} counts as consumed. Otherwise
+ * the split point currently being processed is still in progress and is not counted.
+ *
+ * @throws IllegalStateException if the tracker has recorded a position and is not done, but
+ *     has unexpectedly seen no split points
+ */
+ public synchronized long getSplitPointsConsumed() {
+ if (position == null) {
+ return 0;
+ } else if (isDone()) {
+ return splitPointsSeen;
+ } else {
+ // There is a current split point, and it has not finished processing.
+ checkState(
+ splitPointsSeen > 0,
+ "A started rangeTracker should have seen > 0 split points (is %s)",
+ splitPointsSeen);
+ return splitPointsSeen - 1;
+ }
+ }
+
///////////////////////////////////////////////////////////////////////////////
+ // Range being tracked; tryReturnRecordAt moves its start key to the first record's key.
private ByteKeyRange range;
+ // Key last recorded by tryReturnRecordAt (recorded even if that record was then rejected);
+ // null until a position has been recorded.
@Nullable private ByteKey position;
+ // Number of split-point records accepted by tryReturnRecordAt.
+ private long splitPointsSeen;
+ // True once a split point outside the range is rejected or markDone() is called.
+ private boolean done;
+ // Creates a tracker for the given range with no recorded position, no split points, not done.
private ByteKeyRangeTracker(ByteKeyRange range) {
this.range = range;
- this.position = null;
+ position = null;
+ splitPointsSeen = 0L;
+ done = false;
+ }
+
+ /**
+ * Marks this range tracker as being done. Specifically, this will mark the current split point,
+ * if one exists, as being finished, so that {@link #getSplitPointsConsumed()} counts it.
+ * Afterwards, {@link #tryReturnRecordAt} always returns false.
+ *
+ * <p>Always returns false, so that it can be used in an implementation of
+ * {@link BoundedReader#start()} or {@link BoundedReader#advance()} as follows:
+ *
+ * <pre> {@code
+ * public boolean start() {
+ * return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position)
+ * || rangeTracker.markDone();
+ * }} </pre>
+ *
+ * @return always {@code false}
+ */
+ public synchronized boolean markDone() {
+ done = true;
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef9d1955/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
index 4404414..8deaf44 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
@@ -21,14 +21,18 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link ByteKeyRangeTracker}. */
@RunWith(JUnit4.class)
public class ByteKeyRangeTrackerTest {
+ // Sorts before INITIAL_START_KEY.
+ private static final ByteKey BEFORE_START_KEY = ByteKey.of(0x11);
private static final ByteKey INITIAL_START_KEY = ByteKey.of(0x12);
+ // Sorts between INITIAL_START_KEY and INITIAL_MIDDLE_KEY.
+ private static final ByteKey AFTER_START_KEY = ByteKey.of(0x13);
private static final ByteKey INITIAL_MIDDLE_KEY = ByteKey.of(0x23);
private static final ByteKey NEW_START_KEY = ByteKey.of(0x14);
private static final ByteKey NEW_MIDDLE_KEY = ByteKey.of(0x24);
@@ -39,6 +43,8 @@ public class ByteKeyRangeTrackerTest {
private static final double NEW_RANGE_SIZE = 0x34 - 0x14;
private static final ByteKeyRange NEW_RANGE = ByteKeyRange.of(NEW_START_KEY, END_KEY);
+ // Lets a test declare, via expected.expect(...), the exception its remaining statements throw.
+ @Rule public final ExpectedException expected = ExpectedException.none();
+
/** Tests for {@link ByteKeyRangeTracker#toString}. */
@Test
public void testToString() {
@@ -127,7 +133,30 @@ public class ByteKeyRangeTrackerTest {
assertFalse(tracker.tryReturnRecordAt(true, END_KEY)); // after end
- assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY)); // still succeeds
+ assertFalse(tracker.tryReturnRecordAt(true, BEFORE_END_KEY)); // false because done
+ }
+
+ /** Tests that the first record returned to a new tracker must be at a split point. */
+ @Test
+ public void testTryReturnFirstRecordNotSplitPoint() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ expected.expect(IllegalStateException.class);
+ tracker.tryReturnRecordAt(false, INITIAL_START_KEY);
+ }
+
+ /** Tests that returning a record whose key precedes the range start key throws. */
+ @Test
+ public void testTryReturnBeforeStartKey() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ expected.expect(IllegalStateException.class);
+ tracker.tryReturnRecordAt(true, BEFORE_START_KEY);
+ }
+
+ /**
+ * Tests that records must be returned in non-decreasing key order: AFTER_START_KEY sorts before
+ * the previously returned INITIAL_MIDDLE_KEY, so returning it throws.
+ */
+ @Test
+ public void testTryReturnBeforeLastReturnedRecord() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+ expected.expect(IllegalStateException.class);
+ tracker.tryReturnRecordAt(true, AFTER_START_KEY);
}
/** Tests for {@link ByteKeyRangeTracker#trySplitAtPosition}. */
@@ -151,4 +180,31 @@ public class ByteKeyRangeTrackerTest {
assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
}
+
+ /** Tests for {@link ByteKeyRangeTracker#getSplitPointsConsumed()}. */
+ @Test
+ public void testGetSplitPointsConsumed() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ // Nothing returned yet: 0 split points consumed
+ assertEquals(0, tracker.getSplitPointsConsumed());
+
+ // Started processing the first split point; it is still in progress: 0 consumed
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+ assertEquals(0, tracker.getSplitPointsConsumed());
+
+ // Processing a second split point finishes the first: 1 split point consumed
+ assertTrue(tracker.tryReturnRecordAt(true, AFTER_START_KEY));
+ assertEquals(1, tracker.getSplitPointsConsumed());
+
+ // A record that is not a split point does not change the count: still 1 consumed
+ assertTrue(tracker.tryReturnRecordAt(false, INITIAL_MIDDLE_KEY));
+ assertEquals(1, tracker.getSplitPointsConsumed());
+
+ // Processing a third split point finishes the second: 2 split points consumed
+ assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY));
+ assertEquals(2, tracker.getSplitPointsConsumed());
+
+ // Marking the tracker done finishes the current (third) split point: all 3 consumed
+ tracker.markDone();
+ assertEquals(3, tracker.getSplitPointsConsumed());
+ }
}