You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/24 01:54:48 UTC

[beam] branch master updated: [BEAM-8871] Support trySplit for ByteKeyRangeTracker

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89958d8  [BEAM-8871] Support trySplit for ByteKeyRangeTracker
     new 28fd597  Merge pull request #11454 from boyuanzz/restriction_tracker
89958d8 is described below

commit 89958d8311b7225be51f3d86f03cb58581279372
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Apr 17 14:19:26 2020 -0700

    [BEAM-8871] Support trySplit for ByteKeyRangeTracker
---
 .../splittabledofn/ByteKeyRangeTracker.java        |  78 +++++++++++-----
 .../splittabledofn/RestrictionTracker.java         |   8 +-
 .../apache/beam/sdk/io/range/ByteKeyRangeTest.java |   7 +-
 .../splittabledofn/ByteKeyRangeTrackerTest.java    | 101 ++++++++++++++++++---
 4 files changed, 157 insertions(+), 37 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index 86e1309..ddbe9bd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -65,31 +65,61 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
 
   @Override
   public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8871): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
+    // No split on an empty range.
+    if (NO_KEYS.equals(range)
+        || (!range.getEndKey().isEmpty() && range.getStartKey().equals(range.getEndKey()))) {
+      return null;
+    }
+    // There is no more remaining work after the entire range has been claimed.
+    if (lastAttemptedKey != null && lastAttemptedKey.isEmpty()) {
+      return null;
+    }
 
-    // If we haven't done any work, we should return the original range we were processing
-    // as the checkpoint.
-    if (lastAttemptedKey == null) {
-      ByteKeyRange rval = range;
-      // We update our current range to an interval that contains no elements.
-      range = NO_KEYS;
-      return SplitResult.of(range, rval);
+    ByteKey unprocessedRangeStartKey =
+        (lastAttemptedKey == null) ? range.getStartKey() : next(lastAttemptedKey);
+    ByteKey endKey = range.getEndKey();
+    // The unprocessed range is empty, so there is nothing left to split.
+    if (!endKey.isEmpty() && unprocessedRangeStartKey.compareTo(endKey) >= 0) {
+      return null;
     }
 
-    // Return an empty range if the current range is done.
-    if (lastAttemptedKey.isEmpty()
-        || !(range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0)) {
-      return SplitResult.of(range, NO_KEYS);
+    // Treat checkpoint specially because {@link ByteKeyRange#interpolateKey} computes a key with
+    // trailing zeros when fraction is 0.
+    if (fractionOfRemainder == 0.0) {
+      // If we haven't done any work, we should return the original range we were processing
+      // as the checkpoint.
+      if (lastAttemptedKey == null) {
+        // We update our current range to an interval that contains no elements.
+        ByteKeyRange rval = range;
+        range =
+            range.getStartKey().isEmpty()
+                ? NO_KEYS
+                : ByteKeyRange.of(range.getStartKey(), range.getStartKey());
+        return SplitResult.of(range, rval);
+      } else {
+        range = ByteKeyRange.of(range.getStartKey(), unprocessedRangeStartKey);
+        return SplitResult.of(range, ByteKeyRange.of(unprocessedRangeStartKey, endKey));
+      }
+    }
+
+    ByteKeyRange unprocessedRange = ByteKeyRange.of(unprocessedRangeStartKey, range.getEndKey());
+    ByteKey splitPos;
+    try {
+      // The interpolateKey shouldn't return empty key. Please refer to {@link
+      // ByteKeyRange#interpolateKey}.
+      splitPos = unprocessedRange.interpolateKey(fractionOfRemainder);
+      checkState(!splitPos.isEmpty());
+    } catch (Exception e) {
+      // There is no way to interpolate a key based on provided fraction.
+      return null;
+    }
+    // The computed splitPos falls outside the restriction currently being tracked.
+    if (!range.getEndKey().isEmpty() && splitPos.compareTo(range.getEndKey()) >= 0) {
+      return null;
     }
 
-    // Otherwise we compute the "remainder" of the range from the last key.
-    assert lastAttemptedKey.equals(lastClaimedKey)
-        : "Expect both keys to be equal since the last key attempted was a valid key in the range.";
-    ByteKey nextKey = next(lastAttemptedKey);
-    ByteKeyRange res = ByteKeyRange.of(nextKey, range.getEndKey());
-    this.range = ByteKeyRange.of(range.getStartKey(), nextKey);
-    return SplitResult.of(range, res);
+    range = ByteKeyRange.of(range.getStartKey(), splitPos);
+    return SplitResult.of(range, ByteKeyRange.of(splitPos, endKey));
   }
 
   /**
@@ -142,7 +172,8 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
     // Handle checking the empty range which is implicitly done.
     // This case can occur if the range tracker is checkpointed before any keys have been claimed
     // or if the range tracker is checkpointed once the range is done.
-    if (NO_KEYS.equals(range)) {
+    if (NO_KEYS.equals(range)
+        || (!range.getEndKey().isEmpty() && range.getStartKey().equals(range.getEndKey()))) {
       return;
     }
 
@@ -157,6 +188,11 @@ public class ByteKeyRangeTracker extends RestrictionTracker<ByteKeyRange, ByteKe
       return;
     }
 
+    // The lastAttemptedKey is the last key of the current restriction.
+    if (!range.getEndKey().isEmpty() && next(lastAttemptedKey).compareTo(range.getEndKey()) >= 0) {
+      return;
+    }
+
     // If the last attempted key was not at or beyond the end of the range then throw.
     if (range.getEndKey().isEmpty() || range.getEndKey().compareTo(lastAttemptedKey) > 0) {
       ByteKey nextKey = next(lastAttemptedKey);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 614cb2c..7f17bdc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.transforms.splittabledofn;
 
 import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -34,7 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 public abstract class RestrictionTracker<RestrictionT, PositionT> {
   /**
    * Attempts to claim the block of work in the current restriction identified by the given
-   * position.
+   * position. Each claimed position MUST be a valid split point.
    *
    * <p>If this succeeds, the DoFn MUST execute the entire block of work. If this fails:
    *
@@ -83,8 +84,11 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
    *
    * @param fractionOfRemainder A hint as to the fraction of work the primary restriction should
    *     represent based upon the current known remaining amount of work.
-   * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}.
+   * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}. If
+   *     {@code fractionOfRemainder == 0}, a {@code null} result MUST imply that the restriction
+   *     tracker is done and there is no more work left to do.
    */
+  @Nullable
   public abstract SplitResult<RestrictionT> trySplit(double fractionOfRemainder);
 
   /**
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
index 0025e3e..66a7d5b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTest.java
@@ -361,8 +361,11 @@ public class ByteKeyRangeTest {
     assertThat("Too many hash collisions", collisions, lessThan(totalUnequalTests / 2));
   }
 
-  /** Asserts the two keys are equal except trailing zeros. */
-  private static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
+  /**
+   * Asserts the two keys are equal except trailing zeros. Note that this can only be used for
+   * testing split logic.
+   */
+  public static void assertEqualExceptPadding(ByteKey expected, ByteKey key) {
     ByteBuffer shortKey = expected.getValue();
     ByteBuffer longKey = key.getValue();
     if (shortKey.remaining() > longKey.remaining()) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
index a60f1e8..1ebb4fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.sdk.transforms.splittabledofn;
 
+import static org.apache.beam.sdk.io.range.ByteKeyRangeTest.assertEqualExceptPadding;
 import static org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker.next;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.io.range.ByteKey;
@@ -37,6 +39,21 @@ public class ByteKeyRangeTrackerTest {
   @Rule public final ExpectedException expected = ExpectedException.none();
 
   @Test
+  public void testTryClaimNoKeys() throws Exception {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+    assertFalse(tracker.tryClaim(ByteKey.of(0x00)));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTryClaimEmptyRange() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+    assertFalse(tracker.tryClaim(ByteKey.of(0x10)));
+    tracker.checkDone();
+  }
+
+  @Test
   public void testTryClaim() throws Exception {
     ByteKeyRange range = ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0));
     ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(range);
@@ -47,6 +64,7 @@ public class ByteKeyRangeTrackerTest {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x99)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
+    tracker.checkDone();
   }
 
   @Test
@@ -58,7 +76,8 @@ public class ByteKeyRangeTrackerTest {
     // We expect to get the original range back and that the current restriction
     // is effectively made empty.
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), checkpoint);
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
+    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+    tracker.checkDone();
   }
 
   @Test
@@ -70,6 +89,23 @@ public class ByteKeyRangeTrackerTest {
     // is effectively made empty.
     assertEquals(ByteKeyRange.ALL_KEYS, checkpoint);
     assertEquals(ByteKeyRangeTracker.NO_KEYS, tracker.currentRestriction());
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckpointUnstartedForNoKeysRange() throws Exception {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+    assertNull(tracker.trySplit(0));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testCheckpointUnstartedForEmptyRange() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+    assertNull(tracker.trySplit(0));
+    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+    tracker.checkDone();
   }
 
   @Test
@@ -77,9 +113,9 @@ public class ByteKeyRangeTrackerTest {
     ByteKeyRangeTracker tracker =
         ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
   }
 
   @Test
@@ -91,6 +127,7 @@ public class ByteKeyRangeTrackerTest {
     assertEquals(
         ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10, 0x00)), tracker.currentRestriction());
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10, 0x00), ByteKey.of(0xc0)), checkpoint);
+    tracker.checkDone();
   }
 
   @Test
@@ -103,6 +140,7 @@ public class ByteKeyRangeTrackerTest {
     assertEquals(
         ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90, 0x00)), tracker.currentRestriction());
     assertEquals(ByteKeyRange.of(ByteKey.of(0x90, 0x00), ByteKey.of(0xc0)), checkpoint);
+    tracker.checkDone();
   }
 
   @Test
@@ -112,9 +150,8 @@ public class ByteKeyRangeTrackerTest {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xc0)));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    assertNull(tracker.trySplit(0));
+    tracker.checkDone();
   }
 
   @Test
@@ -123,9 +160,9 @@ public class ByteKeyRangeTrackerTest {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertFalse(tracker.tryClaim(ByteKey.EMPTY));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.ALL_KEYS, tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
   }
 
   @Test
@@ -136,9 +173,9 @@ public class ByteKeyRangeTrackerTest {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
   }
 
   @Test
@@ -149,9 +186,43 @@ public class ByteKeyRangeTrackerTest {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.EMPTY));
-    ByteKeyRange checkpoint = tracker.trySplit(0).getResidual();
+    assertNull(tracker.trySplit(0));
     assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
-    assertEquals(ByteKeyRangeTracker.NO_KEYS, checkpoint);
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTrySplit() throws Exception {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRange.ALL_KEYS);
+    SplitResult<ByteKeyRange> res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x80)), res.getPrimary());
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x80), ByteKey.EMPTY), res.getResidual());
+    tracker.tryClaim(ByteKey.of(0x00));
+    res = tracker.trySplit(0.5);
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.EMPTY, ByteKey.of(0x40)), res.getPrimary());
+    assertKeyRangeEqualExceptPadding(
+        ByteKeyRange.of(ByteKey.of(0x40), ByteKey.of(0x80)), res.getResidual());
+    assertNull(tracker.trySplit(1));
+  }
+
+  @Test
+  public void testTrySplitAtNoKeysRange() throws Exception {
+    ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(ByteKeyRangeTracker.NO_KEYS);
+    assertNull(tracker.trySplit(0));
+    assertNull(tracker.trySplit(1));
+    tracker.checkDone();
+  }
+
+  @Test
+  public void testTrySplitAtEmptyRange() throws Exception {
+    ByteKeyRangeTracker tracker =
+        ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)));
+    assertNull(tracker.trySplit(0.5));
+    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10)), tracker.currentRestriction());
+    tracker.checkDone();
   }
 
   @Test
@@ -296,4 +367,10 @@ public class ByteKeyRangeTrackerTest {
     assertEquals(0.75, progress.getWorkCompleted(), 0.001);
     assertEquals(0.25, progress.getWorkRemaining(), 0.001);
   }
+
+  /** Asserts that two ByteKeyRanges are equal, ignoring trailing zeros in their keys. */
+  private static void assertKeyRangeEqualExceptPadding(ByteKeyRange expected, ByteKeyRange key) {
+    assertEqualExceptPadding(expected.getStartKey(), key.getStartKey());
+    assertEqualExceptPadding(expected.getEndKey(), key.getEndKey());
+  }
 }