You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/10/26 16:43:59 UTC

[21/50] [abbrv] incubator-beam git commit: OffsetBasedSource: allow empty sources

OffsetBasedSource: allow empty sources

As one example, empty files exist.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e9ad995
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e9ad995
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e9ad995

Branch: refs/heads/gearpump-runner
Commit: 7e9ad9954e50bd01fba4cda84c182af895b2c23f
Parents: dfaf2a8
Author: Dan Halperin <dh...@google.com>
Authored: Fri Oct 21 12:21:45 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Oct 24 13:16:50 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/OffsetBasedSource.java   |  4 +--
 .../beam/sdk/io/OffsetBasedSourceTest.java      | 30 +++++++++++++++-----
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 6e49cc3..e9a398d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -146,8 +146,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
         this.endOffset >= 0,
         "End offset has value %s, must be non-negative", this.endOffset);
     checkArgument(
-        this.startOffset < this.endOffset,
-        "Start offset %s must be before end offset %s",
+        this.startOffset <= this.endOffset,
+        "Start offset %s may not be larger than end offset %s",
         this.startOffset, this.endOffset);
     checkArgument(
         this.minBundleSize >= 0,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 923b4b4..6584e5d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -62,7 +62,7 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+    public boolean producesSortedKeys(PipelineOptions options) {
       return false;
     }
 
@@ -85,7 +85,7 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException {
+    public CoarseRangeReader createReader(PipelineOptions options) {
       return new CoarseRangeReader(this);
     }
   }
@@ -105,7 +105,7 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public boolean startImpl() throws IOException {
+    public boolean startImpl() {
       current = getCurrentSource().getStartOffset();
       while (current % granularity != 0) {
         ++current;
@@ -114,7 +114,7 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public boolean advanceImpl() throws IOException {
+    public boolean advanceImpl() {
       ++current;
       return true;
     }
@@ -130,7 +130,7 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public void close() throws IOException { }
+    public void close() { }
   }
 
   public static void assertSplitsAre(List<? extends OffsetBasedSource<?>> splits,
@@ -211,7 +211,7 @@ public class OffsetBasedSourceTest {
     // in the face of that.
     PipelineOptions options = PipelineOptionsFactory.create();
     CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
-    try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) {
+    try (CoarseRangeReader reader = source.createReader(options)) {
       List<Integer> items = new ArrayList<>();
 
       assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
@@ -304,7 +304,7 @@ public class OffsetBasedSourceTest {
   public void testSplitAtFraction() throws IOException {
     PipelineOptions options = PipelineOptionsFactory.create();
     CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);
-    try (CoarseRangeReader reader = (CoarseRangeReader) source.createReader(options)) {
+    try (CoarseRangeReader reader = source.createReader(options)) {
       List<Integer> originalItems = new ArrayList<>();
       assertTrue(reader.start());
       originalItems.add(reader.getCurrent());
@@ -341,4 +341,20 @@ public class OffsetBasedSourceTest {
     CoarseRangeSource original = new CoarseRangeSource(13, 35, 1, 10);
     assertSplitAtFractionExhaustive(original, options);
   }
+
+  @Test
+  public void testEmptyOffsetRange() throws Exception {
+    CoarseRangeSource empty = new CoarseRangeSource(0, 0, 1, 1);
+    try (CoarseRangeReader reader = empty.createReader(PipelineOptionsFactory.create())) {
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(OffsetBasedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+      assertEquals(0.0, reader.getFractionConsumed(), 0.0001);
+
+      assertFalse(reader.start());
+
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+      assertEquals(1.0, reader.getFractionConsumed(), 0.0001);
+    }
+  }
 }