You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/18 19:01:43 UTC

[2/3] beam git commit: [BEAM-1272] Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention

[BEAM-1272] Align the naming of "generateInitialSplits" and "splitIntoBundles" to better reflect their intention


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62464b5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62464b5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62464b5b

Branch: refs/heads/master
Commit: 62464b5bc8ff191dbbde2f3b1019742dea0287bc
Parents: 4124cc6
Author: Etienne Chauchot <ec...@gmail.com>
Authored: Thu Apr 13 10:55:20 2017 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Apr 18 12:01:27 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/utils/ValuesSource.java     |  2 +-
 .../apex/examples/UnboundedTextSource.java       |  2 +-
 .../translation/GroupByKeyTranslatorTest.java    |  2 +-
 .../apex/translation/utils/CollectionSource.java |  2 +-
 .../UnboundedReadFromBoundedSource.java          |  7 ++++---
 .../direct/BoundedReadEvaluatorFactory.java      |  2 +-
 .../direct/UnboundedReadEvaluatorFactory.java    |  2 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java  |  4 ++--
 .../beam/runners/direct/DirectRunnerTest.java    |  4 ++--
 .../UnboundedReadEvaluatorFactoryTest.java       |  2 +-
 .../translation/wrappers/SourceInputFormat.java  |  3 ++-
 .../streaming/io/BoundedSourceWrapper.java       |  2 +-
 .../streaming/io/UnboundedSocketSource.java      |  2 +-
 .../streaming/io/UnboundedSourceWrapper.java     |  2 +-
 .../flink/streaming/TestCountingSource.java      |  2 +-
 .../runners/dataflow/internal/CustomSources.java |  2 +-
 .../beam/runners/spark/io/MicrobatchSource.java  |  5 ++---
 .../beam/runners/spark/io/SourceDStream.java     |  2 +-
 .../apache/beam/runners/spark/io/SourceRDD.java  |  4 ++--
 .../sdk/io/BoundedReadFromUnboundedSource.java   |  4 ++--
 .../org/apache/beam/sdk/io/BoundedSource.java    | 13 +++++++++++--
 .../org/apache/beam/sdk/io/CountingSource.java   |  2 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java  |  9 +++++----
 .../apache/beam/sdk/io/OffsetBasedSource.java    |  4 ++--
 .../org/apache/beam/sdk/io/UnboundedSource.java  |  2 +-
 .../apache/beam/sdk/testing/SourceTestUtils.java |  6 +++---
 .../org/apache/beam/sdk/io/AvroSourceTest.java   | 10 +++++-----
 .../apache/beam/sdk/io/CountingSourceTest.java   |  6 +++---
 .../apache/beam/sdk/io/FileBasedSourceTest.java  |  8 ++++----
 .../beam/sdk/io/OffsetBasedSourceTest.java       |  8 ++++----
 .../java/org/apache/beam/sdk/io/ReadTest.java    |  4 ++--
 .../java/org/apache/beam/sdk/io/TextIOTest.java  | 16 ++++++++--------
 .../org/apache/beam/sdk/io/XmlSourceTest.java    |  8 ++++----
 .../sdk/runners/dataflow/TestCountingSource.java |  2 +-
 .../beam/sdk/testing/SourceTestUtilsTest.java    |  2 +-
 .../apache/beam/sdk/transforms/CreateTest.java   | 12 ++++++------
 .../sdk/io/elasticsearch/ElasticsearchIO.java    |  2 +-
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java  |  2 +-
 .../io/elasticsearch/ElasticsearchIOTest.java    |  4 ++--
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java  |  2 +-
 .../sdk/io/gcp/bigquery/TransformingSource.java  |  4 ++--
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java     |  6 +++---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSource.java |  6 +++---
 .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java | 10 +++++-----
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java |  8 +++++---
 .../io/gcp/pubsub/PubsubUnboundedSourceTest.java |  4 ++--
 .../hadoop/inputformat/HadoopInputFormatIO.java  |  4 ++--
 .../inputformat/HadoopInputFormatIOTest.java     | 19 +++++++++++--------
 .../org/apache/beam/sdk/io/hbase/HBaseIO.java    |  7 +++----
 .../apache/beam/sdk/io/hbase/HBaseIOTest.java    |  2 +-
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java  |  6 +++---
 .../beam/sdk/io/hdfs/HDFSFileSourceTest.java     |  4 ++--
 .../java/org/apache/beam/sdk/io/jms/JmsIO.java   |  2 +-
 .../org/apache/beam/sdk/io/jms/JmsIOTest.java    |  4 ++--
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java    | 12 ++++++------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java    |  8 ++++----
 .../beam/sdk/io/kinesis/KinesisSource.java       |  2 +-
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java     |  4 ++--
 .../apache/beam/sdk/io/mongodb/MongoDbIO.java    |  2 +-
 .../beam/sdk/io/mongodb/MongoDBGridFSIOTest.java |  2 +-
 .../java/org/apache/beam/sdk/io/mqtt/MqttIO.java |  2 +-
 61 files changed, 156 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
index 8526618..62c92a0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ValuesSource.java
@@ -55,7 +55,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   }
 
   @Override
-  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception {
     return Collections.singletonList(this);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
index 8132ee5..abe97f6 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/UnboundedTextSource.java
@@ -39,7 +39,7 @@ public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource
   private static final long serialVersionUID = 1L;
 
   @Override
-  public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+  public List<? extends UnboundedSource<String, CheckpointMark>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception {
     return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
index 96963a0..193de71 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/GroupByKeyTranslatorTest.java
@@ -131,7 +131,7 @@ public class GroupByKeyTranslatorTest {
     }
 
     @Override
-    public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
+    public List<? extends UnboundedSource<String, CheckpointMark>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
index c3b35f9..92812b4 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/CollectionSource.java
@@ -47,7 +47,7 @@ public class CollectionSource<T> extends UnboundedSource<T, UnboundedSource.Chec
   }
 
   @Override
-  public List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+  public List<? extends UnboundedSource<T, CheckpointMark>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception {
     return Collections.singletonList(this);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 6b7bd71..f67af8a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
 /**
  * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
  *
- * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * <p>{@link BoundedSource} is read directly without calling
+ * {@link BoundedSource#split},
  * and element timestamps are propagated. While any elements remain, the watermark is the beginning
  * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
  * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
@@ -130,7 +131,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+    public List<BoundedToUnboundedSourceAdapter<T>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       try {
         long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
@@ -140,7 +141,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
           return ImmutableList.of(this);
         }
         List<? extends BoundedSource<T>> splits =
-            boundedSource.splitIntoBundles(desiredBundleSize, options);
+            boundedSource.split(desiredBundleSize, options);
         if (splits == null) {
           LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
           return ImmutableList.of(this);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 5bd6f7e..0c2afe8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -196,7 +196,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
       long estimatedBytes = source.getEstimatedSizeBytes(options);
       long bytesPerBundle = estimatedBytes / targetParallelism;
       List<? extends BoundedSource<T>> bundles =
-          source.splitIntoBundles(bytesPerBundle, options);
+          source.split(bytesPerBundle, options);
       ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards =
           ImmutableList.builder();
       for (BoundedSource<T> bundle : bundles) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index 91e7248..d3609f8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -301,7 +301,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
         throws Exception {
       UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
       List<? extends UnboundedSource<OutputT, ?>> splits =
-          source.generateInitialSplits(targetParallelism, evaluationContext.getPipelineOptions());
+          source.split(targetParallelism, evaluationContext.getPipelineOptions());
       UnboundedReadDeduplicator deduplicator =
           source.requiresDeduping()
               ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 8361bdc..2b5b46d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -265,7 +265,7 @@ public class BoundedReadEvaluatorFactoryTest {
   public void boundedSourceInMemoryTransformEvaluatorShardsOfSource() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     List<? extends BoundedSource<Long>> splits =
-        source.splitIntoBundles(source.getEstimatedSizeBytes(options) / 2, options);
+        source.split(source.getEstimatedSizeBytes(options) / 2, options);
 
     UncommittedBundle<BoundedSourceShard<Long>> rootBundle = bundleFactory.createRootBundle();
     for (BoundedSource<Long> split : splits) {
@@ -365,7 +365,7 @@ public class BoundedReadEvaluatorFactoryTest {
     }
 
     @Override
-    public List<? extends OffsetBasedSource<T>> splitIntoBundles(
+    public List<? extends OffsetBasedSource<T>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       return ImmutableList.of(this);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3b81f4d..ed19be2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -549,13 +549,13 @@ public class DirectRunnerTest implements Serializable {
     }
 
     @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
+    public List<? extends BoundedSource<T>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       // Must have more than
       checkState(
           desiredBundleSizeBytes < getEstimatedSizeBytes(options),
           "Must split into more than one source");
-      return underlying.splitIntoBundles(desiredBundleSizeBytes, options);
+      return underlying.split(desiredBundleSizeBytes, options);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 8707f31..567ee98 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -450,7 +450,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     }
 
     @Override
-    public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+    public List<? extends UnboundedSource<T, TestCheckpointMark>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       return ImmutableList.of(this);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 443378f..a87472b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -100,7 +100,8 @@ public class SourceInputFormat<T>
   public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
     try {
       long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
-      List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
+      List<? extends Source<T>> shards =
+          initialSource.split(desiredSizeBytes, options);
       int numShards = shards.size();
       SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
       for (int i = 0; i < numShards; i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 820a9bd..2ed5024 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -76,7 +76,7 @@ public class BoundedSourceWrapper<OutputT>
     // get the splits early. we assume that the generated splits are stable,
     // this is necessary so that the mapping of state to source is correct
     // when restoring
-    splitSources = source.splitIntoBundles(desiredBundleSize, pipelineOptions);
+    splitSources = source.split(desiredBundleSize, pipelineOptions);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
index ed03dda..910a33f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -94,7 +94,7 @@ public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.Check
   }
 
   @Override
-  public List<? extends UnboundedSource<String, CheckpointMarkT>> generateInitialSplits(
+  public List<? extends UnboundedSource<String, CheckpointMarkT>> split(
       int desiredNumSplits,
       PipelineOptions options) throws Exception {
     return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2849464..bb9b58a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -156,7 +156,7 @@ public class UnboundedSourceWrapper<
     // get the splits early. we assume that the generated splits are stable,
     // this is necessary so that the mapping of state to source is correct
     // when restoring
-    splitSources = source.generateInitialSplits(parallelism, pipelineOptions);
+    splitSources = source.split(parallelism, pipelineOptions);
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index 9251d42..3a08088 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -104,7 +104,7 @@ public class TestCountingSource
   }
 
   @Override
-  public List<TestCountingSource> generateInitialSplits(
+  public List<TestCountingSource> split(
       int desiredNumSplits, PipelineOptions options) {
     List<TestCountingSource> splits = new ArrayList<>();
     int numSplits = allowSplitting ? desiredNumSplits : 1;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
index ffbf153..778ccf3 100755
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java
@@ -98,7 +98,7 @@ public class CustomSources {
       int desiredNumSplits =
           getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
       for (UnboundedSource<?, ?> split :
-          unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
+          unboundedSource.split(desiredNumSplits, options)) {
         encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
       }
       checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index 847de19..7c07920 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -102,12 +102,11 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   }
 
   @Override
-  public List<? extends BoundedSource<T>>
-      splitIntoBundles(long desiredBundleSizeBytes,
+  public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes,
                        PipelineOptions options) throws Exception {
     List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
     List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
-        source.generateInitialSplits(numInitialSplits, options);
+        source.split(numInitialSplits, options);
     int numSplits = splits.size();
     long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
     for (int i = 0; i < numSplits; i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index fb6da97..d33529c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     try {
       this.numPartitions =
           createMicrobatchSource()
-              .splitIntoBundles(initialParallelism, options)
+              .split(initialParallelism, options)
               .size();
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 2f9a827..b99ae10 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -105,7 +105,7 @@ public class SourceRDD {
             + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
       }
       try {
-        List<? extends Source<T>> partitionedSources = source.splitIntoBundles(desiredSizeBytes,
+        List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes,
             runtimeContext.getPipelineOptions());
         Partition[] partitions = new SourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
@@ -258,7 +258,7 @@ public class SourceRDD {
     @Override
     public Partition[] getPartitions() {
       try {
-        List<? extends Source<T>> partitionedSources = microbatchSource.splitIntoBundles(
+        List<? extends Source<T>> partitionedSources = microbatchSource.split(
             -1 /* ignored */, runtimeContext.getPipelineOptions());
         Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
index d7f1d7b..e54176f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java
@@ -186,12 +186,12 @@ public class BoundedReadFromUnboundedSource<T> extends PTransform<PBegin, PColle
     }
 
     @Override
-    public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles(
+    public List<? extends BoundedSource<ValueWithRecordId<T>>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       List<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<>();
       int numInitialSplits = numInitialSplits(getMaxNumRecords());
       List<? extends UnboundedSource<T, ?>> splits =
-          getSource().generateInitialSplits(numInitialSplits, options);
+          getSource().split(numInitialSplits, options);
       int numSplits = splits.size();
       long[] numRecords = splitNumRecords(getMaxNumRecords(), numSplits);
       for (int i = 0; i < numSplits; i++) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
index cd389e8..0b19aa2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java
@@ -34,7 +34,7 @@ import org.joda.time.Instant;
  *
  * <p>The operations are:
  * <ul>
- * <li>Splitting into bundles of given size: {@link #splitIntoBundles};
+ * <li>Splitting into sources that read bundles of given size: {@link #split};
  * <li>Size estimation: {@link #getEstimatedSizeBytes};
  * <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners
  * to dynamically adapt based on runtime conditions.
@@ -54,10 +54,19 @@ public abstract class BoundedSource<T> extends Source<T> {
   /**
    * Splits the source into bundles of approximately {@code desiredBundleSizeBytes}.
    */
-  public abstract List<? extends BoundedSource<T>> splitIntoBundles(
+  public abstract List<? extends BoundedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception;
 
   /**
+   * Old name of {@link BoundedSource#split(long, PipelineOptions)}, kept for use with Dataflow.
+   */
+  @Deprecated
+  public List<? extends BoundedSource<T>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception{
+      return split(desiredBundleSizeBytes, options);
+  }
+
+  /**
    * An estimate of the total size (in bytes) of the data that would be read from this source.
    * This estimate is in terms of external storage size, before any decompression or other
    * processing done by the reader.

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
index 4d1305c..73b663d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java
@@ -326,7 +326,7 @@ public class CountingSource {
      * {@code [2, 8, 14, ...)}, and {@code [4, 10, 16, ...)}.
      */
     @Override
-    public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generateInitialSplits(
+    public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       // Using Javadoc example, stride 2 with 3 splits becomes stride 6.
       long newStride = stride * desiredNumSplits;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f38743a..95e6078 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -324,15 +324,15 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
       @Override
       public List<? extends FileBasedSource<T>> call() throws Exception {
         return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
-            .splitIntoBundles(desiredBundleSizeBytes, options);
+            .split(desiredBundleSizeBytes, options);
       }
     });
   }
 
   @Override
-  public final List<? extends FileBasedSource<T>> splitIntoBundles(
+  public final List<? extends FileBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we
+    // This implementation of method split is provided to simplify subclasses. Here we
     // split a FileBasedSource based on a file pattern to FileBasedSources based on full single
     // files. For files that can be efficiently seeked, we further split FileBasedSources based on
     // those files to FileBasedSources based on sub ranges of single files.
@@ -370,7 +370,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     } else {
       if (isSplittable()) {
         List<FileBasedSource<T>> splitResults = new ArrayList<>();
-        for (OffsetBasedSource<T> split : super.splitIntoBundles(desiredBundleSizeBytes, options)) {
+        for (OffsetBasedSource<T> split :
+            super.split(desiredBundleSizeBytes, options)) {
           splitResults.add((FileBasedSource<T>) split);
         }
         return splitResults;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index e9a398d..05f0d97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -108,7 +108,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
   }
 
   @Override
-  public List<? extends OffsetBasedSource<T>> splitIntoBundles(
+  public List<? extends OffsetBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
     // make sure that we do not end up with a too small bundle at the end. If the desired bundle
@@ -163,7 +163,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
   /**
    * Returns approximately how many bytes of data correspond to a single offset in this source.
    * Used for translation between this source's range and methods defined in terms of bytes, such
-   * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}.
+   * as {@link #getEstimatedSizeBytes} and {@link #split}.
    *
    * <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index cc1f598..af6a8cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -65,7 +65,7 @@ public abstract class UnboundedSource<
    * as possible, but does not have to match exactly.  A low number of splits
    * will limit the amount of parallelism in the source.
    */
-  public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> generateInitialSplits(
+  public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception;
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index a2a33f3..2ab5b35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
  * amount of test coverage with few code. Most notable ones are:
  * <ul>
  *   <li>{@link #assertSourcesEqualReferenceSource} helps testing that the data read
- *   by the union of sources produced by {@link BoundedSource#splitIntoBundles}
+ *   by the union of sources produced by {@link BoundedSource#split}
  *   is the same as data read by the original source.
  *   <li>If your source implements dynamic work rebalancing, use the
  *   {@code assertSplitAtFraction} family of functions - they test behavior of
@@ -685,7 +685,7 @@ public class SourceTestUtils {
    *
    * <p>It forwards most methods to the given {@code boundedSource}, except:
    * <ol>
-   * <li> {@link BoundedSource#splitIntoBundles} rejects initial splitting
+   * <li> {@link BoundedSource#split} rejects initial splitting
    * by returning itself in a list.
    * <li> {@link BoundedReader#splitAtFraction} rejects dynamic splitting by returning null.
    * </ol>
@@ -708,7 +708,7 @@ public class SourceTestUtils {
     }
 
     @Override
-    public List<? extends BoundedSource<T>> splitIntoBundles(
+    public List<? extends BoundedSource<T>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       return ImmutableList.of(this);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index fb7b27d..78485c7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -168,7 +168,7 @@ public class AvroSourceTest {
 
     AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
     List<? extends BoundedSource<FixedRecord>> splits =
-        source.splitIntoBundles(file.length() / 3, null);
+        source.split(file.length() / 3, null);
     for (BoundedSource<FixedRecord> subSource : splits) {
       int items = SourceTestUtils.readFromSource(subSource, null).size();
       // Shouldn't split while unstarted.
@@ -201,7 +201,7 @@ public class AvroSourceTest {
     }
 
     List<? extends BoundedSource<FixedRecord>> splits =
-        source.splitIntoBundles(file.length() / 3, null);
+        source.split(file.length() / 3, null);
     for (BoundedSource<FixedRecord> subSource : splits) {
       try (BoundedSource.BoundedReader<FixedRecord> reader = subSource.createReader(null)) {
         assertEquals(Double.valueOf(0.0), reader.getFractionConsumed());
@@ -339,7 +339,7 @@ public class AvroSourceTest {
     int nonEmptySplits;
 
     // Split with the minimum bundle size
-    splits = source.splitIntoBundles(100L, options);
+    splits = source.split(100L, options);
     assertTrue(splits.size() > 2);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
     nonEmptySplits = 0;
@@ -351,7 +351,7 @@ public class AvroSourceTest {
     assertTrue(nonEmptySplits > 2);
 
     // Split with larger bundle size
-    splits = source.splitIntoBundles(file.length() / 4, options);
+    splits = source.split(file.length() / 4, options);
     assertTrue(splits.size() > 2);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
     nonEmptySplits = 0;
@@ -363,7 +363,7 @@ public class AvroSourceTest {
     assertTrue(nonEmptySplits > 2);
 
     // Split with the file length
-    splits = source.splitIntoBundles(file.length(), options);
+    splits = source.split(file.length(), options);
     assertTrue(splits.size() == 1);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 0e3b07e..8807164 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -110,7 +110,7 @@ public class CountingSourceTest {
 
     BoundedSource<Long> initial = CountingSource.upTo(numElements);
     List<? extends BoundedSource<Long>> splits =
-        initial.splitIntoBundles(splitSizeBytes, p.getOptions());
+        initial.split(splitSizeBytes, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
 
     // Assemble all the splits into one flattened PCollection, also verify their sizes.
@@ -234,7 +234,7 @@ public class CountingSourceTest {
 
     UnboundedSource<Long, ?> initial = CountingSource.unbounded();
     List<? extends UnboundedSource<Long, ?>> splits =
-        initial.generateInitialSplits(numSplits, p.getOptions());
+        initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
 
     long elementsPerSplit = numElements / numSplits;
@@ -262,7 +262,7 @@ public class CountingSourceTest {
     UnboundedCountingSource initial =
         CountingSource.createUnbounded().withRate(elementsPerPeriod, period);
     List<? extends UnboundedSource<Long, ?>> splits =
-        initial.generateInitialSplits(numSplits, p.getOptions());
+        initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
 
     long elementsPerSplit = numElements / numSplits;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index a889305..94a29da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -410,7 +410,7 @@ public class FileBasedSourceTest {
 
     TestFileBasedSource source =
         new TestFileBasedSource(file0.getParent() + "/" + "file*", Long.MAX_VALUE, null);
-    List<? extends BoundedSource<String>> splits = source.splitIntoBundles(Long.MAX_VALUE, null);
+    List<? extends BoundedSource<String>> splits = source.split(Long.MAX_VALUE, null);
     assertEquals(numFiles, splits.size());
   }
 
@@ -421,7 +421,7 @@ public class FileBasedSourceTest {
     TestFileBasedSource source = new TestFileBasedSource(missingFilePath, Long.MAX_VALUE, null);
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format("Unable to find any files matching %s", missingFilePath));
-    source.splitIntoBundles(1234, options);
+    source.split(1234, options);
   }
 
   @Test
@@ -698,7 +698,7 @@ public class FileBasedSourceTest {
 
     TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 16, null);
 
-    List<? extends BoundedSource<String>> sources = source.splitIntoBundles(32, null);
+    List<? extends BoundedSource<String>> sources = source.split(32, null);
 
     // Not a trivial split.
     assertTrue(sources.size() > 1);
@@ -877,7 +877,7 @@ public class FileBasedSourceTest {
 
     TestFileBasedSource source =
         new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null);
-    List<? extends BoundedSource<String>> sources = source.splitIntoBundles(512, null);
+    List<? extends BoundedSource<String>> sources = source.split(512, null);
 
     // Not a trivial split.
     assertTrue(sources.size() > 1);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index a300a9a..25168a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -147,7 +147,7 @@ public class OffsetBasedSourceTest {
     CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
     long[] boundaries = {0, 150, 300, 450, 600, 750, 900, 1000};
     assertSplitsAre(
-        testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null),
+        testSource.split(150 * testSource.getBytesPerOffset(), null),
         boundaries);
   }
 
@@ -159,7 +159,7 @@ public class OffsetBasedSourceTest {
     CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
     long[] boundaries = {300, 450, 600, 750, 900, 1000};
     assertSplitsAre(
-        testSource.splitIntoBundles(150 * testSource.getBytesPerOffset(), null),
+        testSource.split(150 * testSource.getBytesPerOffset(), null),
         boundaries);
   }
 
@@ -182,7 +182,7 @@ public class OffsetBasedSourceTest {
     CoarseRangeSource testSource = new CoarseRangeSource(start, end, minBundleSize, 1);
     long[] boundaries = {300, 450, 600, 750, 1000};
     assertSplitsAre(
-        testSource.splitIntoBundles(100 * testSource.getBytesPerOffset(), null),
+        testSource.split(100 * testSource.getBytesPerOffset(), null),
         boundaries);
   }
 
@@ -195,7 +195,7 @@ public class OffsetBasedSourceTest {
     // Last 10 bytes should collapse to the previous bundle.
     long[] boundaries = {0, 110, 220, 330, 440, 550, 660, 770, 880, 1000};
     assertSplitsAre(
-        testSource.splitIntoBundles(110 * testSource.getBytesPerOffset(), null),
+        testSource.split(110 * testSource.getBytesPerOffset(), null),
         boundaries);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 416a086..74acf18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -152,7 +152,7 @@ public class ReadTest implements Serializable{
 
   private abstract static class CustomBoundedSource extends BoundedSource<String> {
     @Override
-    public List<? extends BoundedSource<String>> splitIntoBundles(
+    public List<? extends BoundedSource<String>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       return null;
     }
@@ -186,7 +186,7 @@ public class ReadTest implements Serializable{
   private abstract static class CustomUnboundedSource
       extends UnboundedSource<String, NoOpCheckpointMark> {
     @Override
-    public List<? extends UnboundedSource<String, NoOpCheckpointMark>> generateInitialSplits(
+    public List<? extends UnboundedSource<String, NoOpCheckpointMark>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2e36273..3b6992a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -1118,7 +1118,7 @@ public class TextIOTest {
   }
 
   @Test
-  public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception {
+  public void testInitialSplitAutoModeTxt() throws Exception {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
@@ -1127,7 +1127,7 @@ public class TextIOTest {
 
     FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
-        source.splitIntoBundles(desiredBundleSize, options);
+        source.split(desiredBundleSize, options);
 
     // At least 2 splits and they are equal to reading the whole file.
     assertThat(splits, hasSize(greaterThan(1)));
@@ -1135,7 +1135,7 @@ public class TextIOTest {
   }
 
   @Test
-  public void testInitialSplitIntoBundlesAutoModeGz() throws Exception {
+  public void testInitialSplitAutoModeGz() throws Exception {
     long desiredBundleSize = 1000;
     PipelineOptions options = TestPipeline.testingPipelineOptions();
 
@@ -1144,7 +1144,7 @@ public class TextIOTest {
 
     FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
-        source.splitIntoBundles(desiredBundleSize, options);
+        source.split(desiredBundleSize, options);
 
     // Exactly 1 split, even in AUTO mode, since it is a gzip file.
     assertThat(splits, hasSize(equalTo(1)));
@@ -1152,7 +1152,7 @@ public class TextIOTest {
   }
 
   @Test
-  public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception {
+  public void testInitialSplitGzipModeTxt() throws Exception {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
@@ -1162,7 +1162,7 @@ public class TextIOTest {
     FileBasedSource<String> source =
         TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
-        source.splitIntoBundles(desiredBundleSize, options);
+        source.split(desiredBundleSize, options);
 
     // Exactly 1 split, even though splittable text file, since using GZIP mode.
     assertThat(splits, hasSize(equalTo(1)));
@@ -1170,7 +1170,7 @@ public class TextIOTest {
   }
 
   @Test
-  public void testInitialSplitIntoBundlesGzipModeGz() throws Exception {
+  public void testInitialSplitGzipModeGz() throws Exception {
     PipelineOptions options = TestPipeline.testingPipelineOptions();
     long desiredBundleSize = 1000;
 
@@ -1180,7 +1180,7 @@ public class TextIOTest {
     FileBasedSource<String> source =
         TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
-        source.splitIntoBundles(desiredBundleSize, options);
+        source.split(desiredBundleSize, options);
 
     // Exactly 1 split using .gz extension and using GZIP mode.
     assertThat(splits, hasSize(equalTo(1)));

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index d6898d5..5f71f30 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -363,7 +363,7 @@ public class XmlSourceTest {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(50, null);
+    List<? extends FileBasedSource<Train>> splits = source.split(50, null);
 
     assertTrue(splits.size() > 2);
 
@@ -686,7 +686,7 @@ public class XmlSourceTest {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(100, null);
+    List<? extends FileBasedSource<Train>> splits = source.split(100, null);
 
     assertTrue(splits.size() > 2);
 
@@ -710,7 +710,7 @@ public class XmlSourceTest {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(10);
-    List<? extends FileBasedSource<Train>> splits = source.splitIntoBundles(256, null);
+    List<? extends FileBasedSource<Train>> splits = source.split(256, null);
 
     // Not a trivial split
     assertTrue(splits.size() > 2);
@@ -737,7 +737,7 @@ public class XmlSourceTest {
             .withMinBundleSize(10);
 
     List<? extends FileBasedSource<Train>> splits =
-        fileSource.splitIntoBundles(file.length() / 3, null);
+        fileSource.split(file.length() / 3, null);
     for (BoundedSource<Train> splitSource : splits) {
       int numItems = readEverythingFromReader(splitSource.createReader(null)).size();
       // Should not split while unstarted.

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
index b53d1fc..9fcc3c5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/dataflow/TestCountingSource.java
@@ -104,7 +104,7 @@ public class TestCountingSource
   }
 
   @Override
-  public List<TestCountingSource> generateInitialSplits(
+  public List<TestCountingSource> split(
       int desiredNumSplits, PipelineOptions options) {
     List<TestCountingSource> splits = new ArrayList<>();
     int numSplits = allowSplitting ? desiredNumSplits : 1;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
index efb385d..62114b0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/SourceTestUtilsTest.java
@@ -43,7 +43,7 @@ public class SourceTestUtilsTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     BoundedSource<Long> baseSource = CountingSource.upTo(100);
     BoundedSource<Long> unsplittableSource = SourceTestUtils.toUnsplittableSource(baseSource);
-    List<?> splits = unsplittableSource.splitIntoBundles(1, options);
+    List<?> splits = unsplittableSource.split(1, options);
     assertEquals(splits.size(), 1);
     assertEquals(splits.get(0), unsplittableSource);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 43e4463..8a30476 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -406,32 +406,32 @@ public class CreateTest {
   }
 
   @Test
-  public void testSourceSplitIntoBundles() throws Exception {
+  public void testSourceSplit() throws Exception {
     CreateSource<Integer> source =
         CreateSource.fromIterable(
             ImmutableList.of(1, 2, 3, 4, 5, 6, 7, 8), BigEndianIntegerCoder.of());
     PipelineOptions options = PipelineOptionsFactory.create();
-    List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
+    List<? extends BoundedSource<Integer>> splitSources = source.split(12, options);
     assertThat(splitSources, hasSize(3));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
   }
 
   @Test
-  public void testSourceSplitIntoBundlesVoid() throws Exception {
+  public void testSourceSplitVoid() throws Exception {
     CreateSource<Void> source =
         CreateSource.fromIterable(
             Lists.<Void>newArrayList(null, null, null, null, null), VoidCoder.of());
     PipelineOptions options = PipelineOptionsFactory.create();
-    List<? extends BoundedSource<Void>> splitSources = source.splitIntoBundles(3, options);
+    List<? extends BoundedSource<Void>> splitSources = source.split(3, options);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
   }
 
   @Test
-  public void testSourceSplitIntoBundlesEmpty() throws Exception {
+  public void testSourceSplitEmpty() throws Exception {
     CreateSource<Integer> source =
         CreateSource.fromIterable(ImmutableList.<Integer>of(), BigEndianIntegerCoder.of());
     PipelineOptions options = PipelineOptionsFactory.create();
-    List<? extends BoundedSource<Integer>> splitSources = source.splitIntoBundles(12, options);
+    List<? extends BoundedSource<Integer>> splitSources = source.split(12, options);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splitSources, options);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index baf0cc2..8e138ef 100644
--- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -428,7 +428,7 @@ public class ElasticsearchIO {
     }
 
     @Override
-    public List<? extends BoundedSource<String>> splitIntoBundles(
+    public List<? extends BoundedSource<String>> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       List<BoundedElasticsearchSource> sources = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index b5fec17..d968bc2 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -88,7 +88,7 @@ public class ElasticsearchIOIT {
     // as many bundles as ES shards and bundle size is shard size
     long desiredBundleSizeBytes = 0;
     List<? extends BoundedSource<String>> splits =
-        initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+        initialSource.split(desiredBundleSizeBytes, options);
     SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
     //this is the number of ES shards
     // (By default, each index in Elasticsearch is allocated 5 primary shards)

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index bca0fe8..260af79 100644
--- a/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++ b/sdks/java/io/elasticsearch/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -326,7 +326,7 @@ public class ElasticsearchIOTest implements Serializable {
   }
 
   @Test
-  public void testSplitIntoBundles() throws Exception {
+  public void testSplit() throws Exception {
     ElasticSearchIOTestUtils.insertTestDocuments(ES_INDEX, ES_TYPE, NUM_DOCS, node.client());
     PipelineOptions options = PipelineOptionsFactory.create();
     ElasticsearchIO.Read read =
@@ -336,7 +336,7 @@ public class ElasticsearchIOTest implements Serializable {
     // as many bundles as ES shards and bundle size is shard size
     int desiredBundleSizeBytes = 0;
     List<? extends BoundedSource<String>> splits =
-        initialSource.splitIntoBundles(desiredBundleSizeBytes, options);
+        initialSource.split(desiredBundleSizeBytes, options);
     SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
     //this is the number of ES shards
     // (By default, each index in Elasticsearch is allocated 5 primary shards)

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 746258f..1b90dc3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -81,7 +81,7 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   @Override
-  public List<BoundedSource<TableRow>> splitIntoBundles(
+  public List<BoundedSource<TableRow>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     TableReference tableToExtract = getTableToExtract(bqOptions);

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
index f7d8252..b8e6b39 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
@@ -52,10 +52,10 @@ class TransformingSource<T, V> extends BoundedSource<V> {
   }
 
   @Override
-  public List<? extends BoundedSource<V>> splitIntoBundles(
+  public List<? extends BoundedSource<V>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
     return Lists.transform(
-        boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+        boundedSource.split(desiredBundleSizeBytes, options),
         new Function<BoundedSource<T>, BoundedSource<V>>() {
           @Override
           public BoundedSource<V> apply(BoundedSource<T> input) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 89c67a4..28f8878 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -724,7 +724,7 @@ public class BigtableIO {
     }
 
     @Override
-    public List<BigtableSource> splitIntoBundles(
+    public List<BigtableSource> split(
         long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
       // Update the desiredBundleSizeBytes in order to limit the
       // number of splits to maximumNumberOfSplits.
@@ -734,11 +734,11 @@ public class BigtableIO {
           Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
 
       // Delegate to testable helper.
-      return splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
+      return splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options));
     }
 
     /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */
-    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
+    private List<BigtableSource> splitBasedOnSamples(
         long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
       // There are no regions, or no samples available. Just scan the entire range.
       if (sampleRowKeys.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 9d8763b..0389d4b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -1119,7 +1119,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
     }
 
     @Override
-    public List<PubsubSource<T>> generateInitialSplits(
+    public List<PubsubSource<T>> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       List<PubsubSource<T>> result = new ArrayList<>(desiredNumSplits);
       PubsubSource<T> splitSource = this;
@@ -1142,8 +1142,8 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
       SubscriptionPath subscription = subscriptionPath;
       if (subscription == null) {
         if (checkpoint == null) {
-          // This reader has never been started and there was no call to #splitIntoBundles; create
-          // a single random subscription, which will be kept in the checkpoint.
+          // This reader has never been started and there was no call to #split;
+          // create a single random subscription, which will be kept in the checkpoint.
           subscription = outer.createRandomSubscription(options);
         } else {
           subscription = checkpoint.getSubscription();

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 2a2bf91..83fd8d9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1756,7 +1756,7 @@ public class BigQueryIOTest implements Serializable {
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
-    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(1, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1835,7 +1835,7 @@ public class BigQueryIOTest implements Serializable {
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
-    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(1, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1917,7 +1917,7 @@ public class BigQueryIOTest implements Serializable {
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
-    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(1, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
@@ -1963,7 +1963,7 @@ public class BigQueryIOTest implements Serializable {
         stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
 
     SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.splitIntoBundles(100, options), options);
+        stringSource, stringSource.split(100, options), options);
   }
 
   @Test
@@ -1994,7 +1994,7 @@ public class BigQueryIOTest implements Serializable {
         stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
     SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.splitIntoBundles(100, options), options);
+        stringSource, stringSource.split(100, options), options);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 1c770a2..3653753 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -476,7 +476,7 @@ public class BigtableIOTest {
             ByteKeyRange.ALL_KEYS,
             null /*size*/);
     List<BigtableSource> splits =
-        source.splitIntoBundles(numRows * bytesPerRow / numSamples, null /* options */);
+        source.split(numRows * bytesPerRow / numSamples, null /* options */);
 
     // Test num splits and split equality.
     assertThat(splits, hasSize(numSamples));
@@ -503,7 +503,8 @@ public class BigtableIOTest {
         null /*filter*/,
         ByteKeyRange.ALL_KEYS,
         null /*size*/);
-    List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+    List<BigtableSource> splits =
+        source.split(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
     assertThat(splits, hasSize(numSplits));
@@ -528,7 +529,8 @@ public class BigtableIOTest {
         RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
     BigtableSource source =
         new BigtableSource(serviceFactory, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
-    List<BigtableSource> splits = source.splitIntoBundles(numRows * bytesPerRow / numSplits, null);
+    List<BigtableSource> splits =
+        source.split(numRows * bytesPerRow / numSplits, null);
 
     // Test num splits and split equality.
     assertThat(splits, hasSize(numSplits));

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
index d2e88c3..949ba4f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSourceTest.java
@@ -324,7 +324,7 @@ public class PubsubUnboundedSourceTest {
   }
 
   @Test
-  public void noSubscriptionSplitIntoBundlesGeneratesSubscription() throws Exception {
+  public void noSubscriptionSplitGeneratesSubscription() throws Exception {
     TopicPath topicPath = PubsubClient.topicPathFromName("my_project", "my_topic");
     factory = PubsubTestClient.createFactoryForCreateSubscription();
     PubsubUnboundedSource<String> source =
@@ -343,7 +343,7 @@ public class PubsubUnboundedSourceTest {
 
     PipelineOptions options = PipelineOptionsFactory.create();
     List<PubsubSource<String>> splits =
-        (new PubsubSource<>(source)).generateInitialSplits(3, options);
+        (new PubsubSource<>(source)).split(3, options);
     // We have at least one returned split
     assertThat(splits, hasSize(greaterThan(0)));
     for (PubsubSource<String> split : splits) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index d776ea0..93ff108 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -449,7 +449,7 @@ public class HadoopInputFormatIO {
     }
 
     @Override
-    public List<BoundedSource<KV<K, V>>> splitIntoBundles(long desiredBundleSizeBytes,
+    public List<BoundedSource<KV<K, V>>> split(long desiredBundleSizeBytes,
         PipelineOptions options) throws Exception {
       // desiredBundleSizeBytes is not being considered as splitting based on this
       // value is not supported by inputFormat getSplits() method.
@@ -485,7 +485,7 @@ public class HadoopInputFormatIO {
     /**
      * This is a helper function to compute splits. This method will also calculate size of the
      * data being read. Note: This method is executed exactly once and the splits are retrieved
-     * and cached in this. These splits are further used by splitIntoBundles() and
+     * and cached in this. These splits are further used by split() and
      * getEstimatedSizeBytes().
      */
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 3a4a99d..da70632 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hadoop.inputformat.EmployeeInputFormat.NewObjectsE
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.HadoopInputFormatBoundedSource;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableConfiguration;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO.SerializableSplit;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -518,8 +519,8 @@ public class HadoopInputFormatIOTest {
     // Validate if estimated size is equal to the size of records.
     assertEquals(referenceRecords.size(), estimatedSize);
     List<BoundedSource<KV<Text, Employee>>> boundedSourceList =
-        hifSource.splitIntoBundles(0, p.getOptions());
-    // Validate if splitIntoBundles() has split correctly.
+        hifSource.split(0, p.getOptions());
+    // Validate if split() has split correctly.
     assertEquals(TestEmployeeDataSet.NUMBER_OF_SPLITS, boundedSourceList.size());
     List<KV<Text, Employee>> bundleRecords = new ArrayList<>();
     for (BoundedSource<KV<Text, Employee>> source : boundedSourceList) {
@@ -638,12 +639,14 @@ public class HadoopInputFormatIOTest {
   }
 
   /**
-   * This test validates behavior of {@link HadoopInputFormatBoundedSource#createReader()
-   * createReader()} method when {@link HadoopInputFormatBoundedSource#splitIntoBundles()
-   * splitIntoBundles()} is not called.
+   * This test validates behavior of
+   * {@link HadoopInputFormatBoundedSource#createReader(PipelineOptions)
+   * createReader()} method when
+   * {@link HadoopInputFormatBoundedSource#split(long, PipelineOptions)
+   * split()} is not called.
    */
   @Test
-  public void testCreateReaderIfSplitIntoBundlesNotCalled() throws Exception {
+  public void testCreateReaderIfSplitNotCalled() throws Exception {
     HadoopInputFormatBoundedSource<Text, Employee> hifSource = getTestHIFSource(
         EmployeeInputFormat.class,
         Text.class,
@@ -658,7 +661,7 @@ public class HadoopInputFormatIOTest {
   /**
    * This test validates behavior of
    * {@link HadoopInputFormatBoundedSource#computeSplitsIfNecessary() computeSplits()} when Hadoop
-   * InputFormat's {@link InputFormat#getSplits() getSplits()} returns empty list.
+   * InputFormat's {@link InputFormat#getSplits(JobContext)} returns empty list.
    */
   @Test
   public void testComputeSplitsIfGetSplitsReturnsEmptyList() throws Exception {
@@ -843,6 +846,6 @@ public class HadoopInputFormatIOTest {
         inputFormatValueClass,
         keyCoder,
         valueCoder);
-    return boundedSource.splitIntoBundles(0, p.getOptions());
+    return boundedSource.split(0, p.getOptions());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index ed191cb..ccdcef6 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -422,10 +422,9 @@ public class HBaseIO {
             return sources;
         }
 
-        @Override
-        public List<? extends BoundedSource<Result>>
-            splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
-                throws Exception {
+    @Override
+    public List<? extends BoundedSource<Result>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
             LOG.debug("desiredBundleSize {} bytes", desiredBundleSizeBytes);
             long estimatedSizeBytes = getEstimatedSizeBytes(options);
             int numSplits = 1;

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index ee3369e..c2410ea 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -197,7 +197,7 @@ public class HBaseIOTest {
         HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
         HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
         List<? extends BoundedSource<Result>> splits =
-                source.splitIntoBundles(numRows * bytesPerRow / numRegions,
+                source.split(numRows * bytesPerRow / numRegions,
                         null /* options */);
 
         // Test num splits and split equality.

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index b55944b..5cc2097 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -265,7 +265,7 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
   // =======================================================================
 
   @Override
-  public List<? extends BoundedSource<T>> splitIntoBundles(
+  public List<? extends BoundedSource<T>> split(
       final long desiredBundleSizeBytes,
       PipelineOptions options) throws Exception {
     if (serializableSplit() == null) {
@@ -296,8 +296,8 @@ public abstract class HDFSFileSource<T, K, V> extends BoundedSource<T> {
     long size = 0;
 
     try {
-      // If this source represents a split from splitIntoBundles, then return the size of the split,
-      // rather then the entire input
+      // If this source represents a split produced by split(),
+      // then return the size of the split, rather than the entire input
       if (serializableSplit() != null) {
         return serializableSplit().getSplit().getLength();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index c821d9d..a964239 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -159,7 +159,7 @@ public class HDFSFileSourceTest {
 
     // Split with a small bundle size (has to be at least size of sync interval)
     List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
-        .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+        .split(SequenceFile.SYNC_INTERVAL, options);
     assertTrue(splits.size() > 2);
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
     int nonEmptySplits = 0;
@@ -184,7 +184,7 @@ public class HDFSFileSourceTest {
 
     long originalSize = source.getEstimatedSizeBytes(options);
     long splitTotalSize = 0;
-    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.splitIntoBundles(
+    List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source.split(
         SequenceFile.SYNC_INTERVAL, options
     );
     for (BoundedSource<KV<IntWritable, Text>> splitSource : splits) {

http://git-wip-us.apache.org/repos/asf/beam/blob/62464b5b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 89016ac..104bea4 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -340,7 +340,7 @@ public class JmsIO {
     }
 
     @Override
-    public List<UnboundedJmsSource> generateInitialSplits(
+    public List<UnboundedJmsSource> split(
         int desiredNumSplits, PipelineOptions options) throws Exception {
       List<UnboundedJmsSource> sources = new ArrayList<>();
       if (spec.getTopic() != null) {