You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 19:23:24 UTC
[6/7] beam git commit: Removes TextIO.Write.Bound
Removes TextIO.Write.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f5098dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f5098dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f5098dd
Branch: refs/heads/master
Commit: 4f5098ddf641f97417955ea33f429385d6fce384
Parents: 987b4e6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:28:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../examples/common/WriteOneFilePerWindow.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../beam/examples/cookbook/DistinctExample.java | 2 +-
.../beam/examples/cookbook/JoinExamples.java | 2 +-
.../beam/examples/MinimalWordCountJava8.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../beam/runners/flink/ReadSourceITCase.java | 2 +-
.../flink/ReadSourceStreamingITCase.java | 2 +-
.../flink/streaming/GroupByNullKeyTest.java | 2 +-
.../streaming/TopWikipediaSessionsITCase.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 4 +-
.../runners/dataflow/DataflowRunnerTest.java | 8 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 2 +-
.../beam/runners/spark/io/NumShardsTest.java | 3 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
.../apache/beam/sdk/io/ShardNameTemplate.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 553 +++++++------------
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 20 +-
.../beam/sdk/runners/TransformTreeTest.java | 2 +-
.../org/apache/beam/sdk/util/NameUtilsTest.java | 2 +-
.../org/apache/beam/sdk/values/PDoneTest.java | 2 +-
28 files changed, 249 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 4a0c1bb..5ac8080 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -111,7 +111,7 @@ public class MinimalWordCount {
// formatted strings) to a series of text files.
//
// By default, it will write to a set of files with names like wordcounts-00001-of-00005
- .apply(TextIO.Write.to("wordcounts"));
+ .apply(TextIO.write().to("wordcounts"));
// Run the pipeline.
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e331a86..bfa7eb3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -179,7 +179,7 @@ public class WordCount {
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 6609828..461b46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -48,7 +48,7 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
@Override
public PDone expand(PCollection<String> input) {
return input.apply(
- TextIO.Write
+ TextIO.write()
.to(new PerWindowFiles(filenamePrefix))
.withWindowedWrites()
.withNumShards(3));
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 1ef69c0..6fd9755 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -400,7 +400,7 @@ public class TfIdf {
c.element().getValue().getValue()));
}
}))
- .apply(TextIO.Write
+ .apply(TextIO.write()
.to(output)
.withSuffix(".csv"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index bb8c8bc..478e2dc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -210,7 +210,7 @@ public class TopWikipediaSessions {
p.apply(TextIO.read().from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
- .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
+ .apply("Write", TextIO.write().withoutSharding().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 20c8fa0..bb16528 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -89,7 +89,7 @@ public class DistinctExample {
p.apply("ReadLines", TextIO.read().from(options.getInput()))
.apply(Distinct.<String>create())
- .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
+ .apply("DedupedShakespeare", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 05a3ad3..d1fffb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -169,7 +169,7 @@ public class JoinExamples {
PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
- formattedResults.apply(TextIO.Write.to(options.getOutput()));
+ formattedResults.apply(TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6dabadc..85c291d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -65,7 +65,7 @@ public class MinimalWordCountJava8 {
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
// CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
- .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+ .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 689005a..e071b4e 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -71,7 +71,7 @@ public class MinimalWordCountJava8Test implements Serializable {
.apply(MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
- .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
+ .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
}
private GcsUtil buildMockGcsUtil() throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b0fab0b..83af61b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -110,7 +110,7 @@ public class WordCountTest {
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()))
;
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 960640c..53d2ba3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -86,7 +86,7 @@ public class WriteWithShardingFactoryTest {
String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
// TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
// resharding should be automatically applied
- p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
+ p.apply(Create.of(strs)).apply(TextIO.write().to(targetLocation));
p.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 65d198e..5985da8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -76,7 +76,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
}
}));
- result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+ result.apply(TextIO.write().to(new URI(resultPath).getPath() + "/part"));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index 4f597c3..0707c21 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -65,7 +65,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
c.output(c.element().toString());
}
}))
- .apply(TextIO.Write.to(resultPath));
+ .apply(TextIO.write().to(resultPath));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 82d9f4f..2bd8e72 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -123,7 +123,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
c.output(str.toString());
}
}));
- output.apply(TextIO.Write.to(resultPath));
+ output.apply(TextIO.write().to(resultPath));
p.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9e6bba8..28335e3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -126,7 +126,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
}
}));
- format.apply(TextIO.Write.to(resultPath));
+ format.apply(TextIO.write().to(resultPath));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d47da45..31c47b4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -138,7 +138,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
FileSystems.setDefaultConfigInWorkers(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(p);
@@ -525,7 +525,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
String stepName = "DoFn1";
pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(stepName, ParDo.of(new NoOpFn()))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
Job job =
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7261fe9..d011994 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -172,7 +172,7 @@ public class DataflowRunnerTest {
Pipeline p = Pipeline.create(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultConfigInWorkers(options);
@@ -335,7 +335,7 @@ public class DataflowRunnerTest {
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
.apply(TextIO.read().from(options.getInput()))
- .apply(TextIO.Write.to(options.getOutput()));
+ .apply(TextIO.write().to(options.getOutput()));
}
/**
@@ -587,7 +587,7 @@ public class DataflowRunnerTest {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
+ .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
@@ -613,7 +613,7 @@ public class DataflowRunnerTest {
public void testMultiSlashGcsFileWritePath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
- pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
+ pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("consecutive slashes");
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 2bcf140..0779bd5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -128,7 +128,7 @@ public class WordCount {
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ce52b90..e43bc4e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -88,7 +88,7 @@ public class SparkRunnerDebuggerTest {
wordCounts
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
+ .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index c936ed3..5021744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -73,7 +73,8 @@ public class NumShardsTest {
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
- output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+ output.apply(
+ TextIO.write().to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
p.run().waitUntilFinish();
int count = 0;
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7cb9386..d4c46cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* .apply(new Count<String>());
* PCollection<String> formattedWordCounts =
* wordCounts.apply(ParDo.of(new FormatCounts()));
- * formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt"));
+ * formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
*
* // PTransforms aren't executed when they're applied, rather they're
* // just added to the Pipeline. Once the whole Pipeline of PTransforms
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
index 7f48a5c..cc85242 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
@@ -45,7 +45,7 @@ package org.apache.beam.sdk.io;
*
* <pre>{@code
* pipeline.apply(
- * TextIO.Write.to("gs://bucket/path")
+ * TextIO.write().to("gs://bucket/path")
* .withShardNameTemplate("-SS-of-NN")
* .withSuffix(".txt"))
* }</pre>
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f8670a6..2d82572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -86,9 +86,9 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
- * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
* preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
* runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
* set, and unique windows and triggers must produce unique filenames.
*
@@ -99,11 +99,11 @@ import org.apache.beam.sdk.values.PDone;
* <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"));
*
* // Same as above, only with Gzip compression:
* PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file")
* .withSuffix(".txt")
* .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
* }</pre>
@@ -117,6 +117,15 @@ public class TextIO {
return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
}
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to a text file (or
+ * multiple text files matching a sharding pattern), with each
+ * element of the input collection encoded into its own line.
+ */
+ public static Write write() {
+ return new Write();
+ }
+
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@@ -227,49 +236,105 @@ public class TextIO {
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@link PTransform} that writes a {@link PCollection} to text file (or
- * multiple text files matching a sharding pattern), with each
- * element of the input collection encoded into its own line.
- */
- public static class Write {
+ /** Implementation of {@link #write}. */
+ public static class Write extends PTransform<PCollection<String>, PDone> {
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+ /** The prefix of each file written, combined with suffix and shardTemplate. */
+ private final ValueProvider<String> filenamePrefix;
+ /** The suffix of each file written, combined with prefix and shardTemplate. */
+ private final String filenameSuffix;
+
+ /** An optional header to add to each file. */
+ @Nullable private final String header;
+
+ /** An optional footer to add to each file. */
+ @Nullable private final String footer;
+
+ /** Requested number of shards. 0 for automatic. */
+ private final int numShards;
+
+ /** The shard template of each file written, combined with prefix and suffix. */
+ private final String shardTemplate;
+
+ /** A policy for naming output files. */
+ private final FilenamePolicy filenamePolicy;
+
+ /** Whether to write windowed output files. */
+ private boolean windowedWrites;
+
+ /**
+ * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+ * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ */
+ private final WritableByteChannelFactory writableByteChannelFactory;
+
+ private Write() {
+ this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
+ FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
+ }
+
+ private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+ @Nullable String header, @Nullable String footer, int numShards,
+ String shardTemplate,
+ WritableByteChannelFactory writableByteChannelFactory,
+ FilenamePolicy filenamePolicy,
+ boolean windowedWrites) {
+ super(name);
+ this.header = header;
+ this.footer = footer;
+ this.filenamePrefix = filenamePrefix;
+ this.filenameSuffix = filenameSuffix;
+ this.numShards = numShards;
+ this.shardTemplate = shardTemplate;
+ this.writableByteChannelFactory =
+ firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+ this.filenamePolicy = filenamePolicy;
+ this.windowedWrites = windowedWrites;
+ }
/**
- * Returns a transform for writing to text files that writes to the file(s)
- * with the given prefix. This can be a local filename
+ * Writes to text files with the given prefix. This can be a local filename
* (if running locally), or a Google Cloud Storage filename of
* the form {@code "gs://<bucket>/<filepath>"}
* (if running locally or using remote execution).
*
* <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link Bound#withNumShards(int)}, and end
- * in a common extension, if given by {@link Bound#withSuffix(String)}.
+ * a shard identifier (see {@link #withNumShards(int)}), and end
+ * in a common extension, if given by {@link #withSuffix(String)}.
*/
- public static Bound to(String prefix) {
- return new Bound().to(prefix);
+ public Write to(String filenamePrefix) {
+ validateOutputComponent(filenamePrefix);
+ return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+ header, footer, numShards, shardTemplate,
+ writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- public static Bound to(FilenamePolicy filenamePolicy) {
- return new Bound().to(filenamePolicy);
-
+ /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
+ public Write to(ValueProvider<String> filenamePrefix) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- /**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
- */
- public static Bound to(ValueProvider<String> prefix) {
- return new Bound().to(prefix);
+
+ /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
+ public Write to(FilenamePolicy filenamePolicy) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that appends the specified suffix
- * to the created files.
+ * Writes to the file(s) with the given filename suffix.
+ *
+ * @see ShardNameTemplate
*/
- public static Bound withSuffix(String nameExtension) {
- return new Bound().withSuffix(nameExtension);
+ public Write withSuffix(String nameExtension) {
+ validateOutputComponent(nameExtension);
+ return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that uses the provided shard count.
+ * Uses the provided shard count.
*
* <p>Constraining the number of shards is likely to reduce
* the performance of a pipeline. Setting this value is not recommended
@@ -277,371 +342,169 @@ public class TextIO {
*
* @param numShards the number of shards to use, or 0 to let the system
* decide.
+ * @see ShardNameTemplate
*/
- public static Bound withNumShards(int numShards) {
- return new Bound().withNumShards(numShards);
+ public Write withNumShards(int numShards) {
+ checkArgument(numShards >= 0);
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that uses the given shard name
- * template.
+ * Uses the given shard name template.
*
- * <p>See {@link ShardNameTemplate} for a description of shard templates.
+ * @see ShardNameTemplate
*/
- public static Bound withShardNameTemplate(String shardTemplate) {
- return new Bound().withShardNameTemplate(shardTemplate);
+ public Write withShardNameTemplate(String shardTemplate) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that forces a single file as
- * output.
+ * Forces a single file as output.
+ *
+ * <p>Constraining the number of shards is likely to reduce
+ * the performance of a pipeline. Using this setting is not recommended
+ * unless you truly require a single output file.
+ *
+ * <p>This is a shortcut for
+ * {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public static Bound withoutSharding() {
- return new Bound().withoutSharding();
+ public Write withoutSharding() {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
+ writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that adds a header string to the files
- * it writes. Note that a newline character will be added after the header.
+ * Adds a header string to each file. A newline after the header is added automatically.
*
* <p>A {@code null} value will clear any previously configured header.
- *
- * @param header the string to be added as file header
*/
- public static Bound withHeader(@Nullable String header) {
- return new Bound().withHeader(header);
+ public Write withHeader(@Nullable String header) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that adds a footer string to the files
- * it writes. Note that a newline character will be added after the header.
+ * Adds a footer string to each file. A newline after the footer is added automatically.
*
* <p>A {@code null} value will clear any previously configured footer.
- *
- * @param footer the string to be added as file footer
*/
- public static Bound withFooter(@Nullable String footer) {
- return new Bound().withFooter(footer);
+ public Write withFooter(@Nullable String footer) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
* Returns a transform for writing to text files like this one but that has the given
- * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
- * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+ * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
- *
- * @param writableByteChannelFactory the factory to be used during output
*/
- public static Bound withWritableByteChannelFactory(
+ public Write withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
- return new Bound().withWritableByteChannelFactory(writableByteChannelFactory);
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- // TODO: appendingNewlines, etc.
-
- /**
- * A PTransform that writes a bounded PCollection to a text file (or
- * multiple text files matching a sharding pattern), with each
- * PCollection element being encoded into its own line.
- */
- public static class Bound extends PTransform<PCollection<String>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
- /** The prefix of each file written, combined with suffix and shardTemplate. */
- private final ValueProvider<String> filenamePrefix;
- /** The suffix of each file written, combined with prefix and shardTemplate. */
- private final String filenameSuffix;
-
- /** An optional header to add to each file. */
- @Nullable private final String header;
-
- /** An optional footer to add to each file. */
- @Nullable private final String footer;
-
- /** Requested number of shards. 0 for automatic. */
- private final int numShards;
-
- /** The shard template of each file written, combined with prefix and suffix. */
- private final String shardTemplate;
-
- /** A policy for naming output files. */
- private final FilenamePolicy filenamePolicy;
-
- /** Whether to write windowed output files. */
- private boolean windowedWrites;
-
- /**
- * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
- * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
- */
- private final WritableByteChannelFactory writableByteChannelFactory;
-
- private Bound() {
- this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
- FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
- }
-
- private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
- @Nullable String header, @Nullable String footer, int numShards,
- String shardTemplate,
- WritableByteChannelFactory writableByteChannelFactory,
- FilenamePolicy filenamePolicy,
- boolean windowedWrites) {
- super(name);
- this.header = header;
- this.footer = footer;
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.writableByteChannelFactory =
- firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
- this.filenamePolicy = filenamePolicy;
- this.windowedWrites = windowedWrites;
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that writes to the file(s) with the given filename prefix.
- *
- * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
- *
- * <p>Does not modify this object.
- */
- public Bound to(String filenamePrefix) {
- validateOutputComponent(filenamePrefix);
- return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
- header, footer, numShards, shardTemplate,
- writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
- */
- public Bound to(ValueProvider<String> filenamePrefix) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
- */
- public Bound to(FilenamePolicy filenamePolicy) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that that's like this one but
- * that writes to the file(s) with the given filename suffix.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withSuffix(String nameExtension) {
- validateOutputComponent(nameExtension);
- return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that uses the provided shard count.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
- *
- * <p>Does not modify this object.
- *
- * @param numShards the number of shards to use, or 0 to let the system
- * decide.
- * @see ShardNameTemplate
- */
- public Bound withNumShards(int numShards) {
- checkArgument(numShards >= 0);
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that uses the given shard name template.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withShardNameTemplate(String shardTemplate) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that forces a single file as output.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Using this setting is not recommended
- * unless you truly require a single output file.
- *
- * <p>This is a shortcut for
- * {@code .withNumShards(1).withShardNameTemplate("")}
- *
- * <p>Does not modify this object.
- */
- public Bound withoutSharding() {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
- writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
+ public Write withWindowedWrites() {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+ }
- /**
- * Returns a transform for writing to text files that adds a header string to the files
- * it writes. Note that a newline character will be added after the header.
- *
- * <p>A {@code null} value will clear any previously configured header.
- *
- * <p>Does not modify this object.
- *
- * @param header the string to be added as file header
- */
- public Bound withHeader(@Nullable String header) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ @Override
+ public PDone expand(PCollection<String> input) {
+ if (filenamePolicy == null && filenamePrefix == null) {
+ throw new IllegalStateException(
+ "need to set the filename prefix of an TextIO.Write transform");
}
-
- /**
- * Returns a transform for writing to text files that adds a footer string to the files
- * it writes. Note that a newline character will be added after the header.
- *
- * <p>A {@code null} value will clear any previously configured footer.
- *
- * <p>Does not modify this object.
- *
- * @param footer the string to be added as file footer
- */
- public Bound withFooter(@Nullable String footer) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ if (filenamePolicy != null && filenamePrefix != null) {
+ throw new IllegalStateException(
+ "cannot set both a filename policy and a filename prefix");
}
-
- /**
- * Returns a transform for writing to text files like this one but that has the given
- * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
- * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
- *
- * <p>A {@code null} value will reset the value to the default value mentioned above.
- *
- * <p>Does not modify this object.
- *
- * @param writableByteChannelFactory the factory to be used during output
- */
- public Bound withWritableByteChannelFactory(
- WritableByteChannelFactory writableByteChannelFactory) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ WriteFiles<String> write = null;
+ if (filenamePolicy != null) {
+ write = WriteFiles.to(
+ new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+ } else {
+ write = WriteFiles.to(
+ new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+ writableByteChannelFactory));
}
-
- public Bound withWindowedWrites() {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
}
-
- @Override
- public PDone expand(PCollection<String> input) {
- if (filenamePolicy == null && filenamePrefix == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of an TextIO.Write transform");
- }
- if (filenamePolicy != null && filenamePrefix != null) {
- throw new IllegalStateException(
- "cannot set both a filename policy and a filename prefix");
- }
- WriteFiles<String> write = null;
- if (filenamePolicy != null) {
- write = WriteFiles.to(
- new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
- } else {
- write = WriteFiles.to(
- new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
- writableByteChannelFactory));
- }
- if (getNumShards() > 0) {
- write = write.withNumShards(getNumShards());
- }
- if (windowedWrites) {
- write = write.withWindowedWrites();
- }
- return input.apply("WriteFiles", write);
+ if (windowedWrites) {
+ write = write.withWindowedWrites();
}
+ return input.apply("WriteFiles", write);
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- String prefixString = "";
- if (filenamePrefix != null) {
- prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
- }
- builder
- .addIfNotNull(DisplayData.item("filePrefix", prefixString)
- .withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
- .withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("numShards", numShards)
- .withLabel("Maximum Output Shards"), 0)
- .addIfNotNull(DisplayData.item("fileHeader", header)
- .withLabel("File Header"))
- .addIfNotNull(DisplayData.item("fileFooter", footer)
- .withLabel("File Footer"))
- .add(DisplayData
- .item("writableByteChannelFactory", writableByteChannelFactory.toString())
- .withLabel("Compression/Transformation Type"));
+ String prefixString = "";
+ if (filenamePrefix != null) {
+ prefixString = filenamePrefix.isAccessible()
+ ? filenamePrefix.get() : filenamePrefix.toString();
}
+ builder
+ .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+ .withLabel("Output File Prefix"))
+ .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .withLabel("Output File Suffix"), "")
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .withLabel("Output Shard Name Template"),
+ DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .withLabel("Maximum Output Shards"), 0)
+ .addIfNotNull(DisplayData.item("fileHeader", header)
+ .withLabel("File Header"))
+ .addIfNotNull(DisplayData.item("fileFooter", footer)
+ .withLabel("File Footer"))
+ .add(DisplayData
+ .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+ .withLabel("Compression/Transformation Type"));
+ }
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
+ /**
+ * Returns the current shard name template string.
+ */
+ public String getShardNameTemplate() {
+ return shardTemplate;
+ }
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- public String getFilenamePrefix() {
- return filenamePrefix.get();
- }
+ public String getFilenamePrefix() {
+ return filenamePrefix.get();
+ }
- public String getShardTemplate() {
- return shardTemplate;
- }
+ public String getShardTemplate() {
+ return shardTemplate;
+ }
- public int getNumShards() {
- return numShards;
- }
+ public int getNumShards() {
+ return numShards;
+ }
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
- @Nullable
- public String getHeader() {
- return header;
- }
+ @Nullable
+ public String getHeader() {
+ return header;
+ }
- @Nullable
- public String getFooter() {
- return footer;
- }
+ @Nullable
+ public String getFooter() {
+ return footer;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c65d7dd..3fc8e32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -29,7 +29,7 @@
* and {@code Write} transforms that persist PCollections to external storage:
* <pre> {@code
* PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.write().to("gs://my_bucket/path/to/numbers"));
* } </pre>
*/
package org.apache.beam.sdk.io;
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8a7965c..095b69f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -299,8 +299,8 @@ public class TextIOTest {
PCollection<String> input =
p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
- TextIO.Write.Bound write =
- TextIO.Write.to(baseFilename)
+ TextIO.Write write =
+ TextIO.write().to(baseFilename)
.withHeader(header)
.withFooter(footer);
@@ -463,7 +463,7 @@ public class TextIOTest {
final WritableByteChannelFactory writableByteChannelFactory =
new DrunkWritableByteChannelFactory();
- TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+ TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString())
.withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
@@ -483,7 +483,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayData() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
@@ -504,7 +504,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayDataValidateThenHeader() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withHeader("myHeader");
@@ -515,7 +515,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayDataValidateThenFooter() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withFooter("myFooter");
@@ -534,7 +534,7 @@ public class TextIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- TextIO.Write.Bound write = TextIO.Write.to(outputPath);
+ TextIO.Write write = TextIO.write().to(outputPath);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("TextIO.Write should include the file prefix in its primitive display data",
@@ -553,7 +553,7 @@ public class TextIOTest {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Output name components are not allowed to contain");
- input.apply(TextIO.Write.to(filename));
+ input.apply(TextIO.write().to(filename));
}
/** Options for testing. */
@@ -573,7 +573,7 @@ public class TextIOTest {
p
.apply(TextIO.read().from(options.getInput()))
- .apply(TextIO.Write.to(options.getOutput()));
+ .apply(TextIO.write().to(options.getOutput()));
}
@Test
@@ -826,7 +826,7 @@ public class TextIOTest {
@Test
public void testTextIOGetName() {
assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
- assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
+ assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 29d9774..6c3aba2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -124,7 +124,7 @@ public class TransformTreeTest {
p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
.apply(sample)
.apply(Flatten.<String>iterables())
- .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
+ .apply("WriteMyFile", TextIO.write().to(outputFile.getPath()));
final EnumSet<TransformsSeen> visited =
EnumSet.noneOf(TransformsSeen.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index c685a63..411f913 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -137,7 +137,7 @@ public class NameUtilsTest {
assertEquals(
"NameUtilsTest.SomeTransform",
NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
- assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
+ assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.class));
}
@AutoValue
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 7c9d1d9..b07a5b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -71,7 +71,7 @@ public class PDoneTest {
return
begin
.apply(Create.of(LINES))
- .apply(TextIO.Write.to(filename));
+ .apply(TextIO.write().to(filename));
}
}