You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/22 23:34:54 UTC
[3/5] incubator-beam git commit: Use CountingSource in ForceStreamingTest
Use CountingSource in ForceStreamingTest
This removes the need for the FakeUnboundedSource class and makes the read
fully specified.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d972629
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d972629
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d972629
Branch: refs/heads/python-sdk
Commit: 6d9726290f61dd97f81de47c4070fb27e7e07432
Parents: d5c0175
Author: Thomas Groh <tg...@google.com>
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Dec 22 15:34:41 2016 -0800
----------------------------------------------------------------------
.../beam/runners/spark/ForceStreamingTest.java | 39 +-------------------
1 file changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6d972629/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
// apply the BoundedReadFromUnboundedSource.
@SuppressWarnings("unchecked")
BoundedReadFromUnboundedSource boundedRead =
- Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+ Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
//noinspection unchecked
pipeline.apply(boundedRead);
@@ -86,38 +85,4 @@ public class ForceStreamingTest {
}
}
-
- /**
- * A fake {@link UnboundedSource} to satisfy the compiler.
- */
- private static class FakeUnboundedSource extends UnboundedSource {
-
- @Override
- public List<? extends UnboundedSource> generateInitialSplits(
- int desiredNumSplits,
- PipelineOptions options) throws Exception {
- return null;
- }
-
- @Override
- public UnboundedReader createReader(
- PipelineOptions options,
- CheckpointMark checkpointMark) throws IOException {
- return null;
- }
-
- @Override
- public Coder getCheckpointMarkCoder() {
- return null;
- }
-
- @Override
- public void validate() { }
-
- @Override
- public Coder getDefaultOutputCoder() {
- return null;
- }
- }
-
}