You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/18 19:01:43 UTC
[2/3] beam git commit: [BEAM-1272] Align the naming of
"generateInitialSplits" and "splitIntoBundles" to better reflect their
intention
[BEAM-1272] Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62464b5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62464b5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62464b5b
Branch: refs/heads/master
Commit: 62464b5bc8ff191dbbde2f3b1019742dea0287bc
Parents: 4124cc6
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Thu Apr 13 10:55:20 2017 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 18 12:01:27 2017 -0700
----------------------------------------------------------------------
.../apex/translation/utils/ValuesSource.java | 2 +-
.../apex/examples/UnboundedTextSource.java | 2 +-
.../translation/GroupByKeyTranslatorTest.java | 2 +-
.../apex/translation/utils/CollectionSource.java | 2 +-
.../UnboundedReadFromBoundedSource.java | 7 ++++---
.../direct/BoundedReadEvaluatorFactory.java | 2 +-
.../direct/UnboundedReadEvaluatorFactory.java | 2 +-
.../direct/BoundedReadEvaluatorFactoryTest.java | 4 ++--
.../beam/runners/direct/DirectRunnerTest.java | 4 ++--
.../UnboundedReadEvaluatorFactoryTest.java | 2 +-
.../translation/wrappers/SourceInputFormat.java | 3 ++-
.../streaming/io/BoundedSourceWrapper.java | 2 +-
.../streaming/io/UnboundedSocketSource.java | 2 +-
.../streaming/io/UnboundedSourceWrapper.java | 2 +-
.../flink/streaming/TestCountingSource.java | 2 +-
.../runners/dataflow/internal/CustomSources.java | 2 +-
.../beam/runners/spark/io/MicrobatchSource.java | 5 ++---
.../beam/runners/spark/io/SourceDStream.java | 2 +-
.../apache/beam/runners/spark/io/SourceRDD.java | 4 ++--
.../sdk/io/BoundedReadFromUnboundedSource.java | 4 ++--
.../org/apache/beam/sdk/io/BoundedSource.java | 13 +++++++++++--
.../org/apache/beam/sdk/io/CountingSource.java | 2 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 9 +++++----
.../apache/beam/sdk/io/OffsetBasedSource.java | 4 ++--
.../org/apache/beam/sdk/io/UnboundedSource.java | 2 +-
.../apache/beam/sdk/testing/SourceTestUtils.java | 6 +++---
.../org/apache/beam/sdk/io/AvroSourceTest.java | 10 +++++-----
.../apache/beam/sdk/io/CountingSourceTest.java | 6 +++---
.../apache/beam/sdk/io/FileBasedSourceTest.java | 8 ++++----
.../beam/sdk/io/OffsetBasedSourceTest.java | 8 ++++----
.../java/org/apache/beam/sdk/io/ReadTest.java | 4 ++--
.../java/org/apache/beam/sdk/io/TextIOTest.java | 16 ++++++++--------
.../org/apache/beam/sdk/io/XmlSourceTest.java | 8 ++++----
.../sdk/runners/dataflow/TestCountingSource.java | 2 +-
.../beam/sdk/testing/SourceTestUtilsTest.java | 2 +-
.../apache/beam/sdk/transforms/CreateTest.java | 12 ++++++------
.../sdk/io/elasticsearch/ElasticsearchIO.java | 2 +-
.../sdk/io/elasticsearch/ElasticsearchIOIT.java | 2 +-
.../io/elasticsearch/ElasticsearchIOTest.java | 4 ++--
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 2 +-
.../sdk/io/gcp/bigquery/TransformingSource.java | 4 ++--
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 6 +++---
.../sdk/io/gcp/pubsub/PubsubUnboundedSource.java | 6 +++---
.../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++++-----
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 8 +++++---
.../io/gcp/pubsub/PubsubUnboundedSourceTest.java | 4 ++--
.../hadoop/inputformat/HadoopInputFormatIO.java | 4 ++--
.../inputformat/HadoopInputFormatIOTest.java | 19 +++++++++++--------
.../org/apache/beam/sdk/io/hbase/HBaseIO.java | 7 +++----
.../apache/beam/sdk/io/hbase/HBaseIOTest.java | 2 +-
.../apache/beam/sdk/io/hdfs/HDFSFileSource.java | 6 +++---
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 4 ++--
.../java/org/apache/beam/sdk/io/jms/JmsIO.java | 2 +-
.../org/apache/beam/sdk/io/jms/JmsIOTest.java | 4 ++--
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 12 ++++++------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 8 ++++----
.../beam/sdk/io/kinesis/KinesisSource.java | 2 +-
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 4 ++--
.../apache/beam/sdk/io/mongodb/MongoDbIO.java | 2 +-
.../beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +-
.../java/org/apache/beam/sdk/io/mqtt/MqttIO.java | 2 +-
61 files changed, 156 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 8526618..62c92a0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -55,7 +55,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
}
@Override
- public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+ public java.util.List<? extends UnboundedSource<T, CheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return Collections.singletonList(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index 8132ee5..abe97f6 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -39,7 +39,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
private static final long serialVersionUID = 1L;
@Override
- public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<String, CheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 96963a0..193de71 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -131,7 +131,7 @@ public class GroupByKeyTranslatorTest {
}
@Override
- public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<String, CheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index c3b35f9..92812b4 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -47,7 +47,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
}
@Override
- public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<T, CheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return Collections.singletonList(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 6b7bd71..f67af8a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
/**
* {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
*
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * <p>{@link BoundedSource} is read directly without calling
+ * {@link BoundedSource#split},
* and element timestamps are propagated. While any elements remain, the watermark is the beginning
* of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
* the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
@@ -130,7 +131,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+ public List<BoundedToUnboundedSourceAdapter<T>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
try {
long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
@@ -140,7 +141,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
return ImmutableList.of(this);
}
List<? extends BoundedSource<T>> splits =
- boundedSource.splitIntoBundles(desiredBundleSize, options);
+ boundedSource.split(desiredBundleSize, options);
if (splits == null) {
LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
return ImmutableList.of(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 5bd6f7e..0c2afe8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -196,7 +196,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
long estimatedBytes = source.getEstimatedSizeBytes(options);
long bytesPerBundle = estimatedBytes / targetParallelism;
List<? extends BoundedSource<T>> bundles =
- source.splitIntoBundles(bytesPerBundle, options);
+ source.split(bytesPerBundle, options);
ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards =
ImmutableList.builder();
for (BoundedSource<T> bundle : bundles) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 91e7248..d3609f8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -301,7 +301,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
throws Exception {
UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
List<? extends UnboundedSource<OutputT, ?>> splits =
- source.generateInitialSplits(targetParallelism, evaluationContext.getPipelineOptions());
+ source.split(targetParallelism, evaluationContext.getPipelineOptions());
UnboundedReadDeduplicator deduplicator =
source.requiresDeduping()
? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8361bdc..2b5b46d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -265,7 +265,7 @@ public class BoundedReadEvaluatorFactoryTest {
public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
List<? extends BoundedSource<Long>> splits =
- source.splitIntoBundles(source.getEstimatedSizeBytes(options) / 2, options);
+ source.split(source.getEstimatedSizeBytes(options) / 2, options);
UncommittedBundle<BoundedSourceShard<Long>> rootBundle = bundleFactory.createRootBundle();
for (BoundedSource<Long> split : splits) {
@@ -365,7 +365,7 @@ public class BoundedReadEvaluatorFactoryTest {
}
@Override
- public List<? extends OffsetBasedSource<T>> splitIntoBundles(
+ public List<? extends OffsetBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3b81f4d..ed19be2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -549,13 +549,13 @@ public class DirectRunnerTest implements Serializable {
}
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
+ public List<? extends BoundedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
// Must have more than
checkState(
desiredBundleSizeBytes < getEstimatedSizeBytes(options),
"Must split into more than one source");
- return underlying.splitIntoBundles(desiredBundleSizeBytes, options);
+ return underlying.split(desiredBundleSizeBytes, options);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 8707f31..567ee98 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -450,7 +450,7 @@ public class UnboundedReadEvaluatorFactoryTest {
}
@Override
- public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 443378f..a87472b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -100,7 +100,8 @@ public class SourceInputFormat<T>
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
try {
long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
- List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
+ List<? extends Source<T>> shards =
+ initialSource.split(desiredSizeBytes, options);
int numShards = shards.size();
SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
for (int i = 0; i < numShards; i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 820a9bd..2ed5024 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -76,7 +76,7 @@ public class BoundedSourceWrapper<OutputT>
// get the splits early. we assume that the generated splits are stable,
// this is necessary so that the mapping of state to source is correct
// when restoring
- splitSources = source.splitIntoBundles(desiredBundleSize, pipelineOptions);
+ splitSources = source.split(desiredBundleSize, pipelineOptions);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index ed03dda..910a33f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -94,7 +94,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
}
@Override
- public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits(
+ public List<? extends UnboundedSource<String, CheckpointMarkT>> split(
int desiredNumSplits,
PipelineOptions options) throws Exception {
return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2849464..bb9b58a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -156,7 +156,7 @@ public class UnboundedSourceWrapper<
// get the splits early. we assume that the generated splits are stable,
// this is necessary so that the mapping of state to source is correct
// when restoring
- splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
+ splitSources = source.split(parallelism, pipelineOptions);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index 9251d42..3a08088 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -104,7 +104,7 @@ public class TestCountingSource
}
@Override
- public List<TestCountingSource> generateInitialSplits(
+ public List<TestCountingSource> split(
int desiredNumSplits, PipelineOptions options) {
List<TestCountingSource> splits = new ArrayList<>();
int numSplits = allowSplitting ? desiredNumSplits : 1;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index ffbf153..778ccf3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -98,7 +98,7 @@ public class CustomSources {
int desiredNumSplits =
getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
for (UnboundedSource<?, ?> split :
- unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
+ unboundedSource.split(desiredNumSplits, options)) {
encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
}
checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 847de19..7c07920 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -102,12 +102,11 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
}
@Override
- public List<? extends BoundedSource<T>>
- splitIntoBundles(long desiredBundleSizeBytes,
+ public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes,
PipelineOptions options) throws Exception {
List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
- source.generateInitialSplits(numInitialSplits, options);
+ source.split(numInitialSplits, options);
int numSplits = splits.size();
long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
for (int i = 0; i < numSplits; i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index fb6da97..d33529c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
try {
this.numPartitions =
createMicrobatchSource()
- .splitIntoBundles(initialParallelism, options)
+ .split(initialParallelism, options)
.size();
} catch (Exception e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 2f9a827..b99ae10 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -105,7 +105,7 @@ public class SourceRDD {
+ "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
}
try {
- List<? extends Source<T>> partitionedSources = source.splitIntoBundles(desiredSizeBytes,
+ List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes,
runtimeContext.getPipelineOptions());
Partition[] partitions = new SourcePartition[partitionedSources.size()];
for (int i = 0; i < partitionedSources.size(); i++) {
@@ -258,7 +258,7 @@ public class SourceRDD {
@Override
public Partition[] getPartitions() {
try {
- List<? extends Source<T>> partitionedSources = microbatchSource.splitIntoBundles(
+ List<? extends Source<T>> partitionedSources = microbatchSource.split(
-1 /* ignored */, runtimeContext.getPipelineOptions());
Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
for (int i = 0; i < partitionedSources.size(); i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index d7f1d7b..e54176f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -186,12 +186,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
}
@Override
- public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles(
+ public List<? extends BoundedSource<ValueWithRecordId<T>>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>();
int numInitialSplits = numInitialSplits(getMaxNumRecords());
List<? extends UnboundedSource<T, ?>> splits =
- getSource().generateInitialSplits(numInitialSplits, options);
+ getSource().split(numInitialSplits, options);
int numSplits = splits.size();
long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits);
for (int i = 0; i < numSplits; i++) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
index cd389e8..0b19aa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
@@ -34,7 +34,7 @@ import org.joda.time.Instant;
*
* <p>The operations are:
* <ul>
- * <li>Splitting into bundles of given size: {@link #splitIntoBundles};
+ * <li>Splitting into sources that read bundles of given size: {@link #split};
* <li>Size estimation: {@link #getEstimatedSizeBytes};
* <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners
* to dynamically adapt based on runtime conditions.
@@ -54,10 +54,19 @@ public abstract class BoundedSource<T> extends Source<T> {
/**
* Splits the source into bundles of approximately {@code desiredBundleSizeBytes}.
*/
- public abstract List<? extends BoundedSource<T>> splitIntoBundles(
+ public abstract List<? extends BoundedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception;
/**
+ * Deprecated former name of {@link BoundedSource#split(long, PipelineOptions)}, kept for Dataflow compatibility.
+ */
+ @Deprecated
+ public List<? extends BoundedSource<T>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception{
+ return split(desiredBundleSizeBytes, options);
+ }
+
+ /**
* An estimate of the total size (in bytes) of the data that would be read from this source.
* This estimate is in terms of external storage size, before any decompression or other
* processing done by the reader.
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 4d1305c..73b663d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -326,7 +326,7 @@ public class CountingSource {
* {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}.
*/
@Override
- public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
// Using Javadoc example, stride 2 with 3 splits becomes stride 6.
long newStride = stride * desiredNumSplits;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f38743a..95e6078 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -324,15 +324,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
@Override
public List<? extends FileBasedSource<T>> call() throws Exception {
return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
- .splitIntoBundles(desiredBundleSizeBytes, options);
+ .split(desiredBundleSizeBytes, options);
}
});
}
@Override
- public final List<? extends FileBasedSource<T>> splitIntoBundles(
+ public final List<? extends FileBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we
+ // This implementation of method split is provided to simplify subclasses. Here we
// split a FileBasedSource based on a file pattern to FileBasedSources based on full single
// files. For files that can be efficiently seeked, we further split FileBasedSources based on
// those files to FileBasedSources based on sub ranges of single files.
@@ -370,7 +370,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
} else {
if (isSplittable()) {
List<FileBasedSource<T>> splitResults = new ArrayList<>();
- for (OffsetBasedSource<T> split : super.splitIntoBundles(desiredBundleSizeBytes, options)) {
+ for (OffsetBasedSource<T> split :
+ super.split(desiredBundleSizeBytes, options)) {
splitResults.add((FileBasedSource<T>) split);
}
return splitResults;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index e9a398d..05f0d97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -108,7 +108,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
}
@Override
- public List<? extends OffsetBasedSource<T>> splitIntoBundles(
+ public List<? extends OffsetBasedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
// Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
// make sure that we do not end up with a too small bundle at the end. If the desired bundle
@@ -163,7 +163,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
/**
* Returns approximately how many bytes of data correspond to a single offset in this source.
* Used for translation between this source's range and methods defined in terms of bytes, such
- * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}.
+ * as {@link #getEstimatedSizeBytes} and {@link #split}.
*
* <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index cc1f598..af6a8cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -65,7 +65,7 @@ public abstract class UnboundedSource<
* as possible, but does not have to match exactly. A low number of splits
* will limit the amount of parallelism in the source.
*/
- public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> generateInitialSplits(
+ public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> split(
int desiredNumSplits, PipelineOptions options) throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index a2a33f3..2ab5b35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
* amount of test coverage with few code. Most notable ones are:
* <ul>
* <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read
- * by the union of sources produced by {@link BoundedSource#splitIntoBundles}
+ * by the union of sources produced by {@link BoundedSource#split}
* is the same as data read by the original source.
* <li>If your source implements dynamic work rebalancing, use the
* {@code assertSplitAtFraction} family of functions - they test behavior of
@@ -685,7 +685,7 @@ public class SourceTestUtils {
*
* <p>It forwards most methods to the given {@code boundedSource}, except:
* <ol>
- * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting
+ * <li> {@link BoundedSource#split} rejects initial splitting
* by returning itself in a list.
* <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
* </ol>
@@ -708,7 +708,7 @@ public class SourceTestUtils {
}
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
+ public List<? extends BoundedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return ImmutableList.of(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index fb7b27d..78485c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -168,7 +168,7 @@ public class AvroSourceTest {
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
List<? extends BoundedSource<FixedRecord>> splits =
- source.splitIntoBundles(file.length() / 3, null);
+ source.split(file.length() / 3, null);
for (BoundedSource<FixedRecord> subSource : splits) {
int items = SourceTestUtils.readFromSource(subSource, null).size();
// Shouldn't split while unstarted.
@@ -201,7 +201,7 @@ public class AvroSourceTest {
}
List<? extends BoundedSource<FixedRecord>> splits =
- source.splitIntoBundles(file.length() / 3, null);
+ source.split(file.length() / 3, null);
for (BoundedSource<FixedRecord> subSource : splits) {
try (BoundedSource.BoundedReader<FixedRecord> reader = subSource.createReader(null)) {
assertEquals(Double.valueOf(0.0), reader.getFractionConsumed());
@@ -339,7 +339,7 @@ public class AvroSourceTest {
int nonEmptySplits;
// Split with the minimum bundle size
- splits = source.splitIntoBundles(100L, options);
+ splits = source.split(100L, options);
assertTrue(splits.size() > 2);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
nonEmptySplits = 0;
@@ -351,7 +351,7 @@ public class AvroSourceTest {
assertTrue(nonEmptySplits > 2);
// Split with larger bundle size
- splits = source.splitIntoBundles(file.length() / 4, options);
+ splits = source.split(file.length() / 4, options);
assertTrue(splits.size() > 2);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
nonEmptySplits = 0;
@@ -363,7 +363,7 @@ public class AvroSourceTest {
assertTrue(nonEmptySplits > 2);
// Split with the file length
- splits = source.splitIntoBundles(file.length(), options);
+ splits = source.split(file.length(), options);
assertTrue(splits.size() == 1);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 0e3b07e..8807164 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -110,7 +110,7 @@ public class CountingSourceTest {
BoundedSource<Long> initial = CountingSource.upTo(numElements);
List<? extends BoundedSource<Long>> splits =
- initial.splitIntoBundles(splitSizeBytes, p.getOptions());
+ initial.split(splitSizeBytes, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
// Assemble all the splits into one flattened PCollection, also verify their sizes.
@@ -234,7 +234,7 @@ public class CountingSourceTest {
UnboundedSource<Long, ?> initial = CountingSource.unbounded();
List<? extends UnboundedSource<Long, ?>> splits =
- initial.generateInitialSplits(numSplits, p.getOptions());
+ initial.split(numSplits, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
long elementsPerSplit = numElements / numSplits;
@@ -262,7 +262,7 @@ public class CountingSourceTest {
UnboundedCountingSource initial =
CountingSource.createUnbounded().withRate(elementsPerPeriod, period);
List<? extends UnboundedSource<Long, ?>> splits =
- initial.generateInitialSplits(numSplits, p.getOptions());
+ initial.split(numSplits, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
long elementsPerSplit = numElements / numSplits;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index a889305..94a29da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -410,7 +410,7 @@ public class FileBasedSourceTest {
TestFileBasedSource source =
new TestFileBasedSource(file0.getParent() + "/" + "file*", Long.MAX_VALUE, null);
- List<? extends BoundedSource<String>> splits = source.splitIntoBundles(Long.MAX_VALUE, null);
+ List<? extends BoundedSource<String>> splits = source.split(Long.MAX_VALUE, null);
assertEquals(numFiles, splits.size());
}
@@ -421,7 +421,7 @@ public class FileBasedSourceTest {
TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath));
- source.splitIntoBundles(1234, options);
+ source.split(1234, options);
}
@Test
@@ -698,7 +698,7 @@ public class FileBasedSourceTest {
TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 16, null);
- List<? extends BoundedSource<String>> sources = source.splitIntoBundles(32, null);
+ List<? extends BoundedSource<String>> sources = source.split(32, null);
// Not a trivial split.
assertTrue(sources.size() > 1);
@@ -877,7 +877,7 @@ public class FileBasedSourceTest {
TestFileBasedSource source =
new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null);
- List<? extends BoundedSource<String>> sources = source.splitIntoBundles(512, null);
+ List<? extends BoundedSource<String>> sources = source.split(512, null);
// Not a trivial split.
assertTrue(sources.size() > 1);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index a300a9a..25168a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -147,7 +147,7 @@ public class OffsetBasedSourceTest {
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {0, 150, 300, 450, 600, 750, 900, 1000};
assertSplitsAre(
- testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null),
+ testSource.split(150 * testSource.getBytesPerOffset(), null),
boundaries);
}
@@ -159,7 +159,7 @@ public class OffsetBasedSourceTest {
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {300, 450, 600, 750, 900, 1000};
assertSplitsAre(
- testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null),
+ testSource.split(150 * testSource.getBytesPerOffset(), null),
boundaries);
}
@@ -182,7 +182,7 @@ public class OffsetBasedSourceTest {
CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
long[] boundaries = {300, 450, 600, 750, 1000};
assertSplitsAre(
- testSource.splitIntoBundles(100 * testSource.getBytesPerOffset(), null),
+ testSource.split(100 * testSource.getBytesPerOffset(), null),
boundaries);
}
@@ -195,7 +195,7 @@ public class OffsetBasedSourceTest {
// Last 10 bytes should collapse to the previous bundle.
long[] boundaries = {0, 110, 220, 330, 440, 550, 660, 770, 880, 1000};
assertSplitsAre(
- testSource.splitIntoBundles(110 * testSource.getBytesPerOffset(), null),
+ testSource.split(110 * testSource.getBytesPerOffset(), null),
boundaries);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 416a086..74acf18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -152,7 +152,7 @@ public class ReadTest implements Serializable{
private abstract static class CustomBoundedSource extends BoundedSource<String> {
@Override
- public List<? extends BoundedSource<String>> splitIntoBundles(
+ public List<? extends BoundedSource<String>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return null;
}
@@ -186,7 +186,7 @@ public class ReadTest implements Serializable{
private abstract static class CustomUnboundedSource
extends UnboundedSource<String, NoOpCheckpointMark> {
@Override
- public List<? extends UnboundedSource<String, NoOpCheckpointMark>> generateInitialSplits(
+ public List<? extends UnboundedSource<String, NoOpCheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return null;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2e36273..3b6992a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -1118,7 +1118,7 @@ public class TextIOTest {
}
@Test
- public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception {
+ public void testInitialSplitAutoModeTxt() throws Exception {
PipelineOptions options = TestPipeline.testingPipelineOptions();
long desiredBundleSize = 1000;
@@ -1127,7 +1127,7 @@ public class TextIOTest {
FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
- source.splitIntoBundles(desiredBundleSize, options);
+ source.split(desiredBundleSize, options);
// At least 2 splits and they are equal to reading the whole file.
assertThat(splits, hasSize(greaterThan(1)));
@@ -1135,7 +1135,7 @@ public class TextIOTest {
}
@Test
- public void testInitialSplitIntoBundlesAutoModeGz() throws Exception {
+ public void testInitialSplitAutoModeGz() throws Exception {
long desiredBundleSize = 1000;
PipelineOptions options = TestPipeline.testingPipelineOptions();
@@ -1144,7 +1144,7 @@ public class TextIOTest {
FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
- source.splitIntoBundles(desiredBundleSize, options);
+ source.split(desiredBundleSize, options);
// Exactly 1 split, even in AUTO mode, since it is a gzip file.
assertThat(splits, hasSize(equalTo(1)));
@@ -1152,7 +1152,7 @@ public class TextIOTest {
}
@Test
- public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception {
+ public void testInitialSplitGzipModeTxt() throws Exception {
PipelineOptions options = TestPipeline.testingPipelineOptions();
long desiredBundleSize = 1000;
@@ -1162,7 +1162,7 @@ public class TextIOTest {
FileBasedSource<String> source =
TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
- source.splitIntoBundles(desiredBundleSize, options);
+ source.split(desiredBundleSize, options);
// Exactly 1 split, even though splittable text file, since using GZIP mode.
assertThat(splits, hasSize(equalTo(1)));
@@ -1170,7 +1170,7 @@ public class TextIOTest {
}
@Test
- public void testInitialSplitIntoBundlesGzipModeGz() throws Exception {
+ public void testInitialSplitGzipModeGz() throws Exception {
PipelineOptions options = TestPipeline.testingPipelineOptions();
long desiredBundleSize = 1000;
@@ -1180,7 +1180,7 @@ public class TextIOTest {
FileBasedSource<String> source =
TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
- source.splitIntoBundles(desiredBundleSize, options);
+ source.split(desiredBundleSize, options);
// Exactly 1 split using .gz extension and using GZIP mode.
assertThat(splits, hasSize(equalTo(1)));
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index d6898d5..5f71f30 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -363,7 +363,7 @@ public class XmlSourceTest {
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(50, null);
+ List<? extends FileBasedSource<Train>> splits = source.split(50, null);
assertTrue(splits.size() > 2);
@@ -686,7 +686,7 @@ public class XmlSourceTest {
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(100, null);
+ List<? extends FileBasedSource<Train>> splits = source.split(100, null);
assertTrue(splits.size() > 2);
@@ -710,7 +710,7 @@ public class XmlSourceTest {
.withRecordElement("train")
.withRecordClass(Train.class)
.withMinBundleSize(10);
- List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(256, null);
+ List<? extends FileBasedSource<Train>> splits = source.split(256, null);
// Not a trivial split
assertTrue(splits.size() > 2);
@@ -737,7 +737,7 @@ public class XmlSourceTest {
.withMinBundleSize(10);
List<? extends FileBasedSource<Train>> splits =
- fileSource.splitIntoBundles(file.length() / 3, null);
+ fileSource.split(file.length() / 3, null);
for (BoundedSource<Train> splitSource : splits) {
int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
// Should not split while unstarted.
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index b53d1fc..9fcc3c5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -104,7 +104,7 @@ public class TestCountingSource
}
@Override
- public List<TestCountingSource> generateInitialSplits(
+ public List<TestCountingSource> split(
int desiredNumSplits, PipelineOptions options) {
List<TestCountingSource> splits = new ArrayList<>();
int numSplits = allowSplitting ? desiredNumSplits : 1;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
index efb385d..62114b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
@@ -43,7 +43,7 @@ public class SourceTestUtilsTest {
PipelineOptions options = PipelineOptionsFactory.create();
BoundedSource<Long> baseSource = CountingSource.upTo(100);
BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource);
- List<?> splits = unsplittableSource.splitIntoBundles(1, options);
+ List<?> splits = unsplittableSource.split(1, options);
assertEquals(splits.size(), 1);
assertEquals(splits.get(0), unsplittableSource);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 43e4463..8a30476 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -406,32 +406,32 @@ public class CreateTest {
}
@Test
- public void testSourceSplitIntoBundles() throws Exception {
+ public void testSourceSplit() throws Exception {
CreateSource<Integer> source =
CreateSource.fromIterable(
ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
PipelineOptions options = PipelineOptionsFactory.create();
- List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
+ List<? extends BoundedSource<Integer>> splitSources = source.split(12, options);
assertThat(splitSources, hasSize(3));
SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
}
@Test
- public void testSourceSplitIntoBundlesVoid() throws Exception {
+ public void testSourceSplitVoid() throws Exception {
CreateSource<Void> source =
CreateSource.fromIterable(
Lists.<Void>newArrayList(null, null, null, null, null), VoidCoder.of());
PipelineOptions options = PipelineOptionsFactory.create();
- List<? extends BoundedSource<Void>> splitSources = source.splitIntoBundles(3, options);
+ List<? extends BoundedSource<Void>> splitSources = source.split(3, options);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
}
@Test
- public void testSourceSplitIntoBundlesEmpty() throws Exception {
+ public void testSourceSplitEmpty() throws Exception {
CreateSource<Integer> source =
CreateSource.fromIterable(ImmutableList.<Integer>of(), BigEndianIntegerCoder.of());
PipelineOptions options = PipelineOptionsFactory.create();
- List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
+ List<? extends BoundedSource<Integer>> splitSources = source.split(12, options);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index baf0cc2..8e138ef 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -428,7 +428,7 @@ public class ElasticsearchIO {
}
@Override
- public List<? extends BoundedSource<String>> splitIntoBundles(
+ public List<? extends BoundedSource<String>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
List<BoundedElasticsearchSource> sources = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index b5fec17..d968bc2 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -88,7 +88,7 @@ public class ElasticsearchIOIT {
// as many bundles as ES shards and bundle size is shard size
long desiredBundleSizeBytes = 0;
List<? extends BoundedSource<String>> splits =
- initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+ initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
//this is the number of ES shards
// (By default, each index in Elasticsearch is allocated 5 primary shards)
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index bca0fe8..260af79 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -326,7 +326,7 @@ public class ElasticsearchIOTest implements Serializable {
}
@Test
- public void testSplitIntoBundles() throws Exception {
+ public void testSplit() throws Exception {
ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
PipelineOptions options = PipelineOptionsFactory.create();
ElasticsearchIO.Read read =
@@ -336,7 +336,7 @@ public class ElasticsearchIOTest implements Serializable {
// as many bundles as ES shards and bundle size is shard size
int desiredBundleSizeBytes = 0;
List<? extends BoundedSource<String>> splits =
- initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+ initialSource.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
//this is the number of ES shards
// (By default, each index in Elasticsearch is allocated 5 primary shards)
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 746258f..1b90dc3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -81,7 +81,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}
@Override
- public List<BoundedSource<TableRow>> splitIntoBundles(
+ public List<BoundedSource<TableRow>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
index f7d8252..b8e6b39 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
@@ -52,10 +52,10 @@ class TransformingSource<T, V> extends BoundedSource<V> {
}
@Override
- public List<? extends BoundedSource<V>> splitIntoBundles(
+ public List<? extends BoundedSource<V>> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
return Lists.transform(
- boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+ boundedSource.split(desiredBundleSizeBytes, options),
new Function<BoundedSource<T>, BoundedSource<V>>() {
@Override
public BoundedSource<V> apply(BoundedSource<T> input) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 89c67a4..28f8878 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -724,7 +724,7 @@ public class BigtableIO {
}
@Override
- public List<BigtableSource> splitIntoBundles(
+ public List<BigtableSource> split(
long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
// Update the desiredBundleSizeBytes in order to limit the
// number of splits to maximumNumberOfSplits.
@@ -734,11 +734,11 @@ public class BigtableIO {
Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
// Delegate to testable helper.
- return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
+ return splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
}
/** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
- private List<BigtableSource> splitIntoBundlesBasedOnSamples(
+ private List<BigtableSource> splitBasedOnSamples(
long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
// There are no regions, or no samples available. Just scan the entire range.
if (sampleRowKeys.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 9d8763b..0389d4b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1119,7 +1119,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
}
@Override
- public List<PubsubSource<T>> generateInitialSplits(
+ public List<PubsubSource<T>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
PubsubSource<T> splitSource = this;
@@ -1142,8 +1142,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
SubscriptionPath subscription = subscriptionPath;
if (subscription == null) {
if (checkpoint == null) {
- // This reader has never been started and there was no call to #splitIntoBundles; create
- // a single random subscription, which will be kept in the checkpoint.
+ // This reader has never been started and there was no call to #split;
+ // create a single random subscription, which will be kept in the checkpoint.
subscription = outer.createRandomSubscription(options);
} else {
subscription = checkpoint.getSubscription();
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 2a2bf91..83fd8d9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1756,7 +1756,7 @@ public class BigQueryIOTest implements Serializable {
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
- List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+ List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(1, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1835,7 +1835,7 @@ public class BigQueryIOTest implements Serializable {
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
- List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+ List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(1, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1917,7 +1917,7 @@ public class BigQueryIOTest implements Serializable {
SourceTestUtils.assertSplitAtFractionBehavior(
bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
- List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+ List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(1, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1963,7 +1963,7 @@ public class BigQueryIOTest implements Serializable {
stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
SourceTestUtils.assertSourcesEqualReferenceSource(
- stringSource, stringSource.splitIntoBundles(100, options), options);
+ stringSource, stringSource.split(100, options), options);
}
@Test
@@ -1994,7 +1994,7 @@ public class BigQueryIOTest implements Serializable {
stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
SourceTestUtils.assertSourcesEqualReferenceSource(
- stringSource, stringSource.splitIntoBundles(100, options), options);
+ stringSource, stringSource.split(100, options), options);
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 1c770a2..3653753 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -476,7 +476,7 @@ public class BigtableIOTest {
ByteKeyRange.ALL_KEYS,
null /*size*/);
List<BigtableSource> splits =
- source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
+ source.split(numRows * bytesPerRow / numSamples, null /* options */);
// Test num splits and split equality.
assertThat(splits, hasSize(numSamples));
@@ -503,7 +503,8 @@ public class BigtableIOTest {
null /*filter*/,
ByteKeyRange.ALL_KEYS,
null /*size*/);
- List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+ List<BigtableSource> splits =
+ source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
@@ -528,7 +529,8 @@ public class BigtableIOTest {
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
BigtableSource source =
new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
- List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+ List<BigtableSource> splits =
+ source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index d2e88c3..949ba4f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -324,7 +324,7 @@ public class PubsubUnboundedSourceTest {
}
@Test
- public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+ public void noSubscriptionSplitGeneratesSubscription() throws Exception {
TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
factory = PubsubTestClient.createFactoryForCreateSubscription();
PubsubUnboundedSource<String> source =
@@ -343,7 +343,7 @@ public class PubsubUnboundedSourceTest {
PipelineOptions options = PipelineOptionsFactory.create();
List<PubsubSource<String>> splits =
- (new PubsubSource<>(source)).generateInitialSplits(3, options);
+ (new PubsubSource<>(source)).split(3, options);
// We have at least one returned split
assertThat(splits, hasSize(greaterThan(0)));
for (PubsubSource<String> split : splits) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index d776ea0..93ff108 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -449,7 +449,7 @@ public class HadoopInputFormatIO {
}
@Override
- public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
+ public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes,
PipelineOptions options) throws Exception {
// desiredBundleSizeBytes is not being considered as splitting based on this
// value is not supported by inputFormat getSplits() method.
@@ -485,7 +485,7 @@ public class HadoopInputFormatIO {
/**
* This is a helper function to compute splits. This method will also calculate size of the
* data being read. Note: This method is executed exactly once and the splits are retrieved
- * and cached in this. These splits are further used by splitIntoBundles() and
+ * and cached in this. These splits are further used by split() and
* getEstimatedSizeBytes().
*/
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 3a4a99d..da70632 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsE
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -518,8 +519,8 @@ public class HadoopInputFormatIOTest {
// Validate if estimated size is equal to the size of records.
assertEquals(referenceRecords.size(), estimatedSize);
List<BoundedSource<KV<Text, Employee>>> boundedSourceList =
- hifSource.splitIntoBundles(0, p.getOptions());
- // Validate if splitIntoBundles() has split correctly.
+ hifSource.split(0, p.getOptions());
+ // Validate if split() has split correctly.
assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size());
List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
@@ -638,12 +639,14 @@ public class HadoopInputFormatIOTest {
}
/**
- * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader()
- * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles()
- * splitIntoBundles()} is not called.
+ * This test validates behavior of
+ * {@link HadoopInputFormatBoundedSource#createReader(PipelineOptions)
+ * createReader()} method when
+ * {@link HadoopInputFormatBoundedSource#split(long, PipelineOptions)
+ * split()} is not called.
*/
@Test
- public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception {
+ public void testCreateReaderIfSplitNotCalled() throws Exception {
HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
EmployeeInputFormat.class,
Text.class,
@@ -658,7 +661,7 @@ public class HadoopInputFormatIOTest {
/**
* This test validates behavior of
* {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
- * InputFormat's {@link InputFormat#getSplits() getSplits()} returns empty list.
+ * InputFormat's {@link InputFormat#getSplits(JobContext)} returns empty list.
*/
@Test
public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception {
@@ -843,6 +846,6 @@ public class HadoopInputFormatIOTest {
inputFormatValueClass,
keyCoder,
valueCoder);
- return boundedSource.splitIntoBundles(0, p.getOptions());
+ return boundedSource.split(0, p.getOptions());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index ed191cb..ccdcef6 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -422,10 +422,9 @@ public class HBaseIO {
return sources;
}
- @Override
- public List<? extends BoundedSource<Result>>
- splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
- throws Exception {
+ @Override
+ public List<? extends BoundedSource<Result>> split(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
long estimatedSizeBytes = getEstimatedSizeBytes(options);
int numSplits = 1;
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index ee3369e..c2410ea 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -197,7 +197,7 @@ public class HBaseIOTest {
HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
List<? extends BoundedSource<Result>> splits =
- source.splitIntoBundles(numRows * bytesPerRow / numRegions,
+ source.split(numRows * bytesPerRow / numRegions,
null /* options */);
// Test num splits and split equality.
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index b55944b..5cc2097 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -265,7 +265,7 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
// =======================================================================
@Override
- public List<? extends BoundedSource<T>> splitIntoBundles(
+ public List<? extends BoundedSource<T>> split(
final long desiredBundleSizeBytes,
PipelineOptions options) throws Exception {
if (serializableSplit() == null) {
@@ -296,8 +296,8 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
long size = 0;
try {
- // If this source represents a split from splitIntoBundles, then return the size of the split,
- // rather then the entire input
+ // If this source represents a split produced by split(),
+ // then return the size of the split, rather than the entire input
if (serializableSplit() != null) {
return serializableSplit().getSplit().getLength();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index c821d9d..a964239 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -159,7 +159,7 @@ public class HDFSFileSourceTest {
// Split with a small bundle size (has to be at least size of sync interval)
List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
- .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+ .split(SequenceFile.SYNC_INTERVAL, options);
assertTrue(splits.size() > 2);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
int nonEmptySplits = 0;
@@ -184,7 +184,7 @@ public class HDFSFileSourceTest {
long originalSize = source.getEstimatedSizeBytes(options);
long splitTotalSize = 0;
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
SequenceFile.SYNC_INTERVAL, options
);
for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {
http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 89016ac..104bea4 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -340,7 +340,7 @@ public class JmsIO {
}
@Override
- public List<UnboundedJmsSource> generateInitialSplits(
+ public List<UnboundedJmsSource> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
List<UnboundedJmsSource> sources = new ArrayList<>();
if (spec.getTopic() != null) {