You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/20 21:49:30 UTC
[3/4] beam git commit: Remove Sink in favor of FileBasedSink
Remove Sink in favor of FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a6a1a8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a6a1a8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a6a1a8c
Branch: refs/heads/master
Commit: 6a6a1a8c0d39965e540dbe74ddf73c839ca46889
Parents: 8319369
Author: Reuven Lax <re...@google.com>
Authored: Wed Apr 5 12:13:44 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 20 13:49:36 2017 -0700
----------------------------------------------------------------------
.../core/construction/PTransformMatchers.java | 6 +-
.../construction/PTransformMatchersTest.java | 16 +-
.../direct/WriteWithShardingFactory.java | 15 +-
.../direct/WriteWithShardingFactoryTest.java | 18 +-
.../beam/runners/flink/WriteSinkITCase.java | 192 -----
.../beam/runners/dataflow/DataflowRunner.java | 16 +-
.../DataflowPipelineTranslatorTest.java | 2 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 6 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 93 +--
.../main/java/org/apache/beam/sdk/io/Sink.java | 295 --------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 6 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 8 +-
.../main/java/org/apache/beam/sdk/io/Write.java | 603 ----------------
.../java/org/apache/beam/sdk/io/WriteFiles.java | 555 +++++++++++++++
.../main/java/org/apache/beam/sdk/io/XmlIO.java | 37 +-
.../java/org/apache/beam/sdk/io/XmlSink.java | 4 +-
.../java/org/apache/beam/sdk/values/PDone.java | 3 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 73 --
.../java/org/apache/beam/sdk/io/SimpleSink.java | 95 +++
.../org/apache/beam/sdk/io/WriteFilesTest.java | 457 ++++++++++++
.../java/org/apache/beam/sdk/io/WriteTest.java | 705 -------------------
.../beam/sdk/runners/TransformTreeTest.java | 6 +-
.../apache/beam/sdk/io/hdfs/HDFSFileSink.java | 6 +-
.../java/org/apache/beam/sdk/io/hdfs/Sink.java | 195 +++++
.../java/org/apache/beam/sdk/io/hdfs/Write.java | 582 +++++++++++++++
.../beam/sdk/io/hdfs/HDFSFileSinkTest.java | 1 -
26 files changed, 2014 insertions(+), 1981 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 09946bc..b2bd7d9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -22,7 +22,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
@@ -268,8 +268,8 @@ public class PTransformMatchers {
return new PTransformMatcher() {
@Override
public boolean matches(AppliedPTransform<?, ?, ?> application) {
- if (application.getTransform() instanceof Write) {
- Write write = (Write) application.getTransform();
+ if (application.getTransform() instanceof WriteFiles) {
+ WriteFiles write = (WriteFiles) application.getTransform();
return write.getSharding() == null && write.getNumShards() == null;
}
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 4084cdc..d9bc1e7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -30,7 +30,7 @@ import java.util.Collections;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -499,8 +499,8 @@ public class PTransformMatchersTest implements Serializable {
@Test
public void writeWithRunnerDeterminedSharding() {
- Write<Integer> write =
- Write.to(
+ WriteFiles<Integer> write =
+ WriteFiles.to(
new FileBasedSink<Integer>("foo", "bar") {
@Override
public FileBasedWriteOperation<Integer> createWriteOperation(
@@ -512,13 +512,13 @@ public class PTransformMatchersTest implements Serializable {
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
- Write<Integer> withStaticSharding = write.withNumShards(3);
+ WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));
- Write<Integer> withCustomSharding =
+ WriteFiles<Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -526,9 +526,9 @@ public class PTransformMatchersTest implements Serializable {
is(false));
}
- private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
- return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
- "Write",
+ private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
+ return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
+ "WriteFiles",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>emptyMap(),
write,
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index a23ab94..24462e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -26,7 +26,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Count;
@@ -43,18 +43,19 @@ import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
/**
- * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms}
- * with an unspecified number of shards with a write with a specified number of shards. The number
- * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
+ * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
+ * specified number of shards. The number of shards is the log base 10 of the number of input
+ * records, with up to 2 additional shards.
*/
class WriteWithShardingFactory<InputT>
- implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
+ implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
static final int MAX_RANDOM_EXTRA_SHARDS = 3;
@VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
@Override
public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PDone, Write<InputT>> transform) {
+ AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
@@ -108,7 +109,7 @@ class WriteWithShardingFactory<InputT>
private int calculateShards(long totalRecords) {
if (totalRecords == 0) {
- // Write out at least one shard, even if there is no input.
+ // Write out at least one shard, even if there is no input.
return 1;
}
// Windows get their own number of random extra shards. This is stored in a side input, so
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 361850d..b0c9f6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,9 +39,9 @@ import java.util.UUID;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -84,7 +84,7 @@ public class WriteWithShardingFactoryTest {
String fileName = "resharded_write";
String outputPath = tmp.getRoot().getAbsolutePath();
String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
- // TextIO is implemented in terms of the Write PTransform. When sharding is not specified,
+ // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
// resharding should be automatically applied
p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
@@ -121,10 +121,10 @@ public class WriteWithShardingFactoryTest {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
- Write<Object> original = Write.to(new TestSink());
+ WriteFiles<Object> original = WriteFiles.to(new TestSink());
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
- AppliedPTransform<PCollection<Object>, PDone, Write<Object>> originalApplication =
+ AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
AppliedPTransform.of(
"write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
@@ -207,12 +207,16 @@ public class WriteWithShardingFactoryTest {
assertThat(shards, containsInAnyOrder(13));
}
- private static class TestSink extends Sink<Object> {
+ private static class TestSink extends FileBasedSink<Object> {
+ public TestSink() {
+ super("", "");
+ }
+
@Override
public void validate(PipelineOptions options) {}
@Override
- public WriteOperation<Object, ?> createWriteOperation(PipelineOptions options) {
+ public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
throw new IllegalArgumentException("Should not be used");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
deleted file mode 100644
index 38b790e..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertNotNull;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * Tests the translation of custom Write sinks.
- */
-public class WriteSinkITCase extends JavaProgramTestBase {
-
- protected String resultPath;
-
- public WriteSinkITCase(){
- }
-
- static final String[] EXPECTED_RESULT = new String[] {
- "Joe red 3", "Mary blue 4", "Max yellow 23"};
-
- @Override
- protected void preSubmit() throws Exception {
- resultPath = getTempDirPath("result-" + System.nanoTime());
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
- }
-
- @Override
- protected void testProgram() throws Exception {
- runProgram(resultPath);
- }
-
- @Override
- public void stopCluster() throws Exception {
- try {
- super.stopCluster();
- } catch (final IOException ioe) {
- if (ioe.getMessage().startsWith("Unable to delete file")) {
- // that's ok for the test itself, just the OS playing with us on cleanup phase
- }
- }
- }
-
- private static void runProgram(String resultPath) {
- Pipeline p = FlinkTestPipeline.createForBatch();
-
- p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
- .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
- p.run();
- }
-
- /**
- * Simple custom sink which writes to a file.
- */
- private static class MyCustomSink extends Sink<String> {
-
- private final String resultPath;
-
- public MyCustomSink(String resultPath) {
- this.resultPath = resultPath;
- }
-
- @Override
- public void validate(PipelineOptions options) {
- assertNotNull(options);
- }
-
- @Override
- public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
- return new MyWriteOperation();
- }
-
- private class MyWriteOperation extends WriteOperation<String, String> {
-
- @Override
- public Coder<String> getWriterResultCoder() {
- return StringUtf8Coder.of();
- }
-
- @Override
- public void initialize(PipelineOptions options) throws Exception {
-
- }
-
- @Override
- public void setWindowedWrites(boolean windowedWrites) {
-
- }
-
- @Override
- public void finalize(Iterable<String> writerResults, PipelineOptions options)
- throws Exception {
-
- }
-
- @Override
- public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
- return new MyWriter();
- }
-
- @Override
- public Sink<String> getSink() {
- return MyCustomSink.this;
- }
-
- /**
- * Simple Writer which writes to a file.
- */
- private class MyWriter extends Writer<String, String> {
-
- private PrintWriter internalWriter;
-
- @Override
- public final void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception {
- throw new UnsupportedOperationException("Windowed writes not supported.");
- }
-
- @Override
- public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
- Path path = new Path(resultPath + "/" + uId);
- FileSystem.get(new URI("file:///")).create(path, false);
- internalWriter = new PrintWriter(new File(path.toUri()));
- }
-
- @Override
- public void cleanup() throws Exception {
-
- }
-
- @Override
- public void write(String value) throws Exception {
- internalWriter.println(value);
- }
-
- @Override
- public String close() throws Exception {
- internalWriter.close();
- return resultPath;
- }
-
- @Override
- public WriteOperation<String, String> getWriteOperation() {
- return MyWriteOperation.this;
- }
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 4eec6b8..19ea529 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -85,7 +85,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -341,10 +341,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.stateOrTimerParDoSingle(),
BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
- // Write uses views internally
+ // WriteFiles uses views internally
.add(
PTransformOverride.of(
- PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this)))
+ PTransformMatchers.classEqualTo(WriteFiles.class), new BatchWriteFactory(this)))
.add(
PTransformOverride.of(
PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
@@ -803,7 +803,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
private class BatchWriteFactory<T>
- implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
+ implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
private final DataflowRunner runner;
private BatchWriteFactory(DataflowRunner dataflowRunner) {
this.runner = dataflowRunner;
@@ -811,7 +811,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@Override
public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<T>, PDone, Write<T>> transform) {
+ AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
return PTransformReplacement.of(
PTransformReplacements.getSingletonMainInput(transform),
new BatchWrite<>(runner, transform.getTransform()));
@@ -826,17 +826,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/**
* Specialized implementation which overrides
- * {@link org.apache.beam.sdk.io.Write Write} to provide Google
+ * {@link WriteFiles WriteFiles} to provide Google
* Cloud Dataflow specific path validation of {@link FileBasedSink}s.
*/
private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
private final DataflowRunner runner;
- private final Write<T> transform;
+ private final WriteFiles<T> transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
- public BatchWrite(DataflowRunner runner, Write<T> transform) {
+ public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {
this.runner = runner;
this.transform = transform;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5016d88..b779f0f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -532,7 +532,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Collections.<DataflowPackage>emptyList())
.getJob();
- assertEquals(13, job.getSteps().size());
+ assertEquals(8, job.getSteps().size());
Step step = job.getSteps().get(1);
assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
assertAllStepOutputsHaveUniqueIds(job);
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 33fe323..5cba3f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -886,16 +886,16 @@ public class AvroIO {
throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
}
- org.apache.beam.sdk.io.Write<T> write = null;
+ WriteFiles<T> write = null;
if (filenamePolicy != null) {
- write = org.apache.beam.sdk.io.Write.to(
+ write = WriteFiles.to(
new AvroSink<>(
filenamePolicy,
AvroCoder.of(type, schema),
codec,
metadata));
} else {
- write = org.apache.beam.sdk.io.Write.to(
+ write = WriteFiles.to(
new AvroSink<>(
filenamePrefix,
filenameSuffix,
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9b5f130..d9682e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IOChannelFactory;
@@ -70,12 +71,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
* output and defines the format of output files (how values are written, headers/footers, MIME
* type, etc.).
*
* <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
- * and to create a {@link Sink.WriteOperation} that manages the process of writing to the sink.
+ * and to create a {@link FileBasedWriteOperation} that manages the process of writing to the sink.
*
* <p>The process of writing to file-based sink is as follows:
* <ol>
@@ -84,11 +85,28 @@ import org.slf4j.LoggerFactory;
* <li>these temporary files are renamed with final output filenames.
* </ol>
*
+ * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
+ * event of failure/retry or for redundancy). However, exactly one of these executions will have its
+ * result passed to the finalize method. Each call to {@link FileBasedWriter#openWindowed}
+ * or {@link FileBasedWriter#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
+ * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
+ * identifying
+ * their output.
+ *
+ * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
+ * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
+ * will encode the unique bundle id to avoid conflicts with other writers.
+ *
+ * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered
+ * PCollections into separate files per window pane. This allows file output from unbounded
+ * PCollections, and also works for bounded PCollections.
+ *
* <p>Supported file systems are those registered with {@link IOChannelUtils}.
*
* @param <T> the type of values written to the sink.
*/
-public abstract class FileBasedSink<T> extends Sink<T> {
+public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
/**
@@ -385,19 +403,15 @@ public abstract class FileBasedSink<T> extends Sink<T> {
return fileNamePolicy;
}
- @Override
public void validate(PipelineOptions options) {}
/**
* Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
* to the sink.
*/
- @Override
public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
- @Override
public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
getFileNamePolicy().populateDisplayData(builder);
}
@@ -417,8 +431,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
}
/**
- * Abstract {@link Sink.WriteOperation} that manages the process of writing to a
- * {@link FileBasedSink}.
+ * Abstract operation that manages the process of writing to {@link FileBasedSink}.
*
* <p>The primary responsibilities of the FileBasedWriteOperation is the management of output
* files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary file
@@ -457,7 +470,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*
* @param <T> the type of values written to the sink.
*/
- public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
+ public abstract static class FileBasedWriteOperation<T> implements Serializable {
/**
* The Sink that this WriteOperation will write to.
*/
@@ -531,27 +544,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}. This
- * method must satisfy the restrictions placed on implementations of
- * {@link Sink.WriteOperation#createWriter}. Namely, it must not mutate the state of the object.
+ * method must not mutate the state of the object.
*/
- @Override
public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
- @Override
+ /**
+ * Indicates that the operation will be performing windowed writes.
+ */
public void setWindowedWrites(boolean windowedWrites) {
this.windowedWrites = windowedWrites;
}
/**
- * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass
- * implementations to perform initialization of the sink at pipeline runtime. This method must
- * be idempotent and is subject to the same implementation restrictions as
- * {@link Sink.WriteOperation#initialize}.
- */
- @Override
- public void initialize(PipelineOptions options) throws Exception {}
-
- /**
* Finalizes writing by copying temporary output files to their final location and optionally
* removing temporary files.
*
@@ -565,10 +569,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*
* @param writerResults the results of writes (FileResult).
*/
- @Override
public void finalize(Iterable<FileResult> writerResults,
- PipelineOptions options)
- throws Exception {
+ PipelineOptions options) throws Exception {
// Collect names of temporary files and rename them.
Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
copyToOutputFiles(outputFilenames, options);
@@ -696,24 +698,22 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Provides a coder for {@link FileBasedSink.FileResult}.
*/
- @Override
- public Coder<FileResult> getWriterResultCoder() {
+ public final Coder<FileResult> getFileResultCoder() {
return FileResultCoder.of();
}
/**
* Returns the FileBasedSink for this write operation.
*/
- @Override
public FileBasedSink<T> getSink() {
return sink;
}
}
/**
- * Abstract {@link Sink.Writer} that writes a bundle to a {@link FileBasedSink}. Subclass
- * implementations provide a method that can write a single value to a {@link WritableByteChannel}
- * ({@link Sink.Writer#write}).
+ * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
+ * implementations provide a method that can write a single value to a
+ * {@link WritableByteChannel}.
*
* <p>Subclass implementations may also override methods that write headers and footers before and
* after the values in a bundle, respectively, as well as provide a MIME type for the output
@@ -724,7 +724,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*
* @param <T> the type of values to write.
*/
- public abstract static class FileBasedWriter<T> extends Writer<T, FileResult> {
+ public abstract static class FileBasedWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
final FileBasedWriteOperation<T> writeOperation;
@@ -793,9 +793,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
protected void finishWrite() throws Exception {}
/**
- * Opens the channel.
+ * Performs bundle initialization. For example, creates a temporary file for writing or
+ * initializes any state that will be used across calls to {@link FileBasedWriter#write}.
+ *
+ * <p>The unique id that is given to open should be used to ensure that the writer's output
+ * does not interfere with the output of other Writers, as a bundle may be executed many
+ * times for fault tolerance.
+ *
+ * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+ * The shard and numShards arguments are populated for static sharding. In cases where the
+ * runner is dynamically picking sharding, shard and numShards might both be set to -1.
*/
- @Override
public final void openWindowed(String uId,
BoundedWindow window,
PaneInfo paneInfo,
@@ -807,7 +815,15 @@ public abstract class FileBasedSink<T> extends Sink<T> {
open(uId, window, paneInfo, shard, numShards);
}
- @Override
+ /**
+ * Called for each value in the bundle.
+ */
+ public abstract void write(T value) throws Exception;
+
+ /**
+ * Similar to {@link #openWindowed}, but for the case where unwindowed writes were
+ * requested.
+ */
public final void openUnwindowed(String uId,
int shard,
int numShards) throws Exception {
@@ -854,8 +870,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
}
- @Override
- public void cleanup() throws Exception {
+ public final void cleanup() throws Exception {
if (filename != null) {
IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
}
@@ -864,7 +879,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Closes the channel and returns the bundle result.
*/
- @Override
public final FileResult close() throws Exception {
try (WritableByteChannel theChannel = channel) {
LOG.debug("Writing footer to {}.", filename);
@@ -892,7 +906,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Return the FileBasedWriteOperation that this Writer belongs to.
*/
- @Override
public FileBasedWriteOperation<T> getWriteOperation() {
return writeOperation;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
deleted file mode 100644
index ba1afbb..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A {@code Sink} represents a resource that can be written to using the {@link Write} transform.
- *
- * <p>A parallel write to a {@code Sink} consists of three phases:
- * <ol>
- * <li>A sequential <i>initialization</i> phase (e.g., creating a temporary output directory, etc.)
- * <li>A <i>parallel write</i> phase where workers write bundles of records
- * <li>A sequential <i>finalization</i> phase (e.g., committing the writes, merging output files,
- * etc.)
- * </ol>
- *
- * <p>The {@link Write} transform can be used in a pipeline to perform this write.
- * Specifically, a Write transform can be applied to a {@link PCollection} {@code p} by:
- *
- * <p>{@code p.apply(Write.to(new MySink()));}
- *
- * <p>Implementing a {@link Sink} and the corresponding write operations requires extending three
- * abstract classes:
- *
- * <ul>
- * <li>{@link Sink}: an immutable logical description of the location/resource to write to.
- * Depending on the type of sink, it may contain fields such as the path to an output directory
- * on a filesystem, a database table name, etc. Implementors of {@link Sink} must
- * implement two methods: {@link Sink#validate} and {@link Sink#createWriteOperation}.
- * {@link Sink#validate Validate} is called by the Write transform at pipeline creation, and should
- * validate that the Sink can be written to. The createWriteOperation method is also called at
- * pipeline creation, and should return a WriteOperation object that defines how to write to the
- * Sink. Note that implementations of Sink must be serializable and Sinks must be immutable.
- *
- * <li>{@link WriteOperation}: The WriteOperation implements the <i>initialization</i> and
- * <i>finalization</i> phases of a write. Implementors of {@link WriteOperation} must implement
- * corresponding {@link WriteOperation#initialize} and {@link WriteOperation#finalize} methods. A
- * WriteOperation must also implement {@link WriteOperation#createWriter} that creates Writers,
- * {@link WriteOperation#getWriterResultCoder} that returns a {@link Coder} for the result of a
- * parallel write, and a {@link WriteOperation#getSink} that returns the Sink that the write
- * operation corresponds to. See below for more information about these methods and restrictions on
- * their implementation.
- *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines several methods:
- * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are called once at the
- * start of writing a bundle, depending on whether windowed or unwindowed output is requested.
- * {@link Writer#write}, which writes a single record from the bundle; {@link Writer#close},
- * which is called once at the end of writing a bundle; and {@link Writer#getWriteOperation},
- * which returns the write operation that the writer belongs to.
- * </ul>
- *
- * <h2>WriteOperation</h2>
- *
- * <p>{@link WriteOperation#initialize} and {@link WriteOperation#finalize} are conceptually called
- * once: at the beginning and end of a Write transform. However, implementors must ensure that these
- * methods are idempotent, as they may be called multiple times on different machines in the case of
- * failure/retry or for redundancy.
- *
- * <p>The finalize method of WriteOperation is passed an Iterable of a writer result type. This
- * writer result type should encode the result of a write and, in most cases, some encoding of the
- * unique bundle id.
- *
- * <p>All implementations of {@link WriteOperation} must be serializable.
- *
- * <p>WriteOperation may have mutable state. For instance, {@link WriteOperation#initialize} may
- * mutate the object state. These mutations will be visible in {@link WriteOperation#createWriter}
- * and {@link WriteOperation#finalize} because the object will be serialized after initialize and
- * deserialized before these calls. However, it is not serialized again after createWriter is
- * called, as createWriter will be called within workers to create Writers for the bundles that are
- * distributed to these workers. Therefore, newWriter should not mutate the WriteOperation state (as
- * these mutations will not be visible in finalize).
- *
- * <h2>Bundle Ids:</h2>
- *
- * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
- * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#openWindowed}
- * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the
- * Write transform, so even redundant or retried bundles will have a unique way of identifying
- * their output.
- *
- * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
- * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
- * must be unique to avoid conflicts with other Writers. The bundle id should be encoded in the
- * writer result returned by the Writer and subsequently used by the WriteOperation's finalize
- * method to identify the results of successful writes.
- *
- * <p>For example, consider the scenario where a Writer writes files containing serialized records
- * and the WriteOperation's finalization step is to merge or rename these output files. In this
- * case, a Writer may use its unique id to name its output file (to avoid conflicts) and return the
- * name of the file it wrote as its writer result. The WriteOperation will then receive an Iterable
- * of output file names that it can then merge or rename using some bundle naming scheme.
- *
- * <h2>Writer Results:</h2>
- *
- * <p>{@link WriteOperation}s and {@link Writer}s must agree on a writer result type that will be
- * returned by a Writer after it writes a bundle. This type can be a client-defined object or an
- * existing type; {@link WriteOperation#getWriterResultCoder} should return a {@link Coder} for the
- * type.
- *
- * <p>A note about thread safety: Any use of static members or methods in Writer should be thread
- * safe, as different instances of Writer objects may be created in different threads on the same
- * worker.
- *
- * @param <T> the type that will be written to the Sink.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Sink<T> implements Serializable, HasDisplayData {
- /**
- * Ensures that the sink is valid and can be written to before the write operation begins. One
- * should use {@link com.google.common.base.Preconditions} to implement this method.
- */
- public abstract void validate(PipelineOptions options);
-
- /**
- * Returns an instance of a {@link WriteOperation} that can write to this Sink.
- */
- public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method
- * to provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
-
- /**
- * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
- *
- * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
- * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
- * a bundle to the sink.
- *
- * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
- * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
- *
- * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
- * call to {@code initialize} method and deserialized before calls to
- * {@code createWriter} and {@code finalized}. However, it is not
- * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
- * state of the {@code WriteOperation}.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of objects to write
- * @param <WriteT> The result of a per-bundle write
- */
- public abstract static class WriteOperation<T, WriteT> implements Serializable {
- /**
- * Performs initialization before writing to the sink. Called before writing begins.
- */
- public abstract void initialize(PipelineOptions options) throws Exception;
-
- /**
- * Indicates that the operation will be performing windowed writes.
- */
- public abstract void setWindowedWrites(boolean windowedWrites);
-
- /**
- * Given an Iterable of results from bundle writes, performs finalization after writing and
- * closes the sink. Called after all bundle writes are complete.
- *
- * <p>The results that are passed to finalize are those returned by bundles that completed
- * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
- * one writer result will be passed to finalize for each bundle. An implementation of finalize
- * should perform clean up of any failed and successfully retried bundles. Note that these
- * failed bundles will not have their writer result passed to finalize, so finalize should be
- * capable of locating any temporary/partial output written by failed bundles.
- *
- * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
- * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
- * failure/retry or for redundancy.
- *
- * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
- * finalize is called multiple times.
- *
- * @param writerResults an Iterable of results from successful bundle writes.
- */
- public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
- throws Exception;
-
- /**
- * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
- *
- * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
- *
- * <p>Must not mutate the state of the WriteOperation.
- */
- public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
- /**
- * Returns the Sink that this write operation writes to.
- */
- public abstract Sink<T> getSink();
-
- /**
- * Returns a coder for the writer result type.
- */
- public abstract Coder<WriteT> getWriterResultCoder();
- }
-
- /**
- * A Writer writes a bundle of elements from a PCollection to a sink.
- * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
- * and {@link Writer#close} is called after all elements in the bundle have been written.
- * {@link Writer#write} writes an element to the sink.
- *
- * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
- * multiple instances of a Writer may be instantiated in different threads on the same worker.
- *
- * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
- *
- * @param <T> The type of object to write
- * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
- */
- public abstract static class Writer<T, WriteT> {
- /**
- * Performs bundle initialization. For example, creates a temporary file for writing or
- * initializes any state that will be used across calls to {@link Writer#write}.
- *
- * <p>The unique id that is given to open should be used to ensure that the writer's output does
- * not interfere with the output of other Writers, as a bundle may be executed many times for
- * fault tolerance. See {@link Sink} for more information about bundle ids.
- *
- * <p>The window and paneInfo arguments are populated when windowed writes are requested.
- * shard and numbShards are populated for the case of static sharding. In cases where the
- * runner is dynamically picking sharding, shard and numShards might both be set to -1.
- */
- public abstract void openWindowed(String uId,
- BoundedWindow window,
- PaneInfo paneInfo,
- int shard,
- int numShards) throws Exception;
-
- /**
- * Perform bundle initialization for the case where the file is written unwindowed.
- */
- public abstract void openUnwindowed(String uId,
- int shard,
- int numShards) throws Exception;
-
- public abstract void cleanup() throws Exception;
-
- /**
- * Called for each value in the bundle.
- */
- public abstract void write(T value) throws Exception;
-
- /**
- * Finishes writing the bundle. Closes any resources used for writing the bundle.
- *
- * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
- * finalization. The result should contain some way to identify the output of this bundle (using
- * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
- * successful writes. See {@link Sink} for more information about bundle ids.
- *
- * @return the writer result
- */
- public abstract WriteT close() throws Exception;
-
- /**
- * Returns the write operation this writer belongs to.
- */
- public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 748086d..34dbe21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -343,9 +343,9 @@ public class TFRecordIO {
if (getFilenamePrefix() == null) {
throw new IllegalStateException(
"need to set the filename prefix of a TFRecordIO.Write transform");
- }
- org.apache.beam.sdk.io.Write<byte[]> write =
- org.apache.beam.sdk.io.Write.to(
+ }
+ org.apache.beam.sdk.io.WriteFiles<byte[]> write =
+ org.apache.beam.sdk.io.WriteFiles.to(
new TFRecordSink(
getFilenamePrefix(),
getFilenameSuffix(),
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index ea80639..fbd76df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -702,12 +702,12 @@ public class TextIO {
throw new IllegalStateException(
"cannot set both a filename policy and a filename prefix");
}
- org.apache.beam.sdk.io.Write<String> write = null;
+ WriteFiles<String> write = null;
if (filenamePolicy != null) {
- write = org.apache.beam.sdk.io.Write.to(
+ write = WriteFiles.to(
new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
} else {
- write = org.apache.beam.sdk.io.Write.to(
+ write = WriteFiles.to(
new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
writableByteChannelFactory));
}
@@ -717,7 +717,7 @@ public class TextIO {
if (windowedWrites) {
write = write.withWindowedWrites();
}
- return input.apply("Write", write);
+ return input.apply("WriteFiles", write);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
deleted file mode 100644
index 16f3eb6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
- * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}.
- *
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a
- * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
- * least 1 output will always be produced. The exact parallelism of the write stage can be
- * controlled using {@link Write#withNumShards}, typically used to control how many files are
- * produced or to globally limit the number of workers connecting to an external service. However,
- * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
- *
- * <p>{@code Write} re-windows the data into the global window, so it is typically not well suited
- * to use in streaming pipelines.
- *
- * <p>Example usage with runner-determined sharding:
- *
- * <pre>{@code p.apply(Write.to(new MySink(...)));}</pre>
- *
- * <p>Example usage with a fixed number of shards:
- *
- * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
- private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
- private static final int UNKNOWN_SHARDNUM = -1;
- private static final int UNKNOWN_NUMSHARDS = -1;
-
- private final Sink<T> sink;
- // This allows the number of shards to be dynamically computed based on the input
- // PCollection.
- @Nullable
- private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
- // We don't use a side input for static sharding, as we want this value to be updatable
- // when a pipeline is updated.
- @Nullable
- private final ValueProvider<Integer> numShardsProvider;
- private boolean windowedWrites;
-
- /**
- * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
- * control how many different shards are produced.
- */
- public static <T> Write<T> to(Sink<T> sink) {
- checkNotNull(sink, "sink");
- return new Write<>(sink, null /* runner-determined sharding */, null, false);
- }
-
- private Write(
- Sink<T> sink,
- @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
- @Nullable ValueProvider<Integer> numShardsProvider,
- boolean windowedWrites) {
- this.sink = sink;
- this.computeNumShards = computeNumShards;
- this.numShardsProvider = numShardsProvider;
- this.windowedWrites = windowedWrites;
- }
-
- @Override
- public PDone expand(PCollection<T> input) {
- checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
- "%s can only be applied to an unbounded PCollection if doing windowed writes",
- Write.class.getSimpleName());
- PipelineOptions options = input.getPipeline().getOptions();
- sink.validate(options);
- return createWrite(input, sink.createWriteOperation(options));
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
- .include("sink", sink);
- if (getSharding() != null) {
- builder.include("sharding", getSharding());
- } else if (getNumShards() != null) {
- String numShards = getNumShards().isAccessible()
- ? getNumShards().get().toString() : getNumShards().toString();
- builder.add(DisplayData.item("numShards", numShards)
- .withLabel("Fixed Number of Shards"));
- }
- }
-
- /**
- * Returns the {@link Sink} associated with this PTransform.
- */
- public Sink<T> getSink() {
- return sink;
- }
-
- /**
- * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
- * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
- * {@link #withSharding(PTransform)}), or runner-determined (by {@link
- * #withRunnerDeterminedSharding()}.
- */
- @Nullable
- public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
- return computeNumShards;
- }
-
- public ValueProvider<Integer> getNumShards() {
- return numShardsProvider;
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- *
- * <p>A value less than or equal to 0 will be equivalent to the default behavior of
- * runner-determined sharding.
- */
- public Write<T> withNumShards(int numShards) {
- if (numShards > 0) {
- return withNumShards(StaticValueProvider.of(numShards));
- }
- return withRunnerDeterminedSharding();
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * {@link ValueProvider} specified number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
- return new Write<>(sink, null, numShardsProvider, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} using the
- * specified {@link PTransform} to compute the number of shards.
- *
- * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
- * more information.
- */
- public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
- checkNotNull(
- sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new Write<>(sink, sharding, null, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that will write to the current {@link Sink} with
- * runner-determined sharding.
- */
- public Write<T> withRunnerDeterminedSharding() {
- return new Write<>(sink, null, null, windowedWrites);
- }
-
- /**
- * Returns a new {@link Write} that writes preserves windowing on it's input.
- *
- * <p>If this option is not specified, windowing and triggering are replaced by
- * {@link GlobalWindows} and {@link DefaultTrigger}.
- *
- * <p>If there is no data for a window, no output shards will be generated for that window.
- * If a window triggers multiple times, then more than a single output shard might be
- * generated multiple times; it's up to the sink implementation to keep these output shards
- * unique.
- *
- * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
- * positive value.
- */
- public Write<T> withWindowedWrites() {
- return new Write<>(sink, computeNumShards, numShardsProvider, true);
- }
-
- /**
- * Writes all the elements in a bundle using a {@link Writer} produced by the
- * {@link WriteOperation} associated with the {@link Sink}.
- */
- private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T, WriteT> writer = null;
- private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
- WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
- this.writeOperationView = writeOperationView;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // Lazily initialize the Writer
- if (writer == null) {
- WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter(c.getPipelineOptions());
-
- if (windowedWrites) {
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
- UNKNOWN_NUMSHARDS);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
- }
- LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- if (closeException instanceof InterruptedException) {
- // Do not silently ignore interrupted state.
- Thread.currentThread().interrupt();
- }
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- WriteT result = writer.close();
- c.output(result);
- // Reset state in case of reuse.
- writer = null;
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(Write.this);
- }
- }
-
- /**
-  * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-  * a single iterable.
-  *
-  * <p>Each input element is one shard: a {@link KV} of the shard key and the values grouped
-  * under it. A fresh {@link Writer} is created, opened, written to and closed within a single
-  * {@code processElement} call, and the result of closing it is the output. If anything fails
-  * after the writer has been opened, {@code Writer#cleanup} is invoked before the exception
-  * propagates.
-  *
-  * @see WriteBundles
-  */
- private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-   private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-   // May be null; the shard count is then read from getNumShards() instead.
-   private final PCollectionView<Integer> numShardsView;
-
-   WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
-       PCollectionView<Integer> numShardsView) {
-     this.writeOperationView = writeOperationView;
-     this.numShardsView = numShardsView;
-   }
-
-   @ProcessElement
-   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-     int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
-     // In a sharded write, single input element represents one shard. We can open and close
-     // the writer in each call to processElement.
-     WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-     LOG.info("Opening writer for write operation {}", writeOperation);
-     Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-     // Only the windowed path hands the shard key and shard count to the writer; the
-     // unwindowed path passes the UNKNOWN_* placeholders.
-     if (windowedWrites) {
-       writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
-           numShards);
-     } else {
-       writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-     }
-     // Log the operation itself, as the "Opening writer" message does, rather than the
-     // side-input view it was read from.
-     LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
-
-     try {
-       try {
-         for (T t : c.element().getValue()) {
-           writer.write(t);
-         }
-       } catch (Exception e) {
-         // A write failed: still close the writer, but keep the write failure as the
-         // primary exception.
-         try {
-           writer.close();
-         } catch (Exception closeException) {
-           if (closeException instanceof InterruptedException) {
-             // Do not silently ignore interrupted state.
-             Thread.currentThread().interrupt();
-           }
-           // Do not mask the exception that caused the write to fail.
-           e.addSuppressed(closeException);
-         }
-         throw e;
-       }
-
-       // Close the writer; if this throws let the error propagate.
-       WriteT result = writer.close();
-       c.output(result);
-     } catch (Exception e) {
-       // If anything goes wrong, make sure to delete the temporary file.
-       writer.cleanup();
-       throw e;
-     }
-   }
-
-   @Override
-   public void populateDisplayData(DisplayData.Builder builder) {
-     // Display data is supplied by the enclosing Write instance.
-     builder.delegate(Write.this);
-   }
- }
-
- /**
-  * Pairs every input element with an integer shard key in {@code [0, shardCount)}.
-  *
-  * <p>The shard count is read for each element, from the side input when a view was supplied
-  * and from the {@link ValueProvider} otherwise. The first key produced by an instance is
-  * chosen at random; subsequent keys advance round-robin from it.
-  */
- private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-   private final PCollectionView<Integer> numShardsView;
-   private final ValueProvider<Integer> numShardsProvider;
-   // Key given to the most recent element; UNKNOWN_SHARDNUM until the first element arrives.
-   private int shardNumber;
-
-   ApplyShardingKey(PCollectionView<Integer> numShardsView,
-       ValueProvider<Integer> numShardsProvider) {
-     this.numShardsView = numShardsView;
-     this.numShardsProvider = numShardsProvider;
-     shardNumber = UNKNOWN_SHARDNUM;
-   }
-
-   @ProcessElement
-   public void processElement(ProcessContext context) {
-     final int shardCount;
-     if (numShardsView == null) {
-       checkNotNull(numShardsProvider);
-       shardCount = numShardsProvider.get();
-     } else {
-       shardCount = context.sideInput(numShardsView);
-     }
-     checkArgument(
-         shardCount > 0,
-         "Must have a positive number of shards specified for non-runner-determined sharding."
-             + " Got %s",
-         shardCount);
-     // We want to desynchronize the first record sharding key for each instance of
-     // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-     // After that first random pick the key simply cycles through the shards.
-     shardNumber = (shardNumber == UNKNOWN_SHARDNUM)
-         ? ThreadLocalRandom.current().nextInt(shardCount)
-         : (shardNumber + 1) % shardCount;
-     context.output(KV.of(shardNumber, context.element()));
-   }
- }
-
- /**
-  * Expands the write into a sequence of {@link ParDo}s: initialize, write, finalize.
-  *
-  * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-  * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-  * called. The output of this ParDo is a singleton PCollection containing the WriteOperation.
-  *
-  * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-  * ParDo over the PCollection of elements to write. In this writing phase,
-  * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}, which is opened
-  * (windowed or unwindowed), receives every element through {@link Writer#write}, and is closed
-  * with {@link Writer#close}. When neither a shard-count transform nor a shard-count provider
-  * is configured, the elements are written by {@link WriteBundles}; otherwise they are first
-  * keyed by shard, grouped, and each group is written by {@link WriteShardedBundles}. The
-  * output of this phase is a PCollection of <i>writer result</i> objects (see {@link Sink} for
-  * a description of writer results).
-  *
-  * <p>For non-windowed writes, the final do-once ParDo uses the singleton collection of the
-  * WriteOperation as input and the collection of writer results as a side input. It first
-  * creates empty shards until at least one shard, or the configured number of shards, exists,
-  * and then calls {@link WriteOperation#finalize}. For windowed writes, the writer results are
-  * instead grouped under a single null key and {@link WriteOperation#finalize} is called for
-  * each group emitted, with the WriteOperation read as a side input.
-  *
-  * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-  * before the exception that caused the write to fail is propagated and the write result will be
-  * discarded.
-  *
-  * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-  * deserialized in the bundle-writing and finalization phases, any state change to the
-  * WriteOperation object that occurs during initialization is visible in the latter phases.
-  * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-  * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-  * the WriteOperation.
-  *
-  * @param input the elements to write
-  * @param writeOperation the operation that creates writers and finalizes the write
-  * @return a {@link PDone} in the pipeline of {@code input}
-  * @throws IllegalStateException if windowed writes are enabled while neither a shard-count
-  *     transform nor a shard-count provider is configured
-  */
- private <WriteT> PDone createWrite(
-     PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-   Pipeline p = input.getPipeline();
-   writeOperation.setWindowedWrites(windowedWrites);
-
-   // A coder to use for the WriteOperation.
-   @SuppressWarnings("unchecked")
-   Coder<WriteOperation<T, WriteT>> operationCoder =
-       (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-   // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-   // the sink.
-   PCollection<WriteOperation<T, WriteT>> operationCollection =
-       p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
-
-   // Initialize the resource in a do-once ParDo on the WriteOperation.
-   operationCollection = operationCollection
-       .apply("Initialize", ParDo.of(
-           new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-             @ProcessElement
-             public void processElement(ProcessContext c) throws Exception {
-               WriteOperation<T, WriteT> writeOperation = c.element();
-               LOG.info("Initializing write operation {}", writeOperation);
-               writeOperation.initialize(c.getPipelineOptions());
-               writeOperation.setWindowedWrites(windowedWrites);
-               LOG.debug("Done initializing write operation {}", writeOperation);
-               // The WriteOperation is also the output of this ParDo, so it can have mutable
-               // state.
-               c.output(writeOperation);
-             }
-           }))
-       .setCoder(operationCoder);
-
-   // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-   final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-       operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-   if (!windowedWrites) {
-     // Re-window the data into the global window and remove any existing triggers.
-     input =
-         input.apply(
-             Window.<T>into(new GlobalWindows())
-                 .triggering(DefaultTrigger.of())
-                 .discardingFiredPanes());
-   }
-
-   // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-   // as a side input) and collect the results of the writes in a PCollection.
-   // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-   // as a side input), so this will happen after the initial ParDo.
-   PCollection<WriteT> results;
-   final PCollectionView<Integer> numShardsView;
-   if (computeNumShards == null && numShardsProvider == null) {
-     if (windowedWrites) {
-       // Keep the trailing space: the two literals are concatenated into one sentence.
-       throw new IllegalStateException("When doing windowed writes, numShards must be set "
-           + "explicitly to a positive value");
-     }
-     numShardsView = null;
-     results = input
-         .apply("WriteBundles",
-             ParDo.of(new WriteBundles<>(writeOperationView))
-                 .withSideInputs(writeOperationView));
-   } else {
-     if (computeNumShards != null) {
-       // The shard count is computed from the input and delivered as a side input.
-       numShardsView = input.apply(computeNumShards);
-       results = input
-           .apply("ApplyShardLabel", ParDo.of(
-               new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
-           .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-           .apply("WriteShardedBundles",
-               ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
-                   .withSideInputs(numShardsView, writeOperationView));
-     } else {
-       // The shard count comes from the provider, so no side input is needed for it.
-       numShardsView = null;
-       results = input
-           .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
-           .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-           .apply("WriteShardedBundles",
-               ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
-                   .withSideInputs(writeOperationView));
-     }
-   }
-   results.setCoder(writeOperation.getWriterResultCoder());
-
-   if (windowedWrites) {
-     // When processing streaming windowed writes, results will arrive multiple times. This
-     // means we can't share the below implementation that turns the results into a side input,
-     // as new data arriving into a side input does not trigger the listening DoFn. Instead
-     // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
-     // whenever new data arrives.
-     PCollection<KV<Void, WriteT>> keyedResults =
-         results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
-     keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
-         .getWriterResultCoder()));
-
-     // Is the continuation trigger sufficient?
-     keyedResults
-         .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
-         .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
-           @ProcessElement
-           public void processElement(ProcessContext c) throws Exception {
-             WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-             LOG.info("Finalizing write operation {}.", writeOperation);
-             List<WriteT> results = Lists.newArrayList(c.element().getValue());
-             writeOperation.finalize(results, c.getPipelineOptions());
-             LOG.debug("Done finalizing write operation {}", writeOperation);
-           }
-         }).withSideInputs(writeOperationView));
-   } else {
-     final PCollectionView<Iterable<WriteT>> resultsView =
-         results.apply(View.<WriteT>asIterable());
-     ImmutableList.Builder<PCollectionView<?>> sideInputs =
-         ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-     if (numShardsView != null) {
-       sideInputs.add(numShardsView);
-     }
-
-     // Finalize the write in another do-once ParDo on the singleton collection containing the
-     // Writer. The results from the per-bundle writes are given as an Iterable side input.
-     // The WriteOperation's state is the same as after its initialization in the first do-once
-     // ParDo. There is a dependency between this ParDo and the parallel write (the writer
-     // results collection as a side input), so it will happen after the parallel write.
-     // For the non-windowed case, we guarantee that if no data is written but the user has
-     // set numShards, then all shards will be written out as empty files. For this reason we
-     // use a side input here.
-     operationCollection
-         .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-           @ProcessElement
-           public void processElement(ProcessContext c) throws Exception {
-             WriteOperation<T, WriteT> writeOperation = c.element();
-             LOG.info("Finalizing write operation {}.", writeOperation);
-             List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-             LOG.debug(
-                 "Side input initialized to finalize write operation {}.", writeOperation);
-
-             // We must always output at least 1 shard, and honor user-specified numShards if
-             // set.
-             int minShardsNeeded;
-             if (numShardsView != null) {
-               minShardsNeeded = c.sideInput(numShardsView);
-             } else if (numShardsProvider != null) {
-               minShardsNeeded = numShardsProvider.get();
-             } else {
-               minShardsNeeded = 1;
-             }
-             int extraShardsNeeded = minShardsNeeded - results.size();
-             if (extraShardsNeeded > 0) {
-               LOG.info(
-                   "Creating {} empty output shards in addition to {} written for a total of "
-                       + "{}.", extraShardsNeeded, results.size(), minShardsNeeded);
-               // An empty shard is a writer that is opened and closed without any write.
-               for (int i = 0; i < extraShardsNeeded; ++i) {
-                 Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                 writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
-                     UNKNOWN_NUMSHARDS);
-                 WriteT emptyWrite = writer.close();
-                 results.add(emptyWrite);
-               }
-               LOG.debug("Done creating extra shards.");
-             }
-             writeOperation.finalize(results, c.getPipelineOptions());
-             LOG.debug("Done finalizing write operation {}", writeOperation);
-           }
-         }).withSideInputs(sideInputs.build()));
-   }
-   return PDone.in(input.getPipeline());
- }
-}