You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/15 21:19:31 UTC
[1/2] incubator-beam git commit: Closes #440
Repository: incubator-beam
Updated Branches:
refs/heads/master 2f46bc004 -> 11eb9f65f
Closes #440
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/11eb9f65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/11eb9f65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/11eb9f65
Branch: refs/heads/master
Commit: 11eb9f65f9020ceb40d8a60164914227ff0451b3
Parents: 2f46bc0 172d4fd
Author: Dan Halperin <dh...@google.com>
Authored: Wed Jun 15 14:18:50 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 14:18:50 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/range/ByteKeyRangeTracker.java | 8 ++
.../sdk/io/range/ByteKeyRangeTrackerTest.java | 84 ++++++++++++++------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 5 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 2 -
4 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Modified range tracker to use first
response seen as start key
Posted by dh...@apache.org.
Modified range tracker to use first response seen as start key
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/172d4fdc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/172d4fdc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/172d4fdc
Branch: refs/heads/master
Commit: 172d4fdc84eb65898259b0d65f5f60492a0cbf7b
Parents: 2f46bc0
Author: Ian Zhou <ia...@google.com>
Authored: Thu Jun 9 14:17:14 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 15 14:18:50 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/range/ByteKeyRangeTracker.java | 8 ++
.../sdk/io/range/ByteKeyRangeTrackerTest.java | 84 ++++++++++++++------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 5 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 2 -
4 files changed, 70 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index cb779fd..b165924 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -48,11 +48,19 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
return range.getEndKey();
}
+ /** Returns the current range. */
+ public synchronized ByteKeyRange getRange() {
+ return range;
+ }
+
@Override
public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, ByteKey recordStart) {
if (isAtSplitPoint && !range.containsKey(recordStart)) {
return false;
}
+ if (position == null) {
+ range = range.withStartKey(recordStart);
+ }
position = recordStart;
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
index 958fa48..4404414 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/ByteKeyRangeTrackerTest.java
@@ -28,66 +28,100 @@ import org.junit.runners.JUnit4;
/** Tests for {@link ByteKeyRangeTracker}. */
@RunWith(JUnit4.class)
public class ByteKeyRangeTrackerTest {
- private static final ByteKey START_KEY = ByteKey.of(0x12);
- private static final ByteKey MIDDLE_KEY = ByteKey.of(0x23);
+ private static final ByteKey INITIAL_START_KEY = ByteKey.of(0x12);
+ private static final ByteKey INITIAL_MIDDLE_KEY = ByteKey.of(0x23);
+ private static final ByteKey NEW_START_KEY = ByteKey.of(0x14);
+ private static final ByteKey NEW_MIDDLE_KEY = ByteKey.of(0x24);
private static final ByteKey BEFORE_END_KEY = ByteKey.of(0x33);
private static final ByteKey END_KEY = ByteKey.of(0x34);
- private static final double RANGE_SIZE = 0x34 - 0x12;
- private static final ByteKeyRange RANGE = ByteKeyRange.of(START_KEY, END_KEY);
+ private static final double INITIAL_RANGE_SIZE = 0x34 - 0x12;
+ private static final ByteKeyRange INITIAL_RANGE = ByteKeyRange.of(INITIAL_START_KEY, END_KEY);
+ private static final double NEW_RANGE_SIZE = 0x34 - 0x14;
+ private static final ByteKeyRange NEW_RANGE = ByteKeyRange.of(NEW_START_KEY, END_KEY);
/** Tests for {@link ByteKeyRangeTracker#toString}. */
@Test
public void testToString() {
- ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
- String expected = String.format("ByteKeyRangeTracker{range=%s, position=null}", RANGE);
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ String expected = String.format("ByteKeyRangeTracker{range=%s, position=null}", INITIAL_RANGE);
assertEquals(expected, tracker.toString());
- tracker.tryReturnRecordAt(true, MIDDLE_KEY);
- expected = String.format("ByteKeyRangeTracker{range=%s, position=%s}", RANGE, MIDDLE_KEY);
+ tracker.tryReturnRecordAt(true, INITIAL_START_KEY);
+ tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY);
+ expected =
+ String.format("ByteKeyRangeTracker{range=%s, position=%s}", INITIAL_RANGE,
+ INITIAL_MIDDLE_KEY);
+ assertEquals(expected, tracker.toString());
+ }
+
+ /** Tests for updating the start key to the first record returned. */
+ @Test
+ public void testUpdateStartKey() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+
+ tracker.tryReturnRecordAt(true, NEW_START_KEY);
+ String expected =
+ String.format("ByteKeyRangeTracker{range=%s, position=%s}", NEW_RANGE, NEW_START_KEY);
assertEquals(expected, tracker.toString());
}
/** Tests for {@link ByteKeyRangeTracker#of}. */
@Test
public void testBuilding() {
- ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
- assertEquals(START_KEY, tracker.getStartPosition());
+ assertEquals(INITIAL_START_KEY, tracker.getStartPosition());
assertEquals(END_KEY, tracker.getStopPosition());
}
/** Tests for {@link ByteKeyRangeTracker#getFractionConsumed()}. */
@Test
public void testGetFractionConsumed() {
- ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
double delta = 0.00001;
assertEquals(0.0, tracker.getFractionConsumed(), delta);
- tracker.tryReturnRecordAt(true, START_KEY);
+ tracker.tryReturnRecordAt(true, INITIAL_START_KEY);
+ assertEquals(0.0, tracker.getFractionConsumed(), delta);
+
+ tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY);
+ assertEquals(0.5, tracker.getFractionConsumed(), delta);
+
+ tracker.tryReturnRecordAt(true, BEFORE_END_KEY);
+ assertEquals(1 - 1 / INITIAL_RANGE_SIZE, tracker.getFractionConsumed(), delta);
+ }
+
+ /** Tests for {@link ByteKeyRangeTracker#getFractionConsumed()} with updated start key. */
+ @Test
+ public void testGetFractionConsumedUpdateStartKey() {
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
+ double delta = 0.00001;
+
+ tracker.tryReturnRecordAt(true, NEW_START_KEY);
assertEquals(0.0, tracker.getFractionConsumed(), delta);
- tracker.tryReturnRecordAt(true, MIDDLE_KEY);
+ tracker.tryReturnRecordAt(true, NEW_MIDDLE_KEY);
assertEquals(0.5, tracker.getFractionConsumed(), delta);
tracker.tryReturnRecordAt(true, BEFORE_END_KEY);
- assertEquals(1 - 1 / RANGE_SIZE, tracker.getFractionConsumed(), delta);
+ assertEquals(1 - 1 / NEW_RANGE_SIZE, tracker.getFractionConsumed(), delta);
}
/** Tests for {@link ByteKeyRangeTracker#tryReturnRecordAt}. */
@Test
public void testTryReturnRecordAt() {
- ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
// Should be able to emit at the same key twice, should that happen.
// Should be able to emit within range (in order, but system guarantees won't try out of order).
// Should not be able to emit past end of range.
- assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
- assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
- assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
- assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
assertTrue(tracker.tryReturnRecordAt(true, BEFORE_END_KEY));
@@ -99,13 +133,13 @@ public class ByteKeyRangeTrackerTest {
/** Tests for {@link ByteKeyRangeTracker#trySplitAtPosition}. */
@Test
public void testSplitAtPosition() {
- ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(RANGE);
+ ByteKeyRangeTracker tracker = ByteKeyRangeTracker.of(INITIAL_RANGE);
// Unstarted, should not split.
- assertFalse(tracker.trySplitAtPosition(MIDDLE_KEY));
+ assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
// Start it, split it before the end.
- assertTrue(tracker.tryReturnRecordAt(true, START_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_START_KEY));
assertTrue(tracker.trySplitAtPosition(BEFORE_END_KEY));
assertEquals(BEFORE_END_KEY, tracker.getStopPosition());
@@ -113,8 +147,8 @@ public class ByteKeyRangeTrackerTest {
assertFalse(tracker.trySplitAtPosition(END_KEY));
// Should not be able to split after emitting.
- assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
- assertFalse(tracker.trySplitAtPosition(MIDDLE_KEY));
- assertTrue(tracker.tryReturnRecordAt(true, MIDDLE_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
+ assertFalse(tracker.trySplitAtPosition(INITIAL_MIDDLE_KEY));
+ assertTrue(tracker.tryReturnRecordAt(true, INITIAL_MIDDLE_KEY));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 9656494..f725a66 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -811,9 +811,10 @@ public class BigtableIO {
public final synchronized BigtableSource splitAtFraction(double fraction) {
ByteKey splitKey;
try {
- splitKey = source.getRange().interpolateKey(fraction);
+ splitKey = rangeTracker.getRange().interpolateKey(fraction);
} catch (IllegalArgumentException e) {
- logger.info("%s: Failed to interpolate key for fraction %s.", source.getRange(), fraction);
+ logger.info(
+ "%s: Failed to interpolate key for fraction %s.", rangeTracker.getRange(), fraction);
return null;
}
logger.debug(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/172d4fdc/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 357ab44..4cb30b4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -65,7 +65,6 @@ import com.google.protobuf.Empty;
import org.hamcrest.Matchers;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -294,7 +293,6 @@ public class BigtableIOTest {
* <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
* {@link BigtableIO.Read} implementation.
*/
- @Ignore("Slow. Rerun when changing the implementation.")
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {
final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";