You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/12/09 02:41:53 UTC
[1/2] incubator-beam git commit: Add support for having an empty CountingInput/CountingSource
Repository: incubator-beam
Updated Branches:
refs/heads/master 40bd27602 -> ddb59125a
Add support for having an empty CountingInput/CountingSource
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30ff1ee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30ff1ee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30ff1ee1
Branch: refs/heads/master
Commit: 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2
Parents: 40bd276
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 8 15:22:35 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 18:41:17 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/CountingInput.java | 12 ++++++----
.../org/apache/beam/sdk/io/CountingSource.java | 12 ++++++----
.../apache/beam/sdk/io/CountingInputTest.java | 23 +++++++++++++++++++-
.../apache/beam/sdk/io/CountingSourceTest.java | 10 +++++++++
4 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index 3148d8d..ac70aca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -75,17 +75,21 @@ public class CountingInput {
* from {@code 0} to {@code numElements - 1}.
*/
public static BoundedCountingInput upTo(long numElements) {
- checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+ checkArgument(numElements >= 0,
+ "numElements (%s) must be greater than or equal to 0",
+ numElements);
return new BoundedCountingInput(numElements);
}
/**
* Creates a {@link BoundedCountingInput} that will produce elements
- * starting from {@code startIndex} to {@code endIndex - 1}.
+ * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive).
+ * If {@code startIndex == endIndex}, then no elements will be produced.
*/
public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
- checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
- endIndex, startIndex);
+ checkArgument(endIndex >= startIndex,
+ "endIndex (%s) must be greater than or equal to startIndex (%s)",
+ endIndex, startIndex);
return new BoundedCountingInput(startIndex, endIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index bc7fb78..9752dba 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -78,17 +78,21 @@ public class CountingSource {
*/
@Deprecated
public static BoundedSource<Long> upTo(long numElements) {
- checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+ checkArgument(numElements >= 0,
+ "numElements (%s) must be greater than or equal to 0",
+ numElements);
return new BoundedCountingSource(0, numElements);
}
/**
* Creates a {@link BoundedSource} that will produce elements
- * from {@code startIndex} to {@code endIndex - 1}.
+ * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive).
+ * If {@code startIndex == endIndex}, then no elements will be produced.
*/
static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex) {
- checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
- endIndex, startIndex);
+ checkArgument(endIndex >= startIndex,
+ "endIndex (%s) must be greater than or equal to startIndex (%s)",
+ endIndex, startIndex);
return new BoundedCountingSource(startIndex, endIndex);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 02b4ba0..4349f66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -71,7 +71,7 @@ public class CountingInputTest {
public void testBoundedInput() {
Pipeline p = TestPipeline.create();
long numElements = 1000;
- PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+ PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements)));
addCountingAsserts(input, 0, numElements);
p.run();
@@ -79,6 +79,27 @@ public class CountingInputTest {
@Test
@Category(RunnableOnService.class)
+ public void testEmptyBoundedSource() {
+ Pipeline p = TestPipeline.create();
+ PCollection<Long> input = p.apply(CountingInput.upTo(0));
+
+ PAssert.that(input).empty();
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testEmptyBoundedSourceUsingRange() {
+ Pipeline p = TestPipeline.create();
+ PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42));
+
+ PAssert.that(input).empty();
+ p.run();
+ }
+
+
+ @Test
+ @Category(RunnableOnService.class)
public void testBoundedInputSubrange() {
Pipeline p = TestPipeline.create();
long start = 10;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 88c68d3..5eccde6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -92,6 +92,16 @@ public class CountingSourceTest {
@Test
@Category(RunnableOnService.class)
+ public void testEmptyBoundedSource() {
+ Pipeline p = TestPipeline.create();
+ PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0)));
+
+ PAssert.that(input).empty();
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
public void testBoundedSourceSplits() throws Exception {
Pipeline p = TestPipeline.create();
long numElements = 1000;
[2/2] incubator-beam git commit: Add support for having an empty CountingInput/CountingSource
Posted by lc...@apache.org.
Add support for having an empty CountingInput/CountingSource
This closes #1557
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ddb59125
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ddb59125
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ddb59125
Branch: refs/heads/master
Commit: ddb59125aeacb809b7695c203fe8b1a40e36aed2
Parents: 40bd276 30ff1ee
Author: Luke Cwik <lc...@google.com>
Authored: Thu Dec 8 18:41:42 2016 -0800
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Dec 8 18:41:42 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/CountingInput.java | 12 ++++++----
.../org/apache/beam/sdk/io/CountingSource.java | 12 ++++++----
.../apache/beam/sdk/io/CountingInputTest.java | 23 +++++++++++++++++++-
.../apache/beam/sdk/io/CountingSourceTest.java | 10 +++++++++
4 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------