You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/24 01:54:48 UTC
[beam] branch master updated: [BEAM-8871] Support trySplit for ByteKeyRangeTracker
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89958d8 [BEAM-8871] Support trySplit for ByteKeyRangeTracker
new 28fd597 Merge pull request #11454 from boyuanzz/restriction_tracker
89958d8 is described below
commit 89958d8311b7225be51f3d86f03cb58581279372
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Apr 17 14:19:26 2020 -0700
[BEAM-8871] Support trySplit for ByteKeyRangeTracker
---
.../splittabledofn/ByteKeyRangeTracker.java | 78 +++++++++++-----
.../splittabledofn/RestrictionTracker.java | 8 +-
.../apache/beam/sdk/io/range/ByteKeyRangeTest.java | 7 +-
.../splittabledofn/ByteKeyRangeTrackerTest.java | 101 ++++++++++++++++++---
4 files changed, 157 insertions(+), 37 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index 86e1309..ddbe9bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -65,31 +65,61 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
@Override
public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
- // TODO(BEAM-8871): Add support for splitting off a fixed amount of work for this restriction
- // instead of only supporting checkpointing.
+ // No split on an empty range.
+ if (NO_KEYS.equals(range)
+ || (!range.getEndKey().isEmpty() && range.getStartKey().equals(range.getEndKey()))) {
+ return null;
+ }
+ // There is no more remaining work after the entire range has been claimed.
+ if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+ return null;
+ }
- // If we haven't done any work, we should return the original range we were processing
- // as the checkpoint.
- if (lastAttemptedKey == null) {
- ByteKeyRange rval = range;
- // We update our current range to an interval that contains no elements.
- range = NO_KEYS;
- return SplitResult.of(range, rval);
+ ByteKey unprocessedRangeStartKey =
+ (lastAttemptedKey == null) ? range.getStartKey() : next(lastAttemptedKey);
+ ByteKey endKey = range.getEndKey();
+ // There is no more space for split.
+ if (!endKey.isEmpty() && unprocessedRangeStartKey.compareTo(endKey) >= 0) {
+ return null;
}
- // Return an empty range if the current range is done.
- if (lastAttemptedKey.isEmpty()
- || !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
- return SplitResult.of(range, NO_KEYS);
+ // Treat checkpoint specially because {@link ByteKeyRange#interpolateKey} computes a key with
+ // trailing zeros when fraction is 0.
+ if (fractionOfRemainder == 0.0) {
+ // If we haven't done any work, we should return the original range we were processing
+ // as the checkpoint.
+ if (lastAttemptedKey == null) {
+ // We update our current range to an interval that contains no elements.
+ ByteKeyRange rval = range;
+ range =
+ range.getStartKey().isEmpty()
+ ? NO_KEYS
+ : ByteKeyRange.of(range.getStartKey(), range.getStartKey());
+ return SplitResult.of(range, rval);
+ } else {
+ range = ByteKeyRange.of(range.getStartKey(), unprocessedRangeStartKey);
+ return SplitResult.of(range, ByteKeyRange.of(unprocessedRangeStartKey, endKey));
+ }
+ }
+
+ ByteKeyRange unprocessedRange = ByteKeyRange.of(unprocessedRangeStartKey, range.getEndKey());
+ ByteKey splitPos;
+ try {
+ // The interpolateKey shouldn't return empty key. Please refer to {@link
+ // ByteKeyRange#interpolateKey}.
+ splitPos = unprocessedRange.interpolateKey(fractionOfRemainder);
+ checkState(!splitPos.isEmpty());
+ } catch (Exception e) {
+ // There is no way to interpolate a key based on provided fraction.
+ return null;
+ }
+ // Computed splitPos is out of current tracking restriction.
+ if (!range.getEndKey().isEmpty() && splitPos.compareTo(range.getEndKey()) >= 0) {
+ return null;
}
- // Otherwise we compute the "remainder" of the range from the last key.
- assert lastAttemptedKey.equals(lastClaimedKey)
- : "Expect both keys to be equal since the last key attempted was a valid key in the range.";
- ByteKey nextKey = next(lastAttemptedKey);
- ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
- this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
- return SplitResult.of(range, res);
+ range = ByteKeyRange.of(range.getStartKey(), splitPos);
+ return SplitResult.of(range, ByteKeyRange.of(splitPos, endKey));
}
/**
@@ -142,7 +172,8 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
// Handle checking the empty range which is implicitly done.
// This case can occur if the range tracker is checkpointed before any keys have been claimed
// or if the range tracker is checkpointed once the range is done.
- if (NO_KEYS.equals(range)) {
+ if (NO_KEYS.equals(range)
+ || (!range.getEndKey().isEmpty() && range.getStartKey().equals(range.getEndKey()))) {
return;
}
@@ -157,6 +188,11 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
return;
}
+ // The lastAttemptedKey is the last key of current restriction.
+ if (!range.getEndKey().isEmpty() && next(lastAttemptedKey).compareTo(range.getEndKey()) >= 0) {
+ return;
+ }
+
// If the last attempted key was not at or beyond the end of the range then throw.
if (range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0) {
ByteKey nextKey = next(lastAttemptedKey);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 614cb2c..7f17bdc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms.splittabledofn;
import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.transforms.DoFn;
@@ -34,7 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn;
public abstract class RestrictionTracker<RestrictionT, PositionT> {
/**
* Attempts to claim the block of work in the current restriction identified by the given
- * position.
+ * position. Each claimed position MUST be a valid split point.
*
* <p>If this succeeds, the DoFn MUST execute the entire block of work. If this fails:
*
@@ -83,8 +84,11 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
*
* @param fractionOfRemainder A hint as to the fraction of work the primary restriction should
* represent based upon the current known remaining amount of work.
- * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}.
+ * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}. If the
+ * {@code fractionOfRemainder == 0}, a {@code null} result MUST imply that the restriction
+ * tracker is done and there is no more work left to do.
*/
+ @Nullable
public abstract SplitResult<RestrictionT> trySplit(double fractionOfRemainder);
/**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
index 0025e3e..66a7d5b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
@@ -361,8 +361,11 @@ public class ByteKeyRangeTest {
assertThat("Too many hash collisions", collisions, lessThan(totalUnequalTests / 2));
}
- /** Asserts the two keys are equal except trailing zeros. */
- private static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
+ /**
+ * Asserts the two keys are equal except trailing zeros. Note that this can only be used for
+ * testing split logic. *
+ */
+ public static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
ByteBuffer shortKey = expected.getValue();
ByteBuffer longKey = key.getValue();
if (shortKey.remaining() > longKey.remaining()) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
index a60f1e8..1ebb4fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.sdk.transforms.splittabledofn;
+import static org.apache.beam.sdk.io.range.ByteKeyRangeTest.assertEqualExceptPadding;
import static org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker.next;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.io.range.ByteKey;
@@ -37,6 +39,21 @@ public class ByteKeyRangeTrackerTest {
@Rule public final ExpectedException expected = ExpectedException.none();
@Test
+ public void testTryClaimNoKeys() throws Exception {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+ assertFalse(tracker.tryClaim(ByteKey.of(0x00)));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testTryClaimEmptyRange() throws Exception {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+ assertFalse(tracker.tryClaim(ByteKey.of(0x10)));
+ tracker.checkDone();
+ }
+
+ @Test
public void testTryClaim() throws Exception {
ByteKeyRange range = ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0));
ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(range);
@@ -47,6 +64,7 @@ public class ByteKeyRangeTrackerTest {
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x99)));
assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
+ tracker.checkDone();
}
@Test
@@ -58,7 +76,8 @@ public class ByteKeyRangeTrackerTest {
// We expect to get the original range back and that the current restriction
// is effectively made empty.
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint);
- assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+ tracker.checkDone();
}
@Test
@@ -70,6 +89,23 @@ public class ByteKeyRangeTrackerTest {
// is effectively made empty.
assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckpointUnstartedForNoKeysRange() throws Exception {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+ assertNull(tracker.trySplit(0));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testCheckpointUnstartedForEmptyRange() throws Exception {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+ assertNull(tracker.trySplit(0));
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+ tracker.checkDone();
}
@Test
@@ -77,9 +113,9 @@ public class ByteKeyRangeTrackerTest {
ByteKeyRangeTracker tracker =
ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
- ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+ assertNull(tracker.trySplit(0));
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
- assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ tracker.checkDone();
}
@Test
@@ -91,6 +127,7 @@ public class ByteKeyRangeTrackerTest {
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0x10, 0x00), ByteKey.of(0xc0)), checkpoint);
+ tracker.checkDone();
}
@Test
@@ -103,6 +140,7 @@ public class ByteKeyRangeTrackerTest {
assertEquals(
ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90, 0x00)), tracker.currentRestriction());
assertEquals(ByteKeyRange.of(ByteKey.of(0x90, 0x00), ByteKey.of(0xc0)), checkpoint);
+ tracker.checkDone();
}
@Test
@@ -112,9 +150,8 @@ public class ByteKeyRangeTrackerTest {
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
- ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
- assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
- assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ assertNull(tracker.trySplit(0));
+ tracker.checkDone();
}
@Test
@@ -123,9 +160,9 @@ public class ByteKeyRangeTrackerTest {
assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
- ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+ assertNull(tracker.trySplit(0));
assertEquals(ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
- assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ tracker.checkDone();
}
@Test
@@ -136,9 +173,9 @@ public class ByteKeyRangeTrackerTest {
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
- ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+ assertNull(tracker.trySplit(0));
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
- assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ tracker.checkDone();
}
@Test
@@ -149,9 +186,43 @@ public class ByteKeyRangeTrackerTest {
assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
assertFalse(tracker.tryClaim(ByteKey.EMPTY));
- ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+ assertNull(tracker.trySplit(0));
assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
- assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testTrySplit() throws Exception {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+ SplitResult<ByteKeyRange> res = tracker.trySplit(0.5);
+ assertKeyRangeEqualExceptPadding(
+ ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x80)), res.getPrimary());
+ assertKeyRangeEqualExceptPadding(
+ ByteKeyRange.of(ByteKey.of(0x80), ByteKey.EMPTY), res.getResidual());
+ tracker.tryClaim(ByteKey.of(0x00));
+ res = tracker.trySplit(0.5);
+ assertKeyRangeEqualExceptPadding(
+ ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x40)), res.getPrimary());
+ assertKeyRangeEqualExceptPadding(
+ ByteKeyRange.of(ByteKey.of(0x40), ByteKey.of(0x80)), res.getResidual());
+ assertNull(tracker.trySplit(1));
+ }
+
+ @Test
+ public void testTrySplitAtNoKeysRange() throws Exception {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+ assertNull(tracker.trySplit(0));
+ assertNull(tracker.trySplit(1));
+ tracker.checkDone();
+ }
+
+ @Test
+ public void testTrySplitAtEmptyRange() throws Exception {
+ ByteKeyRangeTracker tracker =
+ ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+ assertNull(tracker.trySplit(0.5));
+ assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+ tracker.checkDone();
}
@Test
@@ -296,4 +367,10 @@ public class ByteKeyRangeTrackerTest {
assertEquals(0.75, progress.getWorkCompleted(), 0.001);
assertEquals(0.25, progress.getWorkRemaining(), 0.001);
}
+
+ /** Asserts the two ByteKeyRange are equal except trailing zeros. */
+ private static void assertKeyRangeEqualExceptPadding(ByteKeyRange expected, ByteKeyRange key) {
+ assertEqualExceptPadding(expected.getStartKey(), key.getStartKey());
+ assertEqualExceptPadding(expected.getEndKey(), key.getEndKey());
+ }
}