You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/22 05:29:38 UTC
[2/2] incubator-beam git commit: Update OffsetRangeTracker progress
tracking and start offset
Update OffsetRangeTracker progress tracking and start offset
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee0a3bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee0a3bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee0a3bf2
Branch: refs/heads/master
Commit: ee0a3bf2acc0213e27d3b3d1353c27b977046577
Parents: 741ef26
Author: Ian Zhou <ia...@google.com>
Authored: Tue Jun 21 17:23:09 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jun 21 22:29:25 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/range/OffsetRangeTracker.java | 13 +--
.../beam/sdk/io/OffsetBasedSourceTest.java | 5 +-
.../sdk/io/range/OffsetRangeTrackerTest.java | 91 +++++++++++++++-----
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 3 -
4 files changed, 82 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 76790af..a8d00ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
public class OffsetRangeTracker implements RangeTracker<Long> {
private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);
- private final long startOffset;
+ private long startOffset;
private long stopOffset;
private long lastRecordStart = -1L;
private long offsetOfLastSplitPoint = -1L;
@@ -101,6 +101,9 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
lastRecordStart));
}
+ if (lastRecordStart == -1) {
+ startOffset = recordStart;
+ }
lastRecordStart = recordStart;
if (isAtSplitPoint) {
@@ -165,7 +168,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
throw new IllegalArgumentException(
"getPositionForFractionConsumed is not applicable to an unbounded range: " + this);
}
- return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset));
+ return (long) Math.floor(startOffset + fraction * (stopOffset - startOffset));
}
@Override
@@ -179,11 +182,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> {
} else if (lastRecordStart >= stopOffset) {
return 1.0;
} else {
- // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5
- // which is (4 - 3 + 1) / (6 - 3) = 67%.
+ // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3 of 3,4,5
+ // which is (4 - 3) / (6 - 3) = 33%.
// Also, clamp to at most 1.0 because the last consumed position can extend past the
// stop position.
- return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset));
+ return Math.min(1.0, 1.0 * (lastRecordStart - startOffset) / (stopOffset - startOffset));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 66abd33..7009023 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -218,13 +218,14 @@ public class OffsetBasedSourceTest {
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertTrue(reader.start());
- do {
+ items.add(reader.getCurrent());
+ while (reader.advance()) {
Double fraction = reader.getFractionConsumed();
assertNotNull(fraction);
assertTrue(fraction.toString(), fraction > 0.0);
assertTrue(fraction.toString(), fraction <= 1.0);
items.add(reader.getCurrent());
- } while (reader.advance());
+ }
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
assertEquals(20, items.size());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
index edd4c4f..0c2c639 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
@@ -36,6 +36,17 @@ public class OffsetRangeTrackerTest {
@Rule public final ExpectedException expected = ExpectedException.none();
@Test
+ public void testUpdateStartOffset() throws Exception {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
+ assertEquals(100, tracker.getStartPosition().longValue());
+ // Update start offset to first record returned
+ assertTrue(tracker.tryReturnRecordAt(true, 150));
+ assertEquals(150, tracker.getStartPosition().longValue());
+ assertTrue(tracker.tryReturnRecordAt(true, 180));
+ assertEquals(150, tracker.getStartPosition().longValue());
+ }
+
+ @Test
public void testTryReturnRecordSimpleSparse() throws Exception {
OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
assertTrue(tracker.tryReturnRecordAt(true, 110));
@@ -113,28 +124,48 @@ public class OffsetRangeTrackerTest {
public void testGetPositionForFractionDense() throws Exception {
// Represents positions 3, 4, 5.
OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
- // [3, 3) represents 0.0 of [3, 6)
+ // [3, 3) (nothing consumed) corresponds to fractions in [0, 1/3) of [3, 6)
assertEquals(3, tracker.getPositionForFractionConsumed(0.0));
- // [3, 4) represents up to 1/3 of [3, 6)
- assertEquals(4, tracker.getPositionForFractionConsumed(1.0 / 6));
- assertEquals(4, tracker.getPositionForFractionConsumed(0.333));
- // [3, 5) represents up to 2/3 of [3, 6)
- assertEquals(5, tracker.getPositionForFractionConsumed(0.334));
- assertEquals(5, tracker.getPositionForFractionConsumed(0.666));
- // any fraction consumed over 2/3 means the whole [3, 6) has been consumed.
- assertEquals(6, tracker.getPositionForFractionConsumed(0.667));
+ assertEquals(3, tracker.getPositionForFractionConsumed(1.0 / 6));
+ assertEquals(3, tracker.getPositionForFractionConsumed(0.333));
+ // [3, 4) corresponds to fractions in [1/3, 2/3) of [3, 6)
+ assertEquals(4, tracker.getPositionForFractionConsumed(0.334));
+ assertEquals(4, tracker.getPositionForFractionConsumed(0.666));
+ // [3, 5) corresponds to fractions in [2/3, 1) of [3, 6)
+ assertEquals(5, tracker.getPositionForFractionConsumed(0.667));
+ assertEquals(5, tracker.getPositionForFractionConsumed(0.999));
+ // The whole [3, 6) is consumed for fraction 1
+ assertEquals(6, tracker.getPositionForFractionConsumed(1.0));
+ }
+
+ @Test
+ public void testGetPositionForFractionDenseUpdateStartOffset() throws Exception {
+ // Represents positions 3, 4, 5.
+ OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
+ // [3, 3) (nothing consumed) corresponds to fractions in [0, 1/3) of [3, 6)
+ assertEquals(3, tracker.getPositionForFractionConsumed(0.333));
+ // Update start offset to 4
+ assertTrue(tracker.tryReturnRecordAt(true, 4));
+ // [4, 4) (nothing consumed) corresponds to fractions in [0, 1/2) of [4, 6)
+ assertEquals(4, tracker.getPositionForFractionConsumed(0.0));
+ assertEquals(4, tracker.getPositionForFractionConsumed(0.499));
+ // [4, 5) corresponds to fractions in [1/2, 1) of [4, 6)
+ assertEquals(5, tracker.getPositionForFractionConsumed(0.5));
+ assertEquals(5, tracker.getPositionForFractionConsumed(0.999));
+ // The whole [4, 6) is consumed for fraction 1
+ assertEquals(6, tracker.getPositionForFractionConsumed(1.0));
}
@Test
public void testGetFractionConsumedDense() throws Exception {
OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6);
- assertEquals(0, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 3));
- assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 4));
- assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 5));
- assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 6));
assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 7));
@@ -145,14 +176,33 @@ public class OffsetRangeTrackerTest {
@Test
public void testGetFractionConsumedSparse() throws Exception {
OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
- assertEquals(0, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
+ assertTrue(tracker.tryReturnRecordAt(true, 100));
+ assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 110));
- // Consumed positions through 110 = total 11 positions of 100.
- assertEquals(0.11, tracker.getFractionConsumed(), 1e-6);
+ // Consumed positions through 109 = total 10 positions of 100.
+ assertEquals(0.1, tracker.getFractionConsumed(), 1e-6);
+ assertTrue(tracker.tryReturnRecordAt(true, 150));
+ assertEquals(0.5, tracker.getFractionConsumed(), 1e-6);
+ assertTrue(tracker.tryReturnRecordAt(true, 195));
+ assertEquals(0.95, tracker.getFractionConsumed(), 1e-6);
+ assertFalse(tracker.tryReturnRecordAt(true, 200));
+ assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
+ }
+
+ @Test
+ public void testGetFractionConsumedUpdateStartOffset() throws Exception {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
assertTrue(tracker.tryReturnRecordAt(true, 150));
- assertEquals(0.51, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
+ assertTrue(tracker.tryReturnRecordAt(true, 160));
+ assertEquals(0.2, tracker.getFractionConsumed(), 1e-6);
+ assertTrue(tracker.tryReturnRecordAt(true, 180));
+ assertEquals(0.6, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 195));
- assertEquals(0.96, tracker.getFractionConsumed(), 1e-6);
+ assertEquals(0.9, tracker.getFractionConsumed(), 1e-6);
+ assertFalse(tracker.tryReturnRecordAt(true, 200));
+ assertEquals(1.0, tracker.getFractionConsumed(), 1e-6);
}
@Test
@@ -172,15 +222,16 @@ public class OffsetRangeTrackerTest {
@Test
public void testTryReturnFirstRecordNotSplitPoint() throws Exception {
+ OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
expected.expect(IllegalStateException.class);
- new OffsetRangeTracker(100, 200).tryReturnRecordAt(false, 120);
+ tracker.tryReturnRecordAt(false, 120);
}
@Test
public void testTryReturnRecordNonMonotonic() throws Exception {
OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200);
- expected.expect(IllegalStateException.class);
tracker.tryReturnRecordAt(true, 120);
+ expected.expect(IllegalStateException.class);
tracker.tryReturnRecordAt(true, 110);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 4cb30b4..c09943b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -289,9 +289,6 @@ public class BigtableIOTest {
/**
* Tests dynamic work rebalancing exhaustively.
- *
- * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the
- * {@link BigtableIO.Read} implementation.
*/
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {