You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:48 UTC
[36/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink
Adds DynamicDestinations support to FileBasedSink
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c336e84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c336e84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c336e84
Branch: refs/heads/DSL_SQL
Commit: 4c336e840e69e83e15d9ffb7e0a0178dd3ab8404
Parents: 1f6117f
Author: Reuven Lax <re...@google.com>
Authored: Fri Jun 9 17:11:32 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../examples/common/WriteOneFilePerWindow.java | 52 +-
.../beam/examples/WindowedWordCountIT.java | 4 +-
.../complete/game/utils/WriteToText.java | 43 +-
.../construction/WriteFilesTranslation.java | 67 +-
.../construction/PTransformMatchersTest.java | 22 +-
.../construction/WriteFilesTranslationTest.java | 62 +-
.../direct/WriteWithShardingFactory.java | 6 +-
.../direct/WriteWithShardingFactoryTest.java | 18 +-
.../beam/runners/dataflow/DataflowRunner.java | 15 +-
.../runners/dataflow/DataflowRunnerTest.java | 35 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 26 +-
.../src/main/proto/beam_runner_api.proto | 7 +-
.../apache/beam/sdk/coders/ShardedKeyCoder.java | 66 ++
.../java/org/apache/beam/sdk/io/AvroIO.java | 220 ++++---
.../java/org/apache/beam/sdk/io/AvroSink.java | 32 +-
.../beam/sdk/io/DefaultFilenamePolicy.java | 274 +++++---
.../beam/sdk/io/DynamicFileDestinations.java | 115 ++++
.../org/apache/beam/sdk/io/FileBasedSink.java | 513 ++++++++-------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 44 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 488 ++++++++++----
.../java/org/apache/beam/sdk/io/TextSink.java | 22 +-
.../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++++++++++--------
.../sdk/transforms/SerializableFunctions.java | 50 ++
.../org/apache/beam/sdk/values/ShardedKey.java | 65 ++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 85 ++-
.../beam/sdk/io/DefaultFilenamePolicyTest.java | 135 ++--
.../sdk/io/DrunkWritableByteChannelFactory.java | 2 +-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 93 +--
.../java/org/apache/beam/sdk/io/SimpleSink.java | 56 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++++++-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 339 ++++++++--
.../beam/sdk/io/gcp/bigquery/BatchLoads.java | 2 +
.../io/gcp/bigquery/DynamicDestinations.java | 29 +-
.../io/gcp/bigquery/GenerateShardedTable.java | 1 +
.../beam/sdk/io/gcp/bigquery/ShardedKey.java | 67 --
.../sdk/io/gcp/bigquery/ShardedKeyCoder.java | 74 ---
.../sdk/io/gcp/bigquery/StreamingWriteFn.java | 1 +
.../io/gcp/bigquery/StreamingWriteTables.java | 2 +
.../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 1 +
.../io/gcp/bigquery/WriteBundlesToFiles.java | 2 +
.../bigquery/WriteGroupedRecordsToFiles.java | 1 +
.../sdk/io/gcp/bigquery/WritePartition.java | 1 +
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 1 +
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 2 +
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 21 +-
.../org/apache/beam/sdk/io/xml/XmlSinkTest.java | 4 +-
47 files changed, 2710 insertions(+), 1363 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 5e6df9c..49865ba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.examples.common;
-import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.base.MoreObjects.firstNonNull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -53,22 +54,12 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
@Override
public PDone expand(PCollection<String> input) {
- // filenamePrefix may contain a directory and a filename component. Pull out only the filename
- // component from that path for the PerWindowFiles.
- String prefix = "";
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
- if (!resource.isDirectory()) {
- prefix = verifyNotNull(
- resource.getFilename(),
- "A non-directory resource should have a non-null filename: %s",
- resource);
- }
-
-
- TextIO.Write write = TextIO.write()
- .to(resource.getCurrentDirectory())
- .withFilenamePolicy(new PerWindowFiles(prefix))
- .withWindowedWrites();
+ TextIO.Write write =
+ TextIO.write()
+ .to(new PerWindowFiles(resource))
+ .withTempDirectory(resource.getCurrentDirectory())
+ .withWindowedWrites();
if (numShards != null) {
write = write.withNumShards(numShards);
}
@@ -83,31 +74,36 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
*/
public static class PerWindowFiles extends FilenamePolicy {
- private final String prefix;
+ private final ResourceId baseFilename;
- public PerWindowFiles(String prefix) {
- this.prefix = prefix;
+ public PerWindowFiles(ResourceId baseFilename) {
+ this.baseFilename = baseFilename;
}
public String filenamePrefixForWindow(IntervalWindow window) {
+ String prefix =
+ baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
return String.format("%s-%s-%s",
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}
@Override
- public ResourceId windowedFilename(
- ResourceId outputDirectory, WindowedContext context, String extension) {
+ public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
IntervalWindow window = (IntervalWindow) context.getWindow();
- String filename = String.format(
- "%s-%s-of-%s%s",
- filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
- extension);
- return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ String filename =
+ String.format(
+ "%s-%s-of-%s%s",
+ filenamePrefixForWindow(window),
+ context.getShardNumber(),
+ context.getNumShards(),
+ outputFileHints.getSuggestedFilenameSuffix());
+ return baseFilename
+ .getCurrentDirectory()
+ .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(
- ResourceId outputDirectory, Context context, String extension) {
+ public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index eb7e4c4..bec7952 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -149,7 +150,8 @@ public class WindowedWordCountIT {
String outputPrefix = options.getOutput();
- PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
+ PerWindowFiles filenamePolicy =
+ new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index e6c8ddb..1d60198 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -18,7 +18,6 @@
package org.apache.beam.examples.complete.game.utils;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
import java.io.Serializable;
import java.util.ArrayList;
@@ -28,6 +27,7 @@ import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -111,21 +111,12 @@ public class WriteToText<InputT>
checkArgument(
input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
- // filenamePrefix may contain a directory and a filename component. Pull out only the filename
- // component from that path for the PerWindowFiles.
- String prefix = "";
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
- if (!resource.isDirectory()) {
- prefix = verifyNotNull(
- resource.getFilename(),
- "A non-directory resource should have a non-null filename: %s",
- resource);
- }
return input.apply(
TextIO.write()
- .to(resource.getCurrentDirectory())
- .withFilenamePolicy(new PerWindowFiles(prefix))
+ .to(new PerWindowFiles(resource))
+ .withTempDirectory(resource.getCurrentDirectory())
.withWindowedWrites()
.withNumShards(3));
}
@@ -139,31 +130,33 @@ public class WriteToText<InputT>
*/
protected static class PerWindowFiles extends FilenamePolicy {
- private final String prefix;
+ private final ResourceId prefix;
- public PerWindowFiles(String prefix) {
+ public PerWindowFiles(ResourceId prefix) {
this.prefix = prefix;
}
public String filenamePrefixForWindow(IntervalWindow window) {
- return String.format("%s-%s-%s",
- prefix, formatter.print(window.start()), formatter.print(window.end()));
+ String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
+ return String.format(
+ "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
}
@Override
- public ResourceId windowedFilename(
- ResourceId outputDirectory, WindowedContext context, String extension) {
+ public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
IntervalWindow window = (IntervalWindow) context.getWindow();
- String filename = String.format(
- "%s-%s-of-%s%s",
- filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
- extension);
- return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ String filename =
+ String.format(
+ "%s-%s-of-%s%s",
+ filenamePrefixForWindow(window),
+ context.getShardNumber(),
+ context.getNumShards(),
+ outputFileHints.getSuggestedFilenameSuffix());
+ return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(
- ResourceId outputDirectory, Context context, String extension) {
+ public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Unsupported.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 99b77ef..b1d2da4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -26,6 +26,7 @@ import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -51,32 +53,45 @@ public class WriteFilesTranslation {
public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
"urn:beam:file_based_sink:javasdk:0.1";
+ public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
+ "urn:beam:file_based_sink_format_function:javasdk:0.1";
+
@VisibleForTesting
- static WriteFilesPayload toProto(WriteFiles<?> transform) {
+ static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
return WriteFilesPayload.newBuilder()
.setSink(toProto(transform.getSink()))
+ .setFormatFunction(toProto(transform.getFormatFunction()))
.setWindowedWrites(transform.isWindowedWrites())
.setRunnerDeterminedSharding(
transform.getNumShards() == null && transform.getSharding() == null)
.build();
}
- private static SdkFunctionSpec toProto(FileBasedSink<?> sink) {
+ private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+ return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
+ }
+
+ private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
+ return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
+ }
+
+ private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
return SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN)
+ .setUrn(urn)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
- ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink)))
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(serializable)))
.build())))
.build();
}
@VisibleForTesting
- static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+ static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
checkArgument(
sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
"Cannot extract %s instance from %s with URN %s",
@@ -87,16 +102,44 @@ public class WriteFilesTranslation {
byte[] serializedSink =
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
- return (FileBasedSink<?>)
+ return (FileBasedSink<?, ?>)
SerializableUtils.deserializeFromByteArray(
serializedSink, FileBasedSink.class.getSimpleName());
}
- public static <T> FileBasedSink<T> getSink(
- AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+ @VisibleForTesting
+ static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
+ SdkFunctionSpec sinkProto) throws IOException {
+ checkArgument(
+ sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
+ "Cannot extract %s instance from %s with URN %s",
+ SerializableFunction.class.getSimpleName(),
+ FunctionSpec.class.getSimpleName(),
+ sinkProto.getSpec().getUrn());
+
+ byte[] serializedFunction =
+ sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+
+ return (SerializableFunction<InputT, OutputT>)
+ SerializableUtils.deserializeFromByteArray(
+ serializedFunction, FileBasedSink.class.getSimpleName());
+ }
+
+ public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+ AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+ transform)
+ throws IOException {
+ return (FileBasedSink<OutputT, DestinationT>)
+ sinkFromProto(getWriteFilesPayload(transform).getSink());
+ }
+
+ public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
+ AppliedPTransform<
+ PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
transform)
throws IOException {
- return (FileBasedSink<T>) sinkFromProto(getWriteFilesPayload(transform).getSink());
+ return formatFunctionFromProto(
+ getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
}
public static <T> boolean isWindowedWrites(
@@ -124,15 +167,15 @@ public class WriteFilesTranslation {
.unpack(WriteFilesPayload.class);
}
- static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?>> {
+ static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
@Override
- public String getUrn(WriteFiles<?> transform) {
+ public String getUrn(WriteFiles<?, ?, ?> transform) {
return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
}
@Override
public FunctionSpec translate(
- AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents components) {
+ AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) {
return FunctionSpec.newBuilder()
.setUrn(getUrn(transform.getTransform()))
.setParameter(Any.pack(toProto(transform.getTransform())))
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 6459849..99d3dd1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.LocalResources;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -537,30 +539,32 @@ public class PTransformMatchersTest implements Serializable {
public void writeWithRunnerDeterminedSharding() {
ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
FilenamePolicy policy =
- DefaultFilenamePolicy.constructUsingStandardParameters(
+ DefaultFilenamePolicy.fromStandardParameters(
StaticValueProvider.of(outputDirectory),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
"",
false);
- WriteFiles<Integer> write =
+ WriteFiles<Integer, Void, Integer> write =
WriteFiles.to(
- new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
+ new FileBasedSink<Integer, Void>(
+ StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
@Override
- public WriteOperation<Integer> createWriteOperation() {
+ public WriteOperation<Integer, Void> createWriteOperation() {
return null;
}
- });
+ },
+ SerializableFunctions.<Integer>identity());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
- WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
+ WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3);
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
.matches(appliedWrite(withStaticSharding)),
is(false));
- WriteFiles<Integer> withCustomSharding =
+ WriteFiles<Integer, Void, Integer> withCustomSharding =
write.withSharding(Sum.integersGlobally().asSingletonView());
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -568,8 +572,8 @@ public class PTransformMatchersTest implements Serializable {
is(false));
}
- private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
- return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
+ private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) {
+ return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer, Void, Integer>>of(
"WriteFiles",
Collections.<TupleTag<?>, PValue>emptyMap(),
Collections.<TupleTag<?>, PValue>emptyMap(),
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 739034c..283df16 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -26,8 +26,10 @@ import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -36,6 +38,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.junit.Test;
@@ -56,16 +60,17 @@ public class WriteFilesTranslationTest {
@RunWith(Parameterized.class)
public static class TestWriteFilesPayloadTranslation {
@Parameters(name = "{index}: {0}")
- public static Iterable<WriteFiles<?>> data() {
- return ImmutableList.<WriteFiles<?>>of(
- WriteFiles.to(new DummySink()),
- WriteFiles.to(new DummySink()).withWindowedWrites(),
- WriteFiles.to(new DummySink()).withNumShards(17),
- WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
+ public static Iterable<WriteFiles<Object, Void, Object>> data() {
+ SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
+ return ImmutableList.of(
+ WriteFiles.to(new DummySink(), format),
+ WriteFiles.to(new DummySink(), format).withWindowedWrites(),
+ WriteFiles.to(new DummySink(), format).withNumShards(17),
+ WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
}
@Parameter(0)
- public WriteFiles<String> writeFiles;
+ public WriteFiles<String, Void, String> writeFiles;
public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@@ -80,7 +85,7 @@ public class WriteFilesTranslationTest {
assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
assertThat(
- (FileBasedSink<String>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+ (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
equalTo(writeFiles.getSink()));
}
@@ -89,9 +94,9 @@ public class WriteFilesTranslationTest {
PCollection<String> input = p.apply(Create.of("hello"));
PDone output = input.apply(writeFiles);
- AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> appliedPTransform =
- AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of(
- "foo", input.expand(), output.expand(), writeFiles, p);
+ AppliedPTransform<PCollection<String>, PDone, WriteFiles<String, Void, String>>
+ appliedPTransform =
+ AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p);
assertThat(
WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
@@ -101,7 +106,9 @@ public class WriteFilesTranslationTest {
WriteFilesTranslation.isWindowedWrites(appliedPTransform),
equalTo(writeFiles.isWindowedWrites()));
- assertThat(WriteFilesTranslation.getSink(appliedPTransform), equalTo(writeFiles.getSink()));
+ assertThat(
+ WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform),
+ equalTo(writeFiles.getSink()));
}
}
@@ -109,16 +116,16 @@ public class WriteFilesTranslationTest {
* A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
* any issues serializing mocks.
*/
- private static class DummySink extends FileBasedSink<String> {
+ private static class DummySink extends FileBasedSink<Object, Void> {
DummySink() {
super(
StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
- new DummyFilenamePolicy());
+ DynamicFileDestinations.constant(new DummyFilenamePolicy()));
}
@Override
- public WriteOperation<String> createWriteOperation() {
+ public WriteOperation<Object, Void> createWriteOperation() {
return new DummyWriteOperation(this);
}
@@ -130,46 +137,39 @@ public class WriteFilesTranslationTest {
DummySink that = (DummySink) other;
- return getFilenamePolicy().equals(((DummySink) other).getFilenamePolicy())
- && getBaseOutputDirectoryProvider().isAccessible()
- && that.getBaseOutputDirectoryProvider().isAccessible()
- && getBaseOutputDirectoryProvider()
- .get()
- .equals(that.getBaseOutputDirectoryProvider().get());
+ return getTempDirectoryProvider().isAccessible()
+ && that.getTempDirectoryProvider().isAccessible()
+ && getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get());
}
@Override
public int hashCode() {
return Objects.hash(
DummySink.class,
- getFilenamePolicy(),
- getBaseOutputDirectoryProvider().isAccessible()
- ? getBaseOutputDirectoryProvider().get()
- : null);
+ getTempDirectoryProvider().isAccessible() ? getTempDirectoryProvider().get() : null);
}
}
- private static class DummyWriteOperation extends FileBasedSink.WriteOperation<String> {
- public DummyWriteOperation(FileBasedSink<String> sink) {
+ private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
+ public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
super(sink);
}
@Override
- public FileBasedSink.Writer<String> createWriter() throws Exception {
+ public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
throw new UnsupportedOperationException("Should never be called.");
}
}
private static class DummyFilenamePolicy extends FilenamePolicy {
@Override
- public ResourceId windowedFilename(
- ResourceId outputDirectory, WindowedContext c, String extension) {
+ public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Should never be called.");
}
@Nullable
@Override
- public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
+ public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Should never be called.");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d8734a1..ba796ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -60,9 +60,11 @@ class WriteWithShardingFactory<InputT>
public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
transform) {
-
try {
- WriteFiles<InputT> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+ WriteFiles<InputT, ?, ?> replacement =
+ WriteFiles.to(
+ WriteFilesTranslation.getSink(transform),
+ WriteFilesTranslation.getFormatFunction(transform));
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 41d671f..546a181 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,9 +39,8 @@ import java.util.UUID;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.LocalResources;
import org.apache.beam.sdk.io.TextIO;
@@ -55,6 +54,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -137,21 +137,17 @@ public class WriteWithShardingFactoryTest implements Serializable {
@Test
public void withNoShardingSpecifiedReturnsNewTransform() {
ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
- FilenamePolicy policy =
- DefaultFilenamePolicy.constructUsingStandardParameters(
- StaticValueProvider.of(outputDirectory),
- DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
- "",
- false);
PTransform<PCollection<Object>, PDone> original =
WriteFiles.to(
- new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+ new FileBasedSink<Object, Void>(
+ StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
@Override
- public WriteOperation<Object> createWriteOperation() {
+ public WriteOperation<Object, Void> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
- });
+ },
+ SerializableFunctions.identity());
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5d9f0f3..8935759 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1455,8 +1455,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@VisibleForTesting
- static class StreamingShardedWriteFactory<T>
- implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
+ static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
+ implements PTransformOverrideFactory<
+ PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> {
// We pick 10 as a default, as it works well with the default number of workers started
// by Dataflow.
static final int DEFAULT_NUM_SHARDS = 10;
@@ -1467,8 +1468,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@Override
- public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
- AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
+ public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform(
+ AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>>
+ transform) {
// By default, if numShards is not set WriteFiles will produce one file per bundle. In
// streaming, there are large numbers of small bundles, resulting in many tiny files.
// Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
@@ -1485,7 +1487,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
try {
- WriteFiles<T> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+ WriteFiles<UserT, DestinationT, OutputT> replacement =
+ WriteFiles.<UserT, DestinationT, OutputT>to(
+ WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
+ WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index bc1a042..94985f8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
@@ -100,6 +101,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.GcsUtil;
@@ -1263,30 +1265,39 @@ public class DataflowRunnerTest implements Serializable {
private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
TestPipeline p = TestPipeline.fromOptions(options);
- StreamingShardedWriteFactory<Object> factory =
+ StreamingShardedWriteFactory<Object, Void, Object> factory =
new StreamingShardedWriteFactory<>(p.getOptions());
- WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
+ WriteFiles<Object, Void, Object> original =
+ WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
- AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
- AppliedPTransform.of(
- "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
-
- WriteFiles<Object> replacement = (WriteFiles<Object>)
- factory.getReplacementTransform(originalApplication).getTransform();
+ AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
+ originalApplication =
+ AppliedPTransform.of(
+ "writefiles",
+ objs.expand(),
+ Collections.<TupleTag<?>, PValue>emptyMap(),
+ original,
+ p);
+
+ WriteFiles<Object, Void, Object> replacement =
+ (WriteFiles<Object, Void, Object>)
+ factory.getReplacementTransform(originalApplication).getTransform();
assertThat(replacement, not(equalTo((Object) original)));
assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
}
- private static class TestSink extends FileBasedSink<Object> {
+ private static class TestSink extends FileBasedSink<Object, Void> {
@Override
public void validate(PipelineOptions options) {}
TestSink(String tmpFolder) {
- super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
- null);
+ super(
+ StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
+ DynamicFileDestinations.constant(null));
}
+
@Override
- public WriteOperation<Object> createWriteOperation() {
+ public WriteOperation<Object, Void> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 64ff98c..246eb81 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -52,7 +52,6 @@ import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
-
/**
* Test {@link SparkRunnerDebugger} with different pipelines.
*/
@@ -85,17 +84,20 @@ public class SparkRunnerDebuggerTest {
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
.apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
- final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
- + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
- + "_.groupByKey()\n"
- + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark"
- + ".SparkRunnerDebuggerTest$PlusOne())\n"
- + "sparkContext.union(...)\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
- + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
+ final String expectedPipeline =
+ "sparkContext.parallelize(Arrays.asList(...))\n"
+ + "_.mapPartitions("
+ + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+ + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ + "_.groupByKey()\n"
+ + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+ + "_.mapPartitions(new org.apache.beam.runners.spark"
+ + ".SparkRunnerDebuggerTest$PlusOne())\n"
+ + "sparkContext.union(...)\n"
+ + "_.mapPartitions("
+ + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+ + "_.<org.apache.beam.sdk.io.TextIO$Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
(SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 24e907a..1f74afb 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -367,9 +367,12 @@ message WriteFilesPayload {
// (Required) The SdkFunctionSpec of the FileBasedSink.
SdkFunctionSpec sink = 1;
- bool windowed_writes = 2;
+ // (Required) The format function.
+ SdkFunctionSpec format_function = 2;
- bool runner_determined_sharding = 3;
+ bool windowed_writes = 3;
+
+ bool runner_determined_sharding = 4;
}
// A coder, the binary format for serialization and deserialization of data in
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
new file mode 100644
index 0000000..a86b198
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.values.ShardedKey;
+
+/** A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. */
+@VisibleForTesting
+public class ShardedKeyCoder<KeyT> extends StructuredCoder<ShardedKey<KeyT>> {
+ public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+ return new ShardedKeyCoder<>(keyCoder);
+ }
+
+ private final Coder<KeyT> keyCoder;
+ private final VarIntCoder shardNumberCoder;
+
+ protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+ this.keyCoder = keyCoder;
+ this.shardNumberCoder = VarIntCoder.of();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder);
+ }
+
+ @Override
+ public void encode(ShardedKey<KeyT> key, OutputStream outStream)
+ throws IOException {
+ keyCoder.encode(key.getKey(), outStream);
+ shardNumberCoder.encode(key.getShardNumber(), outStream);
+ }
+
+ @Override
+ public ShardedKey<KeyT> decode(InputStream inStream)
+ throws IOException {
+ return ShardedKey.of(keyCoder.decode(inStream), shardNumberCoder.decode(inStream));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ keyCoder.verifyDeterministic();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4143db2..89cadbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableMap;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
@@ -52,18 +53,19 @@ import org.apache.beam.sdk.values.PDone;
/**
* {@link PTransform}s for reading and writing Avro files.
*
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
- * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
- * See {@link FileSystems} for information on supported file systems and filepatterns.
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
+ * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
+ * FileSystems} for information on supported file systems and filepatterns.
*
- * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
- * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
- * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
* JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
* schema.
*
* <p>For example:
- * <pre> {@code
+ *
+ * <pre>{@code
* Pipeline p = ...;
*
* // A simple Read of a local file (only runs locally):
@@ -75,34 +77,33 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<GenericRecord> records =
* p.apply(AvroIO.readGenericRecords(schema)
* .from("gs://my_bucket/path/to/records-*.avro"));
- * } </pre>
+ * }</pre>
*
* <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
- * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default
- * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a
- * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional
- * filename suffix (set via {@link Write#withSuffix(String)}, to generate output filenames in a
- * sharded way. You can override this default write filename policy using
- * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming
- * policy.
+ * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}), to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
+ * custom file naming policy.
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner -
- * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be
- * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a
- * runner-chosen value, so you may need not set it yourself. A
- * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
- * unique filenames.
+ * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
+ * and unique windows and triggers must produce unique filenames.
*
- * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
- * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
- * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
- * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
- * specified schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
*
* <p>For example:
- * <pre> {@code
+ *
+ * <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<AvroAutoGenClass> records = ...;
* records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
@@ -113,11 +114,11 @@ import org.apache.beam.sdk.values.PDone;
* records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
* .to("gs://my_bucket/path/to/numbers")
* .withSuffix(".avro"));
- * } </pre>
+ * }</pre>
*
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the
- * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can
- * be changed or overridden using {@link AvroIO.Write#withCodec}.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
+ * overridden using {@link AvroIO.Write#withCodec}.
*/
public class AvroIO {
/**
@@ -258,11 +259,16 @@ public class AvroIO {
@Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
@Nullable abstract String getShardTemplate();
@Nullable abstract String getFilenameSuffix();
+
+ @Nullable
+ abstract ValueProvider<ResourceId> getTempDirectory();
+
abstract int getNumShards();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract boolean getWindowedWrites();
@Nullable abstract FilenamePolicy getFilenamePolicy();
+
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -277,6 +283,9 @@ public class AvroIO {
abstract static class Builder<T> {
abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+
+ abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
abstract Builder<T> setNumShards(int numShards);
abstract Builder<T> setShardTemplate(String shardTemplate);
abstract Builder<T> setRecordClass(Class<T> recordClass);
@@ -296,9 +305,9 @@ public class AvroIO {
* <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
*
* <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
- * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
- * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
- * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+ * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a
+ * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
+ * using {@link #to(FilenamePolicy)}.
*/
public Write<T> to(String outputPrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
@@ -306,14 +315,21 @@ public class AvroIO {
/**
* Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
- * supported file systems.
- *
- * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+ * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate
+ * filenames.
*
* <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
- * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
- * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
- * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+ * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a
+ * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
+ * using {@link #to(FilenamePolicy)}.
+ *
+ * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
+ * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
+ * Custom filename policies do not automatically see this prefix - you should explicitly pass
+ * the prefix into your {@link FilenamePolicy} object if you need this.
+ *
+ * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
+ * infer a directory for temporary files.
*/
@Experimental(Kind.FILESYSTEM)
public Write<T> to(ResourceId outputPrefix) {
@@ -342,15 +358,22 @@ public class AvroIO {
}
/**
- * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+ * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
+ * directory for temporary files must be specified using {@link #withTempDirectory}.
*/
- public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) {
+ public Write<T> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
+ /** Set the base directory used to generate temporary files. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ return toBuilder().setTempDirectory(tempDirectory).build();
+ }
+
/**
* Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
- * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+ * used when using one of the default filename-prefix to() overloads.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
@@ -360,8 +383,8 @@ public class AvroIO {
}
/**
- * Configures the filename suffix for written files. This option may only be used when
- * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+ * Configures the filename suffix for written files. This option may only be used when using one
+ * of the default filename-prefix to() overloads.
*
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
@@ -402,9 +425,8 @@ public class AvroIO {
/**
* Preserves windowing of input elements and writes them to files based on the element's window.
*
- * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will
- * be generated using {@link FilenamePolicy#windowedFilename}. See also
- * {@link WriteFiles#withWindowedWrites()}.
+ * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
+ * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
*/
public Write<T> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
@@ -435,32 +457,46 @@ public class AvroIO {
return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
}
- @Override
- public PDone expand(PCollection<T> input) {
- checkState(getFilenamePrefix() != null,
- "Need to set the filename prefix of an AvroIO.Write transform.");
- checkState(
- (getFilenamePolicy() == null)
- || (getShardTemplate() == null && getFilenameSuffix() == null),
- "Cannot set a filename policy and also a filename template or suffix.");
- checkState(getSchema() != null,
- "Need to set the schema of an AvroIO.Write transform.");
- checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
- "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
-
+ DynamicDestinations<T, Void> resolveDynamicDestinations() {
FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
if (usedFilenamePolicy == null) {
- usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+ usedFilenamePolicy =
+ DefaultFilenamePolicy.fromStandardParameters(
+ getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+ }
+ return DynamicFileDestinations.constant(usedFilenamePolicy);
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ checkArgument(
+ getFilenamePrefix() != null || getTempDirectory() != null,
+ "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
+ + "transform.");
+ if (getFilenamePolicy() != null) {
+ checkArgument(
+ getShardTemplate() == null && getFilenameSuffix() == null,
+ "shardTemplate and filenameSuffix should only be used with the default "
+ + "filename policy");
}
+ return expandTyped(input, resolveDynamicDestinations());
+ }
- WriteFiles<T> write = WriteFiles.to(
- new AvroSink<>(
- getFilenamePrefix(),
- usedFilenamePolicy,
- AvroCoder.of(getRecordClass(), getSchema()),
- getCodec(),
- getMetadata()));
+ public <DestinationT> PDone expandTyped(
+ PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ ValueProvider<ResourceId> tempDirectory = getTempDirectory();
+ if (tempDirectory == null) {
+ tempDirectory = getFilenamePrefix();
+ }
+ WriteFiles<T, DestinationT, T> write =
+ WriteFiles.to(
+ new AvroSink<>(
+ tempDirectory,
+ dynamicDestinations,
+ AvroCoder.of(getRecordClass(), getSchema()),
+ getCodec(),
+ getMetadata()),
+ SerializableFunctions.<T>identity());
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -473,31 +509,25 @@ public class AvroIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- checkState(
- getFilenamePrefix() != null,
- "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
- String outputPrefixString = null;
- if (getFilenamePrefix().isAccessible()) {
- ResourceId dir = getFilenamePrefix().get();
- outputPrefixString = dir.toString();
- } else {
- outputPrefixString = getFilenamePrefix().toString();
+ resolveDynamicDestinations().populateDisplayData(builder);
+
+ String tempDirectory = null;
+ if (getTempDirectory() != null) {
+ tempDirectory =
+ getTempDirectory().isAccessible()
+ ? getTempDirectory().get().toString()
+ : getTempDirectory().toString();
}
builder
- .add(DisplayData.item("schema", getRecordClass())
- .withLabel("Record Schema"))
- .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString)
- .withLabel("Output File Prefix"))
- .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
- .withLabel("Output Shard Name Template"))
- .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
- .withLabel("Output File Suffix"))
- .addIfNotDefault(DisplayData.item("numShards", getNumShards())
- .withLabel("Maximum Output Shards"),
- 0)
- .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
- .withLabel("Avro Compression Codec"),
- DEFAULT_CODEC.toString());
+ .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
+ .addIfNotDefault(
+ DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+ .addIfNotDefault(
+ DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
+ DEFAULT_CODEC.toString())
+ .addIfNotNull(
+ DisplayData.item("tempDirectory", tempDirectory)
+ .withLabel("Directory for temporary files"));
builder.include("Metadata", new Metadata());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 6c36266..c78870b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -32,39 +32,40 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
/** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T> extends FileBasedSink<T> {
+class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
private final ImmutableMap<String, Object> metadata;
AvroSink(
ValueProvider<ResourceId> outputPrefix,
- FilenamePolicy filenamePolicy,
+ DynamicDestinations<T, DestinationT> dynamicDestinations,
AvroCoder<T> coder,
SerializableAvroCodecFactory codec,
ImmutableMap<String, Object> metadata) {
// Avro handle compression internally using the codec.
- super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED);
+ super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
this.coder = coder;
this.codec = codec;
this.metadata = metadata;
}
@Override
- public WriteOperation<T> createWriteOperation() {
+ public WriteOperation<T, DestinationT> createWriteOperation() {
return new AvroWriteOperation<>(this, coder, codec, metadata);
}
/** A {@link WriteOperation WriteOperation} for Avro files. */
- private static class AvroWriteOperation<T> extends WriteOperation<T> {
+ private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
private final AvroCoder<T> coder;
private final SerializableAvroCodecFactory codec;
private final ImmutableMap<String, Object> metadata;
- private AvroWriteOperation(AvroSink<T> sink,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ private AvroWriteOperation(
+ AvroSink<T, DestinationT> sink,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
super(sink);
this.coder = coder;
this.codec = codec;
@@ -72,22 +73,23 @@ class AvroSink<T> extends FileBasedSink<T> {
}
@Override
- public Writer<T> createWriter() throws Exception {
+ public Writer<T, DestinationT> createWriter() throws Exception {
return new AvroWriter<>(this, coder, codec, metadata);
}
}
/** A {@link Writer Writer} for Avro files. */
- private static class AvroWriter<T> extends Writer<T> {
+ private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
private final AvroCoder<T> coder;
private DataFileWriter<T> dataFileWriter;
private SerializableAvroCodecFactory codec;
private final ImmutableMap<String, Object> metadata;
- public AvroWriter(WriteOperation<T> writeOperation,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ public AvroWriter(
+ WriteOperation<T, DestinationT> writeOperation,
+ AvroCoder<T> coder,
+ SerializableAvroCodecFactory codec,
+ ImmutableMap<String, Object> metadata) {
super(writeOperation, MimeTypes.BINARY);
this.coder = coder;
this.codec = codec;
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index f9e4ac4..7a60e49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -20,25 +20,31 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.MoreObjects.firstNonNull;
import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed
@@ -51,10 +57,7 @@ import org.slf4j.LoggerFactory;
* {@code WriteOneFilePerWindow} example pipeline.
*/
public final class DefaultFilenamePolicy extends FilenamePolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
-
- /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
+ /** The default sharding name template. */
public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/** The default windowed sharding name template used when writing windowed files.
@@ -67,75 +70,184 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
"W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
/*
- * pattern for both windowed and non-windowed file names
+ * pattern for both windowed and non-windowed file names.
*/
private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
/**
+ * Encapsulates constructor parameters to {@link DefaultFilenamePolicy}.
+ *
+ * <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy}
+ * objects to be dynamically generated.
+ */
+ public static class Params implements Serializable {
+ private final ValueProvider<ResourceId> baseFilename;
+ private final String shardTemplate;
+ private final boolean explicitTemplate;
+ private final String suffix;
+
+ /**
+ * Construct a default Params object. The shard template will be set to the default {@link
+ * #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value.
+ */
+ public Params() {
+ this.baseFilename = null;
+ this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
+ this.suffix = "";
+ this.explicitTemplate = false;
+ }
+
+ private Params(
+ ValueProvider<ResourceId> baseFilename,
+ String shardTemplate,
+ String suffix,
+ boolean explicitTemplate) {
+ this.baseFilename = baseFilename;
+ this.shardTemplate = shardTemplate;
+ this.suffix = suffix;
+ this.explicitTemplate = explicitTemplate;
+ }
+
+ /**
+ * Specify that writes are windowed. This affects the default shard template, changing it to
+ * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}.
+ */
+ public Params withWindowedWrites() {
+ String template = this.shardTemplate;
+ if (!explicitTemplate) {
+ template = DEFAULT_WINDOWED_SHARD_TEMPLATE;
+ }
+ return new Params(baseFilename, template, suffix, explicitTemplate);
+ }
+
+ /** Sets the base filename. */
+ public Params withBaseFilename(ResourceId baseFilename) {
+ return withBaseFilename(StaticValueProvider.of(baseFilename));
+ }
+
+ /** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */
+ public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) {
+ return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
+ }
+
+ /** Sets the shard template. */
+ public Params withShardTemplate(String shardTemplate) {
+ return new Params(baseFilename, shardTemplate, suffix, true);
+ }
+
+ /** Sets the suffix. */
+ public Params withSuffix(String suffix) {
+ return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
+ }
+ }
+
+ /** A Coder for {@link Params}. */
+ public static class ParamsCoder extends AtomicCoder<Params> {
+ private static final ParamsCoder INSTANCE = new ParamsCoder();
+ private Coder<String> stringCoder = StringUtf8Coder.of();
+
+ public static ParamsCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(Params value, OutputStream outStream) throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null value");
+ }
+ stringCoder.encode(value.baseFilename.get().toString(), outStream);
+ stringCoder.encode(value.shardTemplate, outStream);
+ stringCoder.encode(value.suffix, outStream);
+ }
+
+ @Override
+ public Params decode(InputStream inStream) throws IOException {
+ ResourceId prefix =
+ FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));
+ String shardTemplate = stringCoder.decode(inStream);
+ String suffix = stringCoder.decode(inStream);
+ return new Params()
+ .withBaseFilename(prefix)
+ .withShardTemplate(shardTemplate)
+ .withSuffix(suffix);
+ }
+ }
+
+ private final Params params;
+ /**
* Constructs a new {@link DefaultFilenamePolicy}.
*
* @see DefaultFilenamePolicy for more information on the arguments to this function.
*/
@VisibleForTesting
- DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) {
- this.prefix = prefix;
- this.shardTemplate = shardTemplate;
- this.suffix = suffix;
+ DefaultFilenamePolicy(Params params) {
+ this.params = params;
}
/**
- * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename
- * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null
- * shard name template and suffix.
+ * Construct a {@link DefaultFilenamePolicy}.
*
- * <p>Any filename component of the provided resource will be used as the filename prefix.
+ * <p>This is a shortcut for:
*
- * <p>If provided, the shard name template will be used; otherwise
- * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and
- * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
+ * <pre>{@code
+ * DefaultFilenamePolicy.fromParams(new Params()
+ * .withBaseFilename(baseFilename)
+ * .withShardTemplate(shardTemplate)
+ * .withSuffix(filenameSuffix)
+ * .withWindowedWrites())
+ * }</pre>
*
- * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
+ * <p>where {@code withShardTemplate} and {@code withSuffix} are invoked only if the corresponding argument is non-null, and {@code withWindowedWrites} only if {@code windowedWrites} is true.
*/
- public static DefaultFilenamePolicy constructUsingStandardParameters(
- ValueProvider<ResourceId> outputPrefix,
+ public static DefaultFilenamePolicy fromStandardParameters(
+ ValueProvider<ResourceId> baseFilename,
@Nullable String shardTemplate,
@Nullable String filenameSuffix,
boolean windowedWrites) {
- // Pick the appropriate default policy based on whether windowed writes are being performed.
- String defaultTemplate =
- windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
- return new DefaultFilenamePolicy(
- NestedValueProvider.of(outputPrefix, new ExtractFilename()),
- firstNonNull(shardTemplate, defaultTemplate),
- firstNonNull(filenameSuffix, ""));
+ Params params = new Params().withBaseFilename(baseFilename);
+ if (shardTemplate != null) {
+ params = params.withShardTemplate(shardTemplate);
+ }
+ if (filenameSuffix != null) {
+ params = params.withSuffix(filenameSuffix);
+ }
+ if (windowedWrites) {
+ params = params.withWindowedWrites();
+ }
+ return fromParams(params);
}
- private final ValueProvider<String> prefix;
- private final String shardTemplate;
- private final String suffix;
+ /** Construct a {@link DefaultFilenamePolicy} from a {@link Params} object. */
+ public static DefaultFilenamePolicy fromParams(Params params) {
+ return new DefaultFilenamePolicy(params);
+ }
/**
* Constructs a fully qualified name from components.
*
- * <p>The name is built from a prefix, shard template (with shard numbers
- * applied), and a suffix. All components are required, but may be empty
- * strings.
+ * <p>The name is built from a base filename, shard template (with shard numbers applied), and a
+ * suffix. All components are required, but may be empty strings.
*
- * <p>Within a shard template, repeating sequences of the letters "S" or "N"
- * are replaced with the shard number, or number of shards respectively.
- * "P" is replaced with by stringification of current pane.
- * "W" is replaced by stringification of current window.
+ * <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the
+ * shard number or the number of shards, respectively. "P" is replaced by the stringification of
+ * the current pane. "W" is replaced by the stringification of the current window.
*
- * <p>The numbers are formatted with leading zeros to match the length of the
- * repeated sequence of letters.
+ * <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of
+ * letters.
*
- * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
- * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
- * produced: "output-001-of-100.txt".
+ * <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix =
+ * ".txt", with shardNum = 1 and numShards = 100, the following is produced:
+ * "path/to/output-001-of-100.txt".
*/
- static String constructName(
- String prefix, String shardTemplate, String suffix, int shardNum, int numShards,
- String paneStr, String windowStr) {
+ static ResourceId constructName(
+ ResourceId baseFilename,
+ String shardTemplate,
+ String suffix,
+ int shardNum,
+ int numShards,
+ String paneStr,
+ String windowStr) {
+ String prefix = extractFilename(baseFilename);
// Matcher API works with StringBuffer, rather than StringBuilder.
StringBuffer sb = new StringBuffer();
sb.append(prefix);
@@ -165,27 +277,37 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
m.appendTail(sb);
sb.append(suffix);
- return sb.toString();
+ return baseFilename
+ .getCurrentDirectory()
+ .resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE);
}
@Override
@Nullable
- public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
- String extension) {
- String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
- context.getNumShards(), null, null) + extension;
- return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+ return constructName(
+ params.baseFilename.get(),
+ params.shardTemplate,
+ params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
+ context.getShardNumber(),
+ context.getNumShards(),
+ null,
+ null);
}
@Override
- public ResourceId windowedFilename(ResourceId outputDirectory,
- WindowedContext context, String extension) {
+ public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
final PaneInfo paneInfo = context.getPaneInfo();
String paneStr = paneInfoToString(paneInfo);
String windowStr = windowToString(context.getWindow());
- String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
- context.getNumShards(), paneStr, windowStr) + extension;
- return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ return constructName(
+ params.baseFilename.get(),
+ params.shardTemplate,
+ params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
+ context.getShardNumber(),
+ context.getNumShards(),
+ paneStr,
+ windowStr);
}
/*
@@ -216,24 +338,32 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
String filenamePattern;
- if (prefix.isAccessible()) {
- filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix);
+ if (params.baseFilename.isAccessible()) {
+ filenamePattern =
+ String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix);
} else {
- filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix);
+ filenamePattern =
+ String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix);
}
+
+ String outputPrefixString = null;
+ outputPrefixString =
+ params.baseFilename.isAccessible()
+ ? params.baseFilename.get().toString()
+ : params.baseFilename.toString();
+ builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern"));
+ builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix"));
+ builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
builder.add(
- DisplayData.item("filenamePattern", filenamePattern)
- .withLabel("Filename Pattern"));
+ DisplayData.item("shardNameTemplate", params.shardTemplate)
+ .withLabel("Output Shard Name Template"));
}
- private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
- @Override
- public String apply(ResourceId input) {
- if (input.isDirectory()) {
- return "";
- } else {
- return firstNonNull(input.getFilename(), "");
- }
+ private static String extractFilename(ResourceId input) {
+ if (input.isDirectory()) {
+ return "";
+ } else {
+ return firstNonNull(input.getFilename(), "");
}
}
}