You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 19:23:21 UTC
[3/7] beam git commit: Removes TextIO.Read.Bound
Removes TextIO.Read.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96315203
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96315203
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96315203
Branch: refs/heads/master
Commit: 96315203284ff60b10210d73b81b65ea0a395544
Parents: ef4658a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:06:46 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../apache/beam/examples/WindowedWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/StreamingWordExtract.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../examples/complete/TrafficMaxLaneFlow.java | 2 +-
.../beam/examples/complete/TrafficRoutes.java | 2 +-
.../beam/examples/cookbook/DistinctExample.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../beam/examples/MinimalWordCountJava8.java | 2 +-
.../examples/complete/game/HourlyTeamScore.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
.../runners/dataflow/DataflowRunnerTest.java | 15 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 247 ++++++++-----------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 34 +--
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../display/DisplayDataEvaluatorTest.java | 2 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 3 +-
27 files changed, 158 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index e6e3a92..06af209 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -136,7 +136,7 @@ public class DebuggingWordCount {
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> filteredWords =
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new WordCount.CountWords())
.apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index cf72672..4a0c1bb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -75,7 +75,7 @@ public class MinimalWordCount {
// the input text (a set of Shakespeare's texts).
// This example reads a public data set consisting of the complete works of Shakespeare.
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index d88de54..5c64c53 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -178,7 +178,7 @@ public class WindowedWordCount {
*/
PCollection<String> input = pipeline
/** Read from the GCS file. */
- .apply(TextIO.Read.from(options.getInputFile()))
+ .apply(TextIO.read().from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index b64d2c1..e331a86 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -176,7 +176,7 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e6621ce..bd69855 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -469,7 +469,7 @@ public class AutoComplete {
// Create the pipeline.
Pipeline p = Pipeline.create(options);
PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply(TextIO.Read.from(options.getInputFile()))
+ .apply(TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractHashtags()))
.apply(Window.<String>into(windowFn))
.apply(ComputeTopCompletions.top(10, options.getRecursive()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 21a9849..f35d67a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -132,7 +132,7 @@ public class StreamingWordExtract {
.append(options.getBigQueryTable())
.toString();
pipeline
- .apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ .apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWords()))
.apply(ParDo.of(new Uppercase()))
.apply(ParDo.of(new StringToRowConverter()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 2e1be90..1ef69c0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -179,7 +179,7 @@ public class TfIdf {
}
PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
+ .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
urisToLines = urisToLines.and(oneUriToLines);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 4c07ca4..bb8c8bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -207,7 +207,7 @@ public class TopWikipediaSessions {
double samplingThreshold = 0.1;
- p.apply(TextIO.Read.from(options.getInput()))
+ p.apply(TextIO.read().from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
.apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index e57da93..d7c933e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -291,7 +291,7 @@ public class TrafficMaxLaneFlow {
@Override
public PCollection<String> expand(PBegin begin) {
return begin
- .apply(TextIO.Read.from(inputFile))
+ .apply(TextIO.read().from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index b1f938b..c9ba18c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -301,7 +301,7 @@ public class TrafficRoutes {
@Override
public PCollection<String> expand(PBegin begin) {
return begin
- .apply(TextIO.Read.from(inputFile))
+ .apply(TextIO.read().from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 9670b7f..20c8fa0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -87,7 +87,7 @@ public class DistinctExample {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadLines", TextIO.Read.from(options.getInput()))
+ p.apply("ReadLines", TextIO.read().from(options.getInput()))
.apply(Distinct.<String>create())
.apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 49d5eda..e7596aa 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -446,7 +446,7 @@ public class TriggerExample {
options.getBigQueryDataset(), options.getBigQueryTable());
PCollectionList<TableRow> resultList = pipeline
- .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
+ .apply("ReadMyFile", TextIO.read().from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(ParDo.of(new ExtractFlowInfo()))
.apply(new CalculateTotalFlow(options.getWindowDuration()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6badb75..6dabadc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@ public class MinimalWordCountJava8 {
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index b905d61..3f1ffb0 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -155,7 +155,7 @@ public class HourlyTeamScore extends UserScore {
final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
// Read 'gaming' events from a text file.
- pipeline.apply(TextIO.Read.from(options.getInput()))
+ pipeline.apply(TextIO.read().from(options.getInput()))
// Parse the incoming data.
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 0adaabc..c136c2e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -226,7 +226,7 @@ public class UserScore {
Pipeline pipeline = Pipeline.create(options);
// Read events from a text file and parse them.
- pipeline.apply(TextIO.Read.from(options.getInput()))
+ pipeline.apply(TextIO.read().from(options.getInput()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f3becf9..689005a 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -62,7 +62,7 @@ public class MinimalWordCountJava8Test implements Serializable {
public void testMinimalWordCountJava8() throws Exception {
p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b980715..b0fab0b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -106,7 +106,7 @@ public class WordCountTest {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index fcd23cf..d47da45 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultConfigInWorkers(options);
- p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(p);
@@ -465,7 +465,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Create a pipeline that the predefined step will be embedded into
Pipeline pipeline = Pipeline.create(options);
- pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+ pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(ParDo.of(new NoOpFn()))
.apply(new EmbeddedTransform(predefinedStep.clone()))
.apply(ParDo.of(new NoOpFn()));
@@ -523,7 +523,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
String stepName = "DoFn1";
- pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+ pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(stepName, ParDo.of(new NoOpFn()))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
@@ -723,7 +723,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+ pipeline.apply("Read(" + path + ")", TextIO.read().from(path));
}
/**
@@ -736,7 +736,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
- pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+ pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz"));
// Check that translation does fail.
thrown.expectCause(allOf(
@@ -766,7 +766,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
- pipeline.apply(TextIO.Read.from(new TestValueProvider()));
+ pipeline.apply(TextIO.read().from(new TestValueProvider()));
// Check that translation does not fail.
t.translate(
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 480591e..7261fe9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -172,7 +171,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
@@ -335,7 +334,7 @@ public class DataflowRunnerTest {
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
- .apply(TextIO.Read.from(options.getInput()))
+ .apply(TextIO.read().from(options.getInput()))
.apply(TextIO.Write.to(options.getOutput()));
}
@@ -347,7 +346,7 @@ public class DataflowRunnerTest {
DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
- PCollection<String> unconsumed = p.apply(Read.from(options.getInput()));
+ PCollection<String> unconsumed = p.apply(TextIO.read().from(options.getInput()));
DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
p.traverseTopologically(new PipelineVisitor.Defaults() {
@@ -570,7 +569,7 @@ public class DataflowRunnerTest {
@Test
public void testNonGcsFilePathInReadFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
+ p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath()));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -587,7 +586,7 @@ public class DataflowRunnerTest {
public void testNonGcsFilePathInWriteFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
thrown.expect(IllegalArgumentException.class);
@@ -598,7 +597,7 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileReadPath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
+ p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file"));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -613,7 +612,7 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileWritePath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
+ PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
thrown.expect(IllegalArgumentException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 0e6faad..2bcf140 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -125,7 +125,7 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index d578a7a..7cb9386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -79,11 +79,11 @@ import org.slf4j.LoggerFactory;
* // A root PTransform, like TextIO.Read or Create, gets added
* // to the Pipeline by being applied:
* PCollection<String> lines =
- * p.apply(TextIO.Read.from("gs://bucket/dir/file*.txt"));
+ * p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
*
* // A Pipeline can have multiple root transforms:
* PCollection<String> moreLines =
- * p.apply(TextIO.Read.from("gs://bucket/other/dir/file*.txt"));
+ * p.apply(TextIO.read().from("gs://bucket/other/dir/file*.txt"));
* PCollection<String> yetMoreLines =
* p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of()));
*
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f735019..31d2c3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -73,7 +73,7 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<String> lines =
- * p.apply(TextIO.Read.from("/local/path/to/file.txt"));
+ * p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
* <p>To write a {@link PCollection} to one or more text files, use
@@ -109,174 +109,131 @@ import org.apache.beam.sdk.values.PDone;
*/
public class TextIO {
/**
- * A {@link PTransform} that reads from a text file (or multiple text
- * files matching a pattern) and returns a {@link PCollection} containing
- * the decoding of each of the lines of the text file(s) as a {@link String}.
+ * Reads from one or more text files and returns a bounded {@link PCollection} containing one
+ * element for each line of the input files.
*/
- public static class Read {
+ public static Read read() {
+ return new Read();
+ }
- /**
- * Returns a transform for reading text files that reads from the file(s)
- * with the given filename or filename pattern. This can be a local path (if running locally),
- * or a Google Cloud Storage filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"} (if running locally or using remote execution)
- * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
- * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
- */
- public static Bound from(String filepattern) {
- return new Bound().from(filepattern);
+ /** Implementation of {@link #read}. */
+ public static class Read extends PTransform<PBegin, PCollection<String>> {
+ /** The filepattern to read from. */
+ @Nullable private final ValueProvider<String> filepattern;
+
+ /** Option to indicate the input source's compression type. Default is AUTO. */
+ private final TextIO.CompressionType compressionType;
+
+ private Read() {
+ this(null, null, TextIO.CompressionType.AUTO);
}
- /**
- * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
- */
- public static Bound from(ValueProvider<String> filepattern) {
- return new Bound().from(filepattern);
+ private Read(
+ @Nullable String name,
+ @Nullable ValueProvider<String> filepattern,
+ TextIO.CompressionType compressionType) {
+ super(name);
+ this.filepattern = filepattern;
+ this.compressionType = compressionType;
}
/**
- * Returns a transform for reading text files that decompresses all input files
- * using the specified compression type.
+ * Reads from the text file(s) with the given filename or filename pattern.
*
- * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
- * In this mode, the compression type of the file is determined by its extension
- * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are
- * uncompressed).
+ * <p>This can be a local path (if running locally), or a Google Cloud Storage filename or
+ * filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if running locally or using
+ * a remote execution service).
+ *
+ * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+ * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*/
- public static Bound withCompressionType(TextIO.CompressionType compressionType) {
- return new Bound().withCompressionType(compressionType);
+ public Read from(String filepattern) {
+ checkNotNull(filepattern, "Filepattern cannot be empty.");
+ return new Read(name, StaticValueProvider.of(filepattern), compressionType);
}
- // TODO: strippingNewlines, etc.
+ /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
+ public Read from(ValueProvider<String> filepattern) {
+ checkNotNull(filepattern, "Filepattern cannot be empty.");
+ return new Read(name, filepattern, compressionType);
+ }
/**
- * A {@link PTransform} that reads from one or more text files and returns a bounded
- * {@link PCollection} containing one element for each line of the input files.
+ * Returns a new transform for reading from text files that's like this one but
+ * reads from input sources using the specified compression type.
+ *
+ * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
*/
- public static class Bound extends PTransform<PBegin, PCollection<String>> {
- /** The filepattern to read from. */
- @Nullable private final ValueProvider<String> filepattern;
-
- /** Option to indicate the input source's compression type. Default is AUTO. */
- private final TextIO.CompressionType compressionType;
-
- private Bound() {
- this(null, null, TextIO.CompressionType.AUTO);
- }
-
- private Bound(
- @Nullable String name,
- @Nullable ValueProvider<String> filepattern,
- TextIO.CompressionType compressionType) {
- super(name);
- this.filepattern = filepattern;
- this.compressionType = compressionType;
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
- * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
- * for a description of filepatterns.
- *
- * <p>Does not modify this object.
-
- */
- public Bound from(String filepattern) {
- checkNotNull(filepattern, "Filepattern cannot be empty.");
- return new Bound(name, StaticValueProvider.of(filepattern), compressionType);
- }
-
- /**
- * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
- */
- public Bound from(ValueProvider<String> filepattern) {
- checkNotNull(filepattern, "Filepattern cannot be empty.");
- return new Bound(name, filepattern, compressionType);
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
- * reads from input sources using the specified compression type.
- *
- * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
- * See {@link TextIO.Read#withCompressionType} for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound withCompressionType(TextIO.CompressionType compressionType) {
- return new Bound(name, filepattern, compressionType);
- }
-
- @Override
- public PCollection<String> expand(PBegin input) {
- if (filepattern == null) {
- throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
- }
-
- final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
- PCollection<String> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
- }
+ public Read withCompressionType(TextIO.CompressionType compressionType) {
+ return new Read(name, filepattern, compressionType);
+ }
- // Helper to create a source specific to the requested compression type.
- protected FileBasedSource<String> getSource() {
- switch (compressionType) {
- case UNCOMPRESSED:
- return new TextSource(filepattern);
- case AUTO:
- return CompressedSource.from(new TextSource(filepattern));
- case BZIP2:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.BZIP2);
- case GZIP:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.GZIP);
- case ZIP:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.ZIP);
- case DEFLATE:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.DEFLATE);
- default:
- throw new IllegalArgumentException("Unknown compression type: " + compressionType);
- }
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ if (filepattern == null) {
+ throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
+ PCollection<String> pcol = input.getPipeline().apply("Read", read);
+ // Honor the default output coder that would have been used by this PTransform.
+ pcol.setCoder(getDefaultOutputCoder());
+ return pcol;
+ }
- String filepatternDisplay = filepattern.isAccessible()
- ? filepattern.get() : filepattern.toString();
- builder
- .add(DisplayData.item("compressionType", compressionType.toString())
- .withLabel("Compression Type"))
- .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
- .withLabel("File Pattern"));
+ // Helper to create a source specific to the requested compression type.
+ protected FileBasedSource<String> getSource() {
+ switch (compressionType) {
+ case UNCOMPRESSED:
+ return new TextSource(filepattern);
+ case AUTO:
+ return CompressedSource.from(new TextSource(filepattern));
+ case BZIP2:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.BZIP2);
+ case GZIP:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.GZIP);
+ case ZIP:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.ZIP);
+ case DEFLATE:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + compressionType);
}
+ }
- @Override
- protected Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ String filepatternDisplay = filepattern.isAccessible()
+ ? filepattern.get() : filepattern.toString();
+ builder
+ .add(DisplayData.item("compressionType", compressionType.toString())
+ .withLabel("Compression Type"))
+ .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+ .withLabel("File Pattern"));
+ }
- public String getFilepattern() {
- return filepattern.get();
- }
+ @Override
+ protected Coder<String> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
- public TextIO.CompressionType getCompressionType() {
- return compressionType;
- }
+ public String getFilepattern() {
+ return filepattern.get();
}
- /** Disallow construction of utility classes. */
- private Read() {}
+ public TextIO.CompressionType getCompressionType() {
+ return compressionType;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2ba1797..8a7965c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -220,7 +220,7 @@ public class TextIOTest {
}
}
- TextIO.Read.Bound read = TextIO.Read.from(filename);
+ TextIO.Read read = TextIO.read().from(filename);
PCollection<String> output = p.apply(read);
@@ -246,15 +246,15 @@ public class TextIOTest {
assertEquals(
"TextIO.Read/Read.out",
- p.apply(TextIO.Read.from("somefile")).getName());
+ p.apply(TextIO.read().from("somefile")).getName());
assertEquals(
"MyRead/Read.out",
- p.apply("MyRead", TextIO.Read.from(emptyTxt.getPath())).getName());
+ p.apply("MyRead", TextIO.read().from(emptyTxt.getPath())).getName());
}
@Test
public void testReadDisplayData() {
- TextIO.Read.Bound read = TextIO.Read
+ TextIO.Read read = TextIO.read()
.from("foo.*")
.withCompressionType(BZIP2);
@@ -269,7 +269,7 @@ public class TextIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- TextIO.Read.Bound read = TextIO.Read
+ TextIO.Read read = TextIO.read()
.from("foobar");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -572,15 +572,15 @@ public class TextIOTest {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
p
- .apply(TextIO.Read.from(options.getInput()))
+ .apply(TextIO.read().from(options.getInput()))
.apply(TextIO.Write.to(options.getOutput()));
}
@Test
public void testCompressionTypeIsSet() throws Exception {
- TextIO.Read.Bound read = TextIO.Read.from("/tmp/test");
+ TextIO.Read read = TextIO.read().from("/tmp/test");
assertEquals(AUTO, read.getCompressionType());
- read = TextIO.Read.from("/tmp/test").withCompressionType(GZIP);
+ read = TextIO.read().from("/tmp/test").withCompressionType(GZIP);
assertEquals(GZIP, read.getCompressionType());
}
@@ -597,14 +597,14 @@ public class TextIOTest {
}
/**
- * Helper method that runs TextIO.Read.from(filename).withCompressionType(compressionType)
+ * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType)
* and asserts that the results match the given expected output.
*/
private void assertReadingCompressedFileMatchesExpected(
File file, CompressionType compressionType, String[] expected) {
- TextIO.Read.Bound read =
- TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
+ TextIO.Read read =
+ TextIO.read().from(file.getPath()).withCompressionType(compressionType);
PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read);
PAssert.that(output).containsInAnyOrder(expected);
@@ -825,9 +825,9 @@ public class TextIOTest {
@Test
public void testTextIOGetName() {
- assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
+ assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
- assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
+ assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
}
@Test
@@ -1075,7 +1075,7 @@ public class TextIOTest {
// Sanity check: file is at least 2 bundles long.
assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
- FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
+ FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1092,7 +1092,7 @@ public class TextIOTest {
// Sanity check: file is at least 2 bundles long.
assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
- FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
+ FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1110,7 +1110,7 @@ public class TextIOTest {
assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
FileBasedSource<String> source =
- TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
+ TextIO.read().from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1128,7 +1128,7 @@ public class TextIOTest {
assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
FileBasedSource<String> source =
- TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
+ TextIO.read().from(largeGz.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 07b6b4a..29d9774 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -121,7 +121,7 @@ public class TransformTreeTest {
final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample =
Sample.fixedSizeGlobally(10);
- p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
+ p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
.apply(sample)
.apply(Flatten.<String>iterables())
.apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
@@ -167,7 +167,7 @@ public class TransformTreeTest {
assertThat(transform, not(instanceOf(Combine.Globally.class)));
assertThat(transform, not(instanceOf(WriteFiles.class)));
if (transform instanceof Read.Bounded
- && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
+ && node.getEnclosingNode().getTransform() instanceof TextIO.Read) {
assertTrue(visited.add(TransformsSeen.READ));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 6937405..f3dc378 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -94,7 +94,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testSourceTransform() {
- PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.Read
+ PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.read()
.from("foo.*");
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 30b0311..5e6580b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -215,7 +215,7 @@ public class WindowingTest implements Serializable {
}
PCollection<String> output = p.begin()
- .apply("ReadLines", TextIO.Read.from(filename))
+ .apply("ReadLines", TextIO.read().from(filename))
.apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
.apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index a5957b5..cf86c36 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -221,7 +221,8 @@ public class HadoopFileSystemTest {
.as(HadoopFileSystemOptions.class);
options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
FileSystems.setDefaultConfigInWorkers(options);
- PCollection<String> pc = p.apply(TextIO.Read.from(testPath("testFile*").toString()));
+ PCollection<String> pc = p.apply(
+ TextIO.read().from(testPath("testFile*").toString()));
PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
p.run();
}