You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/25 16:29:01 UTC
[43/50] incubator-beam git commit: OffsetBasedSource: allow empty
sources
OffsetBasedSource: allow empty sources
As one example, empty files exist.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e9ad995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e9ad995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e9ad995
Branch: refs/heads/apex-runner
Commit: 7e9ad9954e50bd01fba4cda84c182af895b2c23f
Parents: dfaf2a8
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 21 12:21:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Oct 24 13:16:50 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/OffsetBasedSource.java | 4 +--
.../beam/sdk/io/OffsetBasedSourceTest.java | 30 +++++++++++++++-----
2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 6e49cc3..e9a398d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -146,8 +146,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
this.endOffset >= 0,
"End offset has value %s, must be non-negative", this.endOffset);
checkArgument(
- this.startOffset < this.endOffset,
- "Start offset %s must be before end offset %s",
+ this.startOffset <= this.endOffset,
+ "Start offset %s may not be larger than end offset %s",
this.startOffset, this.endOffset);
checkArgument(
this.minBundleSize >= 0,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 923b4b4..6584e5d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -62,7 +62,7 @@ public class OffsetBasedSourceTest {
}
@Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ public boolean producesSortedKeys(PipelineOptions options) {
return false;
}
@@ -85,7 +85,7 @@ public class OffsetBasedSourceTest {
}
@Override
- public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException {
+ public CoarseRangeReader createReader(PipelineOptions options) {
return new CoarseRangeReader(this);
}
}
@@ -105,7 +105,7 @@ public class OffsetBasedSourceTest {
}
@Override
- public boolean startImpl() throws IOException {
+ public boolean startImpl() {
current = getCurrentSource().getStartOffset();
while (current % granularity != 0) {
++current;
@@ -114,7 +114,7 @@ public class OffsetBasedSourceTest {
}
@Override
- public boolean advanceImpl() throws IOException {
+ public boolean advanceImpl() {
++current;
return true;
}
@@ -130,7 +130,7 @@ public class OffsetBasedSourceTest {
}
@Override
- public void close() throws IOException { }
+ public void close() { }
}
public static void assertSplitsAre(List<? extends OffsetBasedSource<?>> splits,
@@ -211,7 +211,7 @@ public class OffsetBasedSourceTest {
// in the face of that.
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
- try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
+ try (CoarseRangeReader reader = source.createReader(options)) {
List<Integer> items = new ArrayList<>();
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
@@ -304,7 +304,7 @@ public class OffsetBasedSourceTest {
public void testSplitAtFraction() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
- try (CoarseRangeReader reader = (CoarseRangeReader) source.createReader(options)) {
+ try (CoarseRangeReader reader = source.createReader(options)) {
List<Integer> originalItems = new ArrayList<>();
assertTrue(reader.start());
originalItems.add(reader.getCurrent());
@@ -341,4 +341,20 @@ public class OffsetBasedSourceTest {
CoarseRangeSource original = new CoarseRangeSource(13, 35, 1, 10);
assertSplitAtFractionExhaustive(original, options);
}
+
+ @Test
+ public void testEmptyOffsetRange() throws Exception {
+ CoarseRangeSource empty = new CoarseRangeSource(0, 0, 1, 1);
+ try (CoarseRangeReader reader = empty.createReader(PipelineOptionsFactory.create())) {
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(OffsetBasedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+ assertEquals(0.0, reader.getFractionConsumed(), 0.0001);
+
+ assertFalse(reader.start());
+
+ assertEquals(0, reader.getSplitPointsConsumed());
+ assertEquals(0, reader.getSplitPointsRemaining());
+ assertEquals(1.0, reader.getFractionConsumed(), 0.0001);
+ }
+ }
}