You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2016/12/06 10:17:10 UTC
[1/2] incubator-beam git commit: [BEAM-961] Add starting number to CountingInput
Repository: incubator-beam
Updated Branches:
refs/heads/master 1efda59ab -> 493c04faa
[BEAM-961] Add starting number to CountingInput
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41ae08bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41ae08bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41ae08bf
Branch: refs/heads/master
Commit: 41ae08bf18525f52b03252dee783505ae400911e
Parents: 1efda59
Author: Vladisav Jelisavcic <vj...@apache.org>
Authored: Sun Dec 4 10:42:28 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Dec 6 11:01:45 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/CountingInput.java | 42 ++++++++++++++++----
.../org/apache/beam/sdk/io/CountingSource.java | 11 +++++
.../apache/beam/sdk/io/CountingInputTest.java | 42 +++++++++++++++-----
3 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
index f479215..456d291 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java
@@ -35,7 +35,7 @@ import org.joda.time.Instant;
/**
* A {@link PTransform} that produces longs. When used to produce a
* {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
- * and counts up to a specified maximum. When used to produce an
+ * or starting value, and counts up to a specified maximum. When used to produce an
* {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
* and then never produces more output. (In practice, this limit should never be reached.)
*
@@ -43,7 +43,8 @@ import org.joda.time.Instant;
* {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
* supports dynamic work rebalancing.
*
- * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
+ * <p>To produce a bounded {@code PCollection<Long>} starting from {@code 0},
+ * use {@link CountingInput#upTo(long)}:
*
* <pre>{@code
* Pipeline p = ...
@@ -51,6 +52,9 @@ import org.joda.time.Instant;
* PCollection<Long> bounded = p.apply(producer);
* }</pre>
*
+ * <p>To produce a bounded {@code PCollection<Long>} starting from {@code startOffset},
+ * use {@link CountingInput#forSubrange(long, long)} instead.
+ *
* <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
* calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
* with timestamps other than {@link Instant#now}.
@@ -76,6 +80,16 @@ public class CountingInput {
}
/**
+ * Creates a {@link BoundedCountingInput} that will produce elements
+ * starting from {@code startIndex} to {@code endIndex - 1}.
+ */
+ public static BoundedCountingInput forSubrange(long startIndex, long endIndex) {
+ checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
+ endIndex, startIndex);
+ return new BoundedCountingInput(startIndex, endIndex);
+ }
+
+ /**
* Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
* to {@link Long#MAX_VALUE}.
*
@@ -102,23 +116,35 @@ public class CountingInput {
* 0.
*/
public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
- private final long numElements;
+ private final long startIndex;
+ private final long endIndex;
private BoundedCountingInput(long numElements) {
- this.numElements = numElements;
+ this.endIndex = numElements;
+ this.startIndex = 0;
+ }
+
+ private BoundedCountingInput(long startIndex, long endIndex) {
+ this.endIndex = endIndex;
+ this.startIndex = startIndex;
}
- @SuppressWarnings("deprecation")
@Override
public PCollection<Long> apply(PBegin begin) {
- return begin.apply(Read.from(CountingSource.upTo(numElements)));
+ return begin.apply(Read.from(CountingSource.createSourceForSubrange(startIndex, endIndex)));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("upTo", numElements)
- .withLabel("Count Up To"));
+
+ if (startIndex == 0) {
+ builder.add(DisplayData.item("upTo", endIndex)
+ .withLabel("Count Up To"));
+ } else {
+ builder.add(DisplayData.item("startAt", startIndex).withLabel("Count Starting At"))
+ .add(DisplayData.item("upTo", endIndex).withLabel("Count Up To"));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 59a8df8..bc7fb78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -83,6 +83,17 @@ public class CountingSource {
}
/**
+ * Creates a {@link BoundedSource} that will produce elements
+ * from {@code startIndex} to {@code endIndex - 1}.
+ */
+ static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex) {
+ checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)",
+ endIndex, startIndex);
+
+ return new BoundedCountingSource(startIndex, endIndex);
+ }
+
+ /**
* Create a new {@link UnboundedCountingSource}.
*/
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41ae08bf/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index 2397d10..02b4ba0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -49,21 +49,21 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class CountingInputTest {
- public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+ public static void addCountingAsserts(PCollection<Long> input, long start, long end) {
// Count == numElements
PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
+ .isEqualTo(end - start);
// Unique count == numElements
PAssert.thatSingleton(
input
.apply(Distinct.<Long>create())
.apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Min == 0
- PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
- // Max == numElements-1
+ .isEqualTo(end - start);
+ // Min == start
+ PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(start);
+ // Max == end-1
PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
+ .isEqualTo(end - 1);
}
@Test
@@ -73,7 +73,19 @@ public class CountingInputTest {
long numElements = 1000;
PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
- addCountingAsserts(input, numElements);
+ addCountingAsserts(input, 0, numElements);
+ p.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testBoundedInputSubrange() {
+ Pipeline p = TestPipeline.create();
+ long start = 10;
+ long end = 1000;
+ PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end));
+
+ addCountingAsserts(input, start, end);
p.run();
}
@@ -85,6 +97,14 @@ public class CountingInputTest {
}
@Test
+ public void testBoundedDisplayDataSubrange() {
+ PTransform<?, ?> input = CountingInput.forSubrange(12, 1234);
+ DisplayData displayData = DisplayData.from(input);
+ assertThat(displayData, hasDisplayItem("startAt", 12));
+ assertThat(displayData, hasDisplayItem("upTo", 1234));
+ }
+
+ @Test
@Category(RunnableOnService.class)
public void testUnboundedInput() {
Pipeline p = TestPipeline.create();
@@ -92,7 +112,7 @@ public class CountingInputTest {
PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
- addCountingAsserts(input, numElements);
+ addCountingAsserts(input, 0, numElements);
p.run();
}
@@ -110,7 +130,7 @@ public class CountingInputTest {
.withRate(elemsPerPeriod, periodLength)
.withMaxNumRecords(numElements));
- addCountingAsserts(input, numElements);
+ addCountingAsserts(input, 0, numElements);
long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
Instant startTime = Instant.now();
p.run();
@@ -136,7 +156,7 @@ public class CountingInputTest {
CountingInput.unbounded()
.withTimestampFn(new ValueAsTimestampFn())
.withMaxNumRecords(numElements));
- addCountingAsserts(input, numElements);
+ addCountingAsserts(input, 0, numElements);
PCollection<Long> diffs =
input
[2/2] incubator-beam git commit: [BEAM-961] This closes #1505
Posted by jb...@apache.org.
[BEAM-961] This closes #1505
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/493c04fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/493c04fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/493c04fa
Branch: refs/heads/master
Commit: 493c04faa12b8bb7483e5805bd4eb4e3280feaca
Parents: 1efda59 41ae08b
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Dec 6 11:17:02 2016 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Dec 6 11:17:02 2016 +0100
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/CountingInput.java | 42 ++++++++++++++++----
.../org/apache/beam/sdk/io/CountingSource.java | 11 +++++
.../apache/beam/sdk/io/CountingInputTest.java | 42 +++++++++++++++-----
3 files changed, 76 insertions(+), 19 deletions(-)
----------------------------------------------------------------------