You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 15:04:00 UTC

[jira] [Work logged] (BEAM-5487) ByteKeyRangeTracker restrictions do not cover the entire interval because of incorrect next key

     [ https://issues.apache.org/jira/browse/BEAM-5487?focusedWorklogId=150054&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150054 ]

ASF GitHub Bot logged work on BEAM-5487:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Oct/18 15:03
            Start Date: 01/Oct/18 15:03
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #6481: [BEAM-5487] ByteKeyRangeTracker restrictions do not cover the entire interval because of incorrect next key
URL: https://github.com/apache/beam/pull/6481
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
index bee175ec261..0a553f7d931 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTracker.java
@@ -23,7 +23,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
-import java.util.Arrays;
+import com.google.common.primitives.Bytes;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
@@ -131,28 +131,8 @@ public String toString() {
    */
   @VisibleForTesting
   static ByteKey next(ByteKey key) {
-    return ByteKey.copyFrom(unsignedCopyAndIncrement(key.getBytes()));
+    return ByteKey.copyFrom(Bytes.concat(key.getBytes(), ZERO_BYTE_ARRAY));
   }
 
-  /**
-   * @return The value of the input incremented by one using byte arithmetic. It treats the input
-   *     byte[] as an unsigned series of bytes, most significant bits first.
-   */
-  private static byte[] unsignedCopyAndIncrement(byte[] input) {
-    if (input.length == 0) {
-      return new byte[] {0};
-    }
-    byte[] copy = Arrays.copyOf(input, input.length);
-    for (int i = copy.length - 1; i >= 0; --i) {
-      if (copy[i] != (byte) 0xff) {
-        ++copy[i];
-        return copy;
-      }
-      copy[i] = 0;
-    }
-    byte[] out = new byte[copy.length + 1];
-    out[0] = 1;
-    System.arraycopy(copy, 0, out, 1, copy.length);
-    return out;
-  }
+  private static final byte[] ZERO_BYTE_ARRAY = new byte[] {0};
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
index ef6eba41ff1..a285ec82ef6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/ByteKeyRangeTrackerTest.java
@@ -71,8 +71,9 @@ public void testCheckpointJustStarted() throws Exception {
         ByteKeyRangeTracker.of(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x10)));
     ByteKeyRange checkpoint = tracker.checkpoint();
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x11)), tracker.currentRestriction());
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x11), ByteKey.of(0xc0)), checkpoint);
+    assertEquals(
+        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x10, 0x00)), tracker.currentRestriction());
+    assertEquals(ByteKeyRange.of(ByteKey.of(0x10, 0x00), ByteKey.of(0xc0)), checkpoint);
   }
 
   @Test
@@ -82,8 +83,9 @@ public void testCheckpointRegular() throws Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     ByteKeyRange checkpoint = tracker.checkpoint();
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x91)), tracker.currentRestriction());
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x91), ByteKey.of(0xc0)), checkpoint);
+    assertEquals(
+        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0x90, 0x00)), tracker.currentRestriction());
+    assertEquals(ByteKeyRange.of(ByteKey.of(0x90, 0x00), ByteKey.of(0xc0)), checkpoint);
   }
 
   @Test
@@ -94,8 +96,9 @@ public void testCheckpointClaimedLast() throws Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
     ByteKeyRange checkpoint = tracker.checkpoint();
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xc0)), tracker.currentRestriction());
-    assertEquals(ByteKeyRange.of(ByteKey.of(0xc0), ByteKey.of(0xc0)), checkpoint);
+    assertEquals(
+        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xbf, 0x00)), tracker.currentRestriction());
+    assertEquals(ByteKeyRange.of(ByteKey.of(0xbf, 0x00), ByteKey.of(0xc0)), checkpoint);
   }
 
   @Test
@@ -107,8 +110,9 @@ public void testCheckpointAfterFailedClaim() throws Exception {
     assertTrue(tracker.tryClaim(ByteKey.of(0xa0)));
     assertFalse(tracker.tryClaim(ByteKey.of(0xd0)));
     ByteKeyRange checkpoint = tracker.checkpoint();
-    assertEquals(ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa1)), tracker.currentRestriction());
-    assertEquals(ByteKeyRange.of(ByteKey.of(0xa1), ByteKey.of(0xc0)), checkpoint);
+    assertEquals(
+        ByteKeyRange.of(ByteKey.of(0x10), ByteKey.of(0xa0, 0x00)), tracker.currentRestriction());
+    assertEquals(ByteKeyRange.of(ByteKey.of(0xa0, 0x00), ByteKey.of(0xc0)), checkpoint);
   }
 
   @Test
@@ -158,6 +162,9 @@ public void testCheckDoneAfterTryClaimRightBeforeEndOfRange() {
     assertTrue(tracker.tryClaim(ByteKey.of(0x50)));
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     assertTrue(tracker.tryClaim(ByteKey.of(0xbf)));
+    expected.expectMessage(
+        "Last attempted key was [bf] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
+            + "claiming work in [[bf00], [c0]) was not attempted");
     tracker.checkDone();
   }
 
@@ -169,7 +176,7 @@ public void testCheckDoneWhenNotDone() {
     assertTrue(tracker.tryClaim(ByteKey.of(0x90)));
     expected.expectMessage(
         "Last attempted key was [90] in range ByteKeyRange{startKey=[10], endKey=[c0]}, "
-            + "claiming work in [[91], [c0]) was not attempted");
+            + "claiming work in [[9000], [c0]) was not attempted");
     tracker.checkDone();
   }
 
@@ -194,11 +201,11 @@ public void testCheckDoneUnstarted() {
   @Test
   public void testNextByteKey() {
     assertEquals(next(ByteKey.EMPTY), ByteKey.of(0x00));
-    assertEquals(next(ByteKey.of(0x00)), ByteKey.of(0x01));
-    assertEquals(next(ByteKey.of(0x9f)), ByteKey.of(0xa0));
-    assertEquals(next(ByteKey.of(0xff)), ByteKey.of(0x01, 0x00));
-    assertEquals(next(ByteKey.of(0x10, 0x10)), ByteKey.of(0x10, 0x11));
-    assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x01, 0x00));
-    assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0x01, 0x00, 0x00));
+    assertEquals(next(ByteKey.of(0x00)), ByteKey.of(0x00, 0x00));
+    assertEquals(next(ByteKey.of(0x9f)), ByteKey.of(0x9f, 0x00));
+    assertEquals(next(ByteKey.of(0xff)), ByteKey.of(0xff, 0x00));
+    assertEquals(next(ByteKey.of(0x10, 0x10)), ByteKey.of(0x10, 0x10, 0x00));
+    assertEquals(next(ByteKey.of(0x00, 0xff)), ByteKey.of(0x00, 0xff, 0x00));
+    assertEquals(next(ByteKey.of(0xff, 0xff)), ByteKey.of(0xff, 0xff, 0x00));
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 150054)
    Time Spent: 40m  (was: 0.5h)

> ByteKeyRangeTracker restrictions do not cover the entire interval because of incorrect next key
> -----------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5487
>                 URL: https://issues.apache.org/jira/browse/BEAM-5487
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>             Fix For: 2.8.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The definition of next for a byte key is incorrectly implemented as the old key + 1 (treating the key as an unsigned number) instead of as the next smallest key greater than it; for example,
> 0x01 is returned as the next key for 0x00.
>  
> The ByteKey compareTo operator treats a key that is a proper prefix of another key as the smaller of the two. Hence 0x01 is greater than 0x00, but so is 0x0000. Thus next skips over 0x0000, which is the smallest key greater than 0x00.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)