Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/29 00:49:42 UTC
[3/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.
[BEAM-92] Supports DynamicDestinations in AvroIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2622fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2622fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2622fa
Branch: refs/heads/master
Commit: 9f2622fa19da1284222e872fdcd63b086bdc3509
Parents: 1f2634d
Author: Reuven Lax <re...@google.com>
Authored: Thu Jul 6 20:22:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:12 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 2 +-
.../construction/WriteFilesTranslation.java | 81 ++--
.../construction/PTransformMatchersTest.java | 10 +-
.../construction/WriteFilesTranslationTest.java | 26 +-
.../direct/WriteWithShardingFactory.java | 10 +-
.../direct/WriteWithShardingFactoryTest.java | 8 +-
.../beam/runners/dataflow/DataflowRunner.java | 8 +-
.../runners/dataflow/DataflowRunnerTest.java | 10 +-
.../src/main/proto/beam_runner_api.proto | 2 +
.../java/org/apache/beam/sdk/io/AvroIO.java | 436 +++++++++++++++----
.../java/org/apache/beam/sdk/io/AvroSink.java | 93 ++--
.../beam/sdk/io/ConstantAvroDestination.java | 130 ++++++
.../beam/sdk/io/DefaultFilenamePolicy.java | 1 -
.../beam/sdk/io/DynamicAvroDestinations.java | 46 ++
.../beam/sdk/io/DynamicFileDestinations.java | 59 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 121 +++--
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 23 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 228 ++++++----
.../java/org/apache/beam/sdk/io/TextSink.java | 14 +-
.../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 6 +-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 10 +-
.../org/apache/beam/sdk/io/TextIOWriteTest.java | 23 +-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 74 ++--
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 8 +-
27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
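At a glance, this change splits the old AvroIO.Write into a thin backwards-compatible wrapper plus a new AvroIO.TypedWrite, and adds DynamicAvroDestinations so a write can start from a custom user type and choose its destination, schema, metadata, and codec per element. Below is a minimal usage sketch of the simpler fixed-schema path; MyEvent, MyEventCoders, schemaJson, and the output prefix are hypothetical names, while the AvroIO methods are the ones introduced by this commit.

// Sketch only: write a custom type as GenericRecords with a fixed schema, using the new
// writeCustomTypeToGenericRecords/withSchema/withFormatFunction entry points.
final Schema schema = new Schema.Parser().parse(schemaJson);
PCollection<MyEvent> events = ...;
events.apply(
    "WriteEvents",
    AvroIO.<MyEvent>writeCustomTypeToGenericRecords()
        .to("/path/to/output/events")
        .withSuffix(".avro")
        .withSchema(schema)
        .withFormatFunction(
            new SerializableFunction<MyEvent, GenericRecord>() {
              @Override
              public GenericRecord apply(MyEvent event) {
                // Hypothetical helper that builds a GenericRecord matching the schema above.
                return MyEventCoders.toGenericRecord(event);
              }
            }));

The fully dynamic path, in which a DynamicAvroDestinations implementation picks a destination per element, is illustrated in the AvroIO.java javadoc further down.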
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index d7b0e9f..5765c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -484,7 +484,7 @@ public class ParDoTranslation {
});
}
- private static SideInput toProto(PCollectionView<?> view) {
+ public static SideInput toProto(PCollectionView<?> view) {
Builder builder = SideInput.newBuilder();
builder.setAccessPattern(
FunctionSpec.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index b1d2da4..7954b0e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -19,29 +19,35 @@
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
/**
* Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi}
@@ -53,28 +59,25 @@ public class WriteFilesTranslation {
public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
"urn:beam:file_based_sink:javasdk:0.1";
- public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
- "urn:beam:file_based_sink_format_function:javasdk:0.1";
-
@VisibleForTesting
static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
+ Map<String, SideInput> sideInputs = Maps.newHashMap();
+ for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) {
+ sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
+ }
return WriteFilesPayload.newBuilder()
.setSink(toProto(transform.getSink()))
- .setFormatFunction(toProto(transform.getFormatFunction()))
.setWindowedWrites(transform.isWindowedWrites())
.setRunnerDeterminedSharding(
transform.getNumShards() == null && transform.getSharding() == null)
+ .putAllSideInputs(sideInputs)
.build();
}
- private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+ private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) {
return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
}
- private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
- return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
- }
-
private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
return SdkFunctionSpec.newBuilder()
.setSpec(
@@ -91,7 +94,7 @@ public class WriteFilesTranslation {
}
@VisibleForTesting
- static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+ static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
checkArgument(
sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
"Cannot extract %s instance from %s with URN %s",
@@ -102,44 +105,44 @@ public class WriteFilesTranslation {
byte[] serializedSink =
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
- return (FileBasedSink<?, ?>)
+ return (FileBasedSink<?, ?, ?>)
SerializableUtils.deserializeFromByteArray(
serializedSink, FileBasedSink.class.getSimpleName());
}
- @VisibleForTesting
- static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
- SdkFunctionSpec sinkProto) throws IOException {
- checkArgument(
- sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
- "Cannot extract %s instance from %s with URN %s",
- SerializableFunction.class.getSimpleName(),
- FunctionSpec.class.getSimpleName(),
- sinkProto.getSpec().getUrn());
-
- byte[] serializedFunction =
- sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-
- return (SerializableFunction<InputT, OutputT>)
- SerializableUtils.deserializeFromByteArray(
- serializedFunction, FileBasedSink.class.getSimpleName());
- }
-
- public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+ public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
transform)
throws IOException {
- return (FileBasedSink<OutputT, DestinationT>)
+ return (FileBasedSink<UserT, DestinationT, OutputT>)
sinkFromProto(getWriteFilesPayload(transform).getSink());
}
- public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
- AppliedPTransform<
- PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
- transform)
- throws IOException {
- return formatFunctionFromProto(
- getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
+ public static <UserT, DestinationT, OutputT>
+ List<PCollectionView<?>> getDynamicDestinationSideInputs(
+ AppliedPTransform<
+ PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+ transform)
+ throws IOException {
+ SdkComponents sdkComponents = SdkComponents.create();
+ RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
+ List<PCollectionView<?>> views = Lists.newArrayList();
+ Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
+ for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
+ PCollection<?> originalPCollection =
+ checkNotNull(
+ (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
+ "no input with tag %s",
+ entry.getKey());
+ views.add(
+ ParDoTranslation.viewFromProto(
+ entry.getValue(),
+ entry.getKey(),
+ originalPCollection,
+ transformProto,
+ RehydratedComponents.forComponents(sdkComponents.toComponents())));
+ }
+ return views;
}
public static <T> boolean isWindowedWrites(
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 316645b..1862699 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -549,15 +548,14 @@ public class PTransformMatchersTest implements Serializable {
false);
WriteFiles<Integer, Void, Integer> write =
WriteFiles.to(
- new FileBasedSink<Integer, Void>(
+ new FileBasedSink<Integer, Void, Integer>(
StaticValueProvider.of(outputDirectory),
- DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
+ DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
@Override
- public WriteOperation<Integer, Void> createWriteOperation() {
+ public WriteOperation<Void, Integer> createWriteOperation() {
return null;
}
- },
- SerializableFunctions.<Integer>identity());
+ });
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 4259ac8..e067fac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -63,12 +62,11 @@ public class WriteFilesTranslationTest {
public static class TestWriteFilesPayloadTranslation {
@Parameters(name = "{index}: {0}")
public static Iterable<WriteFiles<Object, Void, Object>> data() {
- SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
return ImmutableList.of(
- WriteFiles.to(new DummySink(), format),
- WriteFiles.to(new DummySink(), format).withWindowedWrites(),
- WriteFiles.to(new DummySink(), format).withNumShards(17),
- WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
+ WriteFiles.to(new DummySink()),
+ WriteFiles.to(new DummySink()).withWindowedWrites(),
+ WriteFiles.to(new DummySink()).withNumShards(17),
+ WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
}
@Parameter(0)
@@ -87,7 +85,8 @@ public class WriteFilesTranslationTest {
assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
assertThat(
- (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+ (FileBasedSink<String, Void, String>)
+ WriteFilesTranslation.sinkFromProto(payload.getSink()),
equalTo(writeFiles.getSink()));
}
@@ -118,16 +117,17 @@ public class WriteFilesTranslationTest {
* A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
* any issues serializing mocks.
*/
- private static class DummySink extends FileBasedSink<Object, Void> {
+ private static class DummySink extends FileBasedSink<Object, Void, Object> {
DummySink() {
super(
StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
- DynamicFileDestinations.constant(new DummyFilenamePolicy()));
+ DynamicFileDestinations.constant(
+ new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
}
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
return new DummyWriteOperation(this);
}
@@ -152,13 +152,13 @@ public class WriteFilesTranslationTest {
}
}
- private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
- public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
+ private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> {
+ public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) {
super(sink);
}
@Override
- public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
+ public FileBasedSink.Writer<Void, Object> createWriter() throws Exception {
throw new UnsupportedOperationException("Should never be called.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index ba796ae..3557c5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,10 +24,12 @@ import com.google.common.base.Suppliers;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -61,10 +63,10 @@ class WriteWithShardingFactory<InputT>
AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
transform) {
try {
- WriteFiles<InputT, ?, ?> replacement =
- WriteFiles.to(
- WriteFilesTranslation.getSink(transform),
- WriteFilesTranslation.getFormatFunction(transform));
+ List<PCollectionView<?>> sideInputs =
+ WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+ FileBasedSink sink = WriteFilesTranslation.getSink(transform);
+ WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6dd069c..d0db44e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -143,15 +142,14 @@ public class WriteWithShardingFactoryTest implements Serializable {
PTransform<PCollection<Object>, PDone> original =
WriteFiles.to(
- new FileBasedSink<Object, Void>(
+ new FileBasedSink<Object, Void, Object>(
StaticValueProvider.of(outputDirectory),
DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
- },
- SerializableFunctions.identity());
+ });
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 762ac9f..f8d2c3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -1501,10 +1502,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
try {
+ List<PCollectionView<?>> sideInputs =
+ WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+ FileBasedSink sink = WriteFilesTranslation.getSink(transform);
WriteFiles<UserT, DestinationT, OutputT> replacement =
- WriteFiles.<UserT, DestinationT, OutputT>to(
- WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
- WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
+ WriteFiles.to(sink).withSideInputs(sideInputs);
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7556a28..9db73c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1271,8 +1271,7 @@ public class DataflowRunnerTest implements Serializable {
StreamingShardedWriteFactory<Object, Void, Object> factory =
new StreamingShardedWriteFactory<>(p.getOptions());
- WriteFiles<Object, Void, Object> original =
- WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
+ WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
originalApplication =
@@ -1290,7 +1289,7 @@ public class DataflowRunnerTest implements Serializable {
assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
}
- private static class TestSink extends FileBasedSink<Object, Void> {
+ private static class TestSink extends FileBasedSink<Object, Void, Object> {
@Override
public void validate(PipelineOptions options) {}
@@ -1315,11 +1314,12 @@ public class DataflowRunnerTest implements Serializable {
int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
- }));
+ },
+ SerializableFunctions.identity()));
}
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 42e2601..9afb565 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -375,6 +375,8 @@ message WriteFilesPayload {
bool windowed_writes = 3;
bool runner_determined_sharding = 4;
+
+ map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data in
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 27c9073..824f725 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.google.common.io.BaseEncoding;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
@@ -51,7 +49,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -161,6 +158,51 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* .withSuffix(".avro"));
* }</pre>
*
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ * extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ * private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ * public UserDynamicAvroDestinations( PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ * this.userToSchemaMap = userToSchemaMap;
+ * }
+ * public GenericRecord formatRecord(UserEvent record) {
+ * return formatUserRecord(record, getSchema(record.getUserId()));
+ * }
+ * public Schema getSchema(Integer userId) {
+ * return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ * }
+ * public Integer getDestination(UserEvent record) {
+ * return record.getUserId();
+ * }
+ * public Integer getDefaultDestination() {
+ * return 0;
+ * }
+ * public FilenamePolicy getFilenamePolicy(Integer userId) {
+ * return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ * + userId + "/events"));
+ * }
+ * public List<PCollectionView<?>> getSideInputs() {
+ * return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ * }
+ * }
+ * PCollection<UserEvent> events = ...;
+ * PCollectionView<Map<Integer, String>> schemaMap = events.apply(
+ * "ComputeSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
+ * .to(new UserDynamicAvroDestinations(schemaMap)));
+ * }</pre>
+ *
* <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
* org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
* overridden using {@link AvroIO.Write#withCodec}.
@@ -256,18 +298,53 @@ public class AvroIO {
* pattern).
*/
public static <T> Write<T> write(Class<T> recordClass) {
- return AvroIO.<T>defaultWriteBuilder()
- .setRecordClass(recordClass)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .build();
+ return new Write<>(
+ AvroIO.<T, T>defaultWriteBuilder()
+ .setGenericRecords(false)
+ .setSchema(ReflectData.get().getSchema(recordClass))
+ .build());
}
/** Writes Avro records of the specified schema. */
public static Write<GenericRecord> writeGenericRecords(Schema schema) {
- return AvroIO.<GenericRecord>defaultWriteBuilder()
- .setRecordClass(GenericRecord.class)
- .setSchema(schema)
- .build();
+ return new Write<>(
+ AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+ .setGenericRecords(true)
+ .setSchema(schema)
+ .build());
+ }
+
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+ * matching a sharding pattern), with each element of the input collection encoded into its own
+ * record of type OutputT.
+ *
+ * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+ * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+ * that will be written to the file must be specified. If using a custom {@link
+ * DynamicAvroDestinations} object, this is done using {@link
+ * DynamicAvroDestinations#formatRecord}, otherwise the {@link
+ * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function.
+ *
+ * <p>The advantage of using a custom type is that it allows a user-provided {@link
+ * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)}, to
+ * examine the custom type when choosing a destination.
+ *
+ * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+ * instead.
+ */
+ public static <UserT, OutputT> TypedWrite<UserT, OutputT> writeCustomType() {
+ return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+ }
+
+ /**
+ * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+ * {@link GenericRecord}. A schema must be specified either in {@link
+ * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link
+ * TypedWrite#withSchema(Schema)}.
+ */
+ public static <UserT> TypedWrite<UserT, GenericRecord> writeCustomTypeToGenericRecords() {
+ return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
}
/**
@@ -277,12 +354,12 @@ public class AvroIO {
return writeGenericRecords(new Schema.Parser().parse(schema));
}
- private static <T> Write.Builder<T> defaultWriteBuilder() {
- return new AutoValue_AvroIO_Write.Builder<T>()
+ private static <UserT, OutputT> TypedWrite.Builder<UserT, OutputT> defaultWriteBuilder() {
+ return new AutoValue_AvroIO_TypedWrite.Builder<UserT, OutputT>()
.setFilenameSuffix(null)
.setShardTemplate(null)
.setNumShards(0)
- .setCodec(Write.DEFAULT_CODEC)
+ .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
.setMetadata(ImmutableMap.<String, Object>of())
.setWindowedWrites(false);
}
@@ -572,15 +649,18 @@ public class AvroIO {
}
}
- /////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
- public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
- private static final SerializableAvroCodecFactory DEFAULT_CODEC =
- new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
- // This should be a multiple of 4 to not get a partial encoded byte.
- private static final int METADATA_BYTES_MAX_LENGTH = 40;
+ public abstract static class TypedWrite<UserT, OutputT>
+ extends PTransform<PCollection<UserT>, PDone> {
+ static final CodecFactory DEFAULT_CODEC = CodecFactory.deflateCodec(6);
+ static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC =
+ new SerializableAvroCodecFactory(DEFAULT_CODEC);
+
+ @Nullable
+ abstract SerializableFunction<UserT, OutputT> getFormatFunction();
@Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
@Nullable abstract String getShardTemplate();
@@ -590,11 +670,16 @@ public class AvroIO {
abstract ValueProvider<ResourceId> getTempDirectory();
abstract int getNumShards();
- @Nullable abstract Class<T> getRecordClass();
+
+ abstract boolean getGenericRecords();
+
@Nullable abstract Schema getSchema();
abstract boolean getWindowedWrites();
@Nullable abstract FilenamePolicy getFilenamePolicy();
+ @Nullable
+ abstract DynamicAvroDestinations<UserT, ?, OutputT> getDynamicDestinations();
+
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -603,25 +688,39 @@ public class AvroIO {
/** Avro file metadata. */
abstract ImmutableMap<String, Object> getMetadata();
- abstract Builder<T> toBuilder();
+ abstract Builder<UserT, OutputT> toBuilder();
@AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
- abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+ abstract static class Builder<UserT, OutputT> {
+ abstract Builder<UserT, OutputT> setFormatFunction(
+ SerializableFunction<UserT, OutputT> formatFunction);
- abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+ abstract Builder<UserT, OutputT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
- abstract Builder<T> setNumShards(int numShards);
- abstract Builder<T> setShardTemplate(String shardTemplate);
- abstract Builder<T> setRecordClass(Class<T> recordClass);
- abstract Builder<T> setSchema(Schema schema);
- abstract Builder<T> setWindowedWrites(boolean windowedWrites);
- abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
- abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
- abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+ abstract Builder<UserT, OutputT> setFilenameSuffix(String filenameSuffix);
+
+ abstract Builder<UserT, OutputT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+ abstract Builder<UserT, OutputT> setNumShards(int numShards);
+
+ abstract Builder<UserT, OutputT> setShardTemplate(String shardTemplate);
+
+ abstract Builder<UserT, OutputT> setGenericRecords(boolean genericRecords);
- abstract Write<T> build();
+ abstract Builder<UserT, OutputT> setSchema(Schema schema);
+
+ abstract Builder<UserT, OutputT> setWindowedWrites(boolean windowedWrites);
+
+ abstract Builder<UserT, OutputT> setFilenamePolicy(FilenamePolicy filenamePolicy);
+
+ abstract Builder<UserT, OutputT> setCodec(SerializableAvroCodecFactory codec);
+
+ abstract Builder<UserT, OutputT> setMetadata(ImmutableMap<String, Object> metadata);
+
+ abstract Builder<UserT, OutputT> setDynamicDestinations(
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations);
+
+ abstract TypedWrite<UserT, OutputT> build();
}
/**
@@ -635,7 +734,7 @@ public class AvroIO {
* common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
* using {@link #to(FilenamePolicy)}.
*/
- public Write<T> to(String outputPrefix) {
+ public TypedWrite<UserT, OutputT> to(String outputPrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
}
@@ -658,14 +757,12 @@ public class AvroIO {
* infer a directory for temporary files.
*/
@Experimental(Kind.FILESYSTEM)
- public Write<T> to(ResourceId outputPrefix) {
+ public TypedWrite<UserT, OutputT> to(ResourceId outputPrefix) {
return toResource(StaticValueProvider.of(outputPrefix));
}
- /**
- * Like {@link #to(String)}.
- */
- public Write<T> to(ValueProvider<String> outputPrefix) {
+ /** Like {@link #to(String)}. */
+ public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
return toResource(NestedValueProvider.of(outputPrefix,
new SerializableFunction<String, ResourceId>() {
@Override
@@ -675,11 +772,9 @@ public class AvroIO {
}));
}
- /**
- * Like {@link #to(ResourceId)}.
- */
+ /** Like {@link #to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
- public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+ public TypedWrite<UserT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) {
return toBuilder().setFilenamePrefix(outputPrefix).build();
}
@@ -687,16 +782,52 @@ public class AvroIO {
* Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
* directory for temporary files must be specified using {@link #withTempDirectory}.
*/
- public Write<T> to(FilenamePolicy filenamePolicy) {
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
+ /**
+ * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These
+ * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+ * temporary files must be specified using {@link #withTempDirectory}.
+ */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> to(
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations) {
+ return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+ }
+
+ /**
+ * Sets the output schema. Can only be used when the output type is {@link GenericRecord}
+ * and when not using {@link #to(DynamicAvroDestinations)}.
+ */
+ public TypedWrite<UserT, OutputT> withSchema(Schema schema) {
+ return toBuilder().setSchema(schema).build();
+ }
+
+ /**
+ * Specifies a format function to convert {@link UserT} to the output type. If {@link
+ * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be
+ * used instead.
+ */
+ public TypedWrite<UserT, OutputT> withFormatFunction(
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return toBuilder().setFormatFunction(formatFunction).build();
+ }
+
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
- public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ public TypedWrite<UserT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
return toBuilder().setTempDirectory(tempDirectory).build();
}
+ /** Set the base directory used to generate temporary files. */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> withTempDirectory(ResourceId tempDirectory) {
+ return withTempDirectory(StaticValueProvider.of(tempDirectory));
+ }
+
/**
* Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
* used when using one of the default filename-prefix to() overrides.
@@ -704,7 +835,7 @@ public class AvroIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write<T> withShardNameTemplate(String shardTemplate) {
+ public TypedWrite<UserT, OutputT> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
@@ -715,7 +846,7 @@ public class AvroIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write<T> withSuffix(String filenameSuffix) {
+ public TypedWrite<UserT, OutputT> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
@@ -729,7 +860,7 @@ public class AvroIO {
*
* @param numShards the number of shards to use, or 0 to let the system decide.
*/
- public Write<T> withNumShards(int numShards) {
+ public TypedWrite<UserT, OutputT> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
@@ -744,7 +875,7 @@ public class AvroIO {
*
* <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public Write<T> withoutSharding() {
+ public TypedWrite<UserT, OutputT> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
@@ -754,12 +885,12 @@ public class AvroIO {
* <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
* {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
*/
- public Write<T> withWindowedWrites() {
+ public TypedWrite<UserT, OutputT> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
/** Writes to Avro file(s) compressed using specified codec. */
- public Write<T> withCodec(CodecFactory codec) {
+ public TypedWrite<UserT, OutputT> withCodec(CodecFactory codec) {
return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
}
@@ -768,7 +899,7 @@ public class AvroIO {
*
* <p>Supported value types are String, Long, and byte[].
*/
- public Write<T> withMetadata(Map<String, Object> metadata) {
+ public TypedWrite<UserT, OutputT> withMetadata(Map<String, Object> metadata) {
Map<String, String> badKeys = Maps.newLinkedHashMap();
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object v = entry.getValue();
@@ -783,18 +914,31 @@ public class AvroIO {
return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
}
- DynamicDestinations<T, Void> resolveDynamicDestinations() {
- FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
- if (usedFilenamePolicy == null) {
- usedFilenamePolicy =
- DefaultFilenamePolicy.fromStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+ DynamicAvroDestinations<UserT, ?, OutputT> resolveDynamicDestinations() {
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations = getDynamicDestinations();
+ if (dynamicDestinations == null) {
+ FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+ if (usedFilenamePolicy == null) {
+ usedFilenamePolicy =
+ DefaultFilenamePolicy.fromStandardParameters(
+ getFilenamePrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getWindowedWrites());
+ }
+ dynamicDestinations =
+ constantDestinations(
+ usedFilenamePolicy,
+ getSchema(),
+ getMetadata(),
+ getCodec().getCodec(),
+ getFormatFunction());
}
- return DynamicFileDestinations.constant(usedFilenamePolicy);
+ return dynamicDestinations;
}
@Override
- public PDone expand(PCollection<T> input) {
+ public PDone expand(PCollection<UserT> input) {
checkArgument(
getFilenamePrefix() != null || getTempDirectory() != null,
"Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
@@ -805,24 +949,25 @@ public class AvroIO {
"shardTemplate and filenameSuffix should only be used with the default "
+ "filename policy");
}
+ if (getDynamicDestinations() != null) {
+ checkArgument(
+ getFormatFunction() == null,
+ "A format function should not be specified "
+ + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+ }
+
return expandTyped(input, resolveDynamicDestinations());
}
public <DestinationT> PDone expandTyped(
- PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ PCollection<UserT> input,
+ DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations) {
ValueProvider<ResourceId> tempDirectory = getTempDirectory();
if (tempDirectory == null) {
tempDirectory = getFilenamePrefix();
}
- WriteFiles<T, DestinationT, T> write =
- WriteFiles.to(
- new AvroSink<>(
- tempDirectory,
- dynamicDestinations,
- AvroCoder.of(getRecordClass(), getSchema()),
- getCodec(),
- getMetadata()),
- SerializableFunctions.<T>identity());
+ WriteFiles<UserT, DestinationT, OutputT> write =
+ WriteFiles.to(new AvroSink<>(tempDirectory, dynamicDestinations, getGenericRecords()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -845,33 +990,11 @@ public class AvroIO {
: getTempDirectory().toString();
}
builder
- .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
- .addIfNotDefault(
- DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
- DEFAULT_CODEC.toString())
.addIfNotNull(
DisplayData.item("tempDirectory", tempDirectory)
.withLabel("Directory for temporary files"));
- builder.include("Metadata", new Metadata());
- }
-
- private class Metadata implements HasDisplayData {
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
- DisplayData.Type type = DisplayData.inferType(entry.getValue());
- if (type != null) {
- builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
- } else {
- String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
- String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
- ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
- builder.add(DisplayData.item(entry.getKey(), repr));
- }
- }
- }
}
@Override
@@ -880,6 +1003,131 @@ public class AvroIO {
}
}
+ /**
+ * This class is used as the default return value of {@link AvroIO#write}.
+ *
+ * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}.
+ * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+ */
+ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ @VisibleForTesting TypedWrite<T, T> inner;
+
+ Write(TypedWrite<T, T> inner) {
+ this.inner = inner;
+ }
+
+ /** See {@link TypedWrite#to(String)}. */
+ public Write<T> to(String outputPrefix) {
+ return new Write<>(
+ inner
+ .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
+ .withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(ResourceId)} . */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> to(ResourceId outputPrefix) {
+ return new Write<T>(
+ inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(ValueProvider)}. */
+ public Write<T> to(ValueProvider<String> outputPrefix) {
+ return new Write<>(
+ inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#toResource(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+ return new Write<>(
+ inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(FilenamePolicy)}. */
+ public Write<T> to(FilenamePolicy filenamePolicy) {
+ return new Write<>(
+ inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */
+ public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
+ return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
+ }
+
+ /** See {@link TypedWrite#withSchema}. */
+ public Write<T> withSchema(Schema schema) {
+ return new Write<>(inner.withSchema(schema));
+ }
+ /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ return new Write<>(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+ public Write<T> withTempDirectory(ResourceId tempDirectory) {
+ return new Write<>(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withShardNameTemplate}. */
+ public Write<T> withShardNameTemplate(String shardTemplate) {
+ return new Write<>(inner.withShardNameTemplate(shardTemplate));
+ }
+
+ /** See {@link TypedWrite#withSuffix}. */
+ public Write<T> withSuffix(String filenameSuffix) {
+ return new Write<>(inner.withSuffix(filenameSuffix));
+ }
+
+ /** See {@link TypedWrite#withNumShards}. */
+ public Write<T> withNumShards(int numShards) {
+ return new Write<>(inner.withNumShards(numShards));
+ }
+
+ /** See {@link TypedWrite#withoutSharding}. */
+ public Write<T> withoutSharding() {
+ return new Write<>(inner.withoutSharding());
+ }
+
+ /** See {@link TypedWrite#withWindowedWrites}. */
+ public Write<T> withWindowedWrites() {
+ return new Write<T>(inner.withWindowedWrites());
+ }
+
+ /** See {@link TypedWrite#withCodec}. */
+ public Write<T> withCodec(CodecFactory codec) {
+ return new Write<>(inner.withCodec(codec));
+ }
+
+ /** See {@link TypedWrite#withMetadata}. */
+ public Write<T> withMetadata(Map<String, Object> metadata) {
+ return new Write<>(inner.withMetadata(metadata));
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ return inner.expand(input);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ inner.populateDisplayData(builder);
+ }
+ }
+
+ /**
+ * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy},
+ * schema, metadata, and codec.
+ */
+ public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
+ FilenamePolicy filenamePolicy,
+ Schema schema,
+ Map<String, Object> metadata,
+ CodecFactory codec,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
+ }
/////////////////////////////////////////////////////////////////////////////
/** Disallow construction of utility class. */
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index c78870b..acd3ea6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -17,93 +17,90 @@
*/
package org.apache.beam.sdk.io;
-import com.google.common.collect.ImmutableMap;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
/** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, DestinationT, OutputT> {
+ private final DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations;
+ private final boolean genericRecords;
AvroSink(
ValueProvider<ResourceId> outputPrefix,
- DynamicDestinations<T, DestinationT> dynamicDestinations,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
+ boolean genericRecords) {
// Avro handle compression internally using the codec.
super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = dynamicDestinations;
+ this.genericRecords = genericRecords;
}
@Override
- public WriteOperation<T, DestinationT> createWriteOperation() {
- return new AvroWriteOperation<>(this, coder, codec, metadata);
+ public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+ return (DynamicAvroDestinations<UserT, DestinationT, OutputT>) super.getDynamicDestinations();
+ }
+
+ @Override
+ public WriteOperation<DestinationT, OutputT> createWriteOperation() {
+ return new AvroWriteOperation<>(this, genericRecords);
}
/** A {@link WriteOperation WriteOperation} for Avro files. */
- private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+ private static class AvroWriteOperation<DestinationT, OutputT>
+ extends WriteOperation<DestinationT, OutputT> {
+ private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+ private final boolean genericRecords;
- private AvroWriteOperation(
- AvroSink<T, DestinationT> sink,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
super(sink);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = sink.getDynamicDestinations();
+ this.genericRecords = genericRecords;
}
@Override
- public Writer<T, DestinationT> createWriter() throws Exception {
- return new AvroWriter<>(this, coder, codec, metadata);
+ public Writer<DestinationT, OutputT> createWriter() throws Exception {
+ return new AvroWriter<>(this, dynamicDestinations, genericRecords);
}
}
/** A {@link Writer Writer} for Avro files. */
- private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
- private final AvroCoder<T> coder;
- private DataFileWriter<T> dataFileWriter;
- private SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+ private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT, OutputT> {
+ private DataFileWriter<OutputT> dataFileWriter;
+ private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+ private final boolean genericRecords;
public AvroWriter(
- WriteOperation<T, DestinationT> writeOperation,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ WriteOperation<DestinationT, OutputT> writeOperation,
+ DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations,
+ boolean genericRecords) {
super(writeOperation, MimeTypes.BINARY);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = dynamicDestinations;
+ this.genericRecords = genericRecords;
}
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
- DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
- ? new GenericDatumWriter<T>(coder.getSchema())
- : new ReflectDatumWriter<T>(coder.getSchema());
+ DestinationT destination = getDestination();
+ CodecFactory codec = dynamicDestinations.getCodec(destination);
+ Schema schema = dynamicDestinations.getSchema(destination);
+ Map<String, Object> metadata = dynamicDestinations.getMetadata(destination);
- dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+ DatumWriter<OutputT> datumWriter =
+ genericRecords
+ ? new GenericDatumWriter<OutputT>(schema)
+ : new ReflectDatumWriter<OutputT>(schema);
+ dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec);
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object v = entry.getValue();
if (v instanceof String) {
@@ -118,11 +115,11 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
+ v.getClass().getSimpleName());
}
}
- dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+ dataFileWriter.create(schema, Channels.newOutputStream(channel));
}
@Override
- public void write(T value) throws Exception {
+ public void write(OutputT value) throws Exception {
dataFileWriter.append(value);
}
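
The rewritten prepareWrite above now chooses the Avro DatumWriter from the new genericRecords flag and the per-destination schema, instead of inspecting the element coder. For reference, a standalone sketch of the generic-vs-reflect distinction using plain Avro, outside of Beam; the Pojo class and the /tmp file paths are made up for the example:

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.reflect.ReflectDatumWriter;

    public class DatumWriterChoice {
      // Hypothetical user type for the reflect-based path.
      public static class Pojo {
        public String name;
        public int count;
      }

      public static void main(String[] args) throws Exception {
        // Generic path: schema supplied explicitly, records built as GenericRecord.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Pojo\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"count\",\"type\":\"int\"}]}");
        DatumWriter<GenericRecord> genericWriter = new GenericDatumWriter<>(schema);
        try (DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(genericWriter).setCodec(CodecFactory.deflateCodec(6))) {
          writer.create(schema, new File("/tmp/generic.avro"));
          GenericRecord record = new GenericData.Record(schema);
          record.put("name", "a");
          record.put("count", 1);
          writer.append(record);
        }

        // Reflect path: schema derived from the class, records written as plain POJOs.
        Schema reflectSchema = ReflectData.get().getSchema(Pojo.class);
        DatumWriter<Pojo> reflectWriter = new ReflectDatumWriter<>(reflectSchema);
        try (DataFileWriter<Pojo> writer = new DataFileWriter<>(reflectWriter)) {
          writer.create(reflectSchema, new File("/tmp/reflect.avro"));
          Pojo pojo = new Pojo();
          pojo.name = "a";
          pojo.count = 1;
          writer.append(pojo);
        }
      }
    }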
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
new file mode 100644
index 0000000..b006e26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
+/** Always returns a constant {@link FilenamePolicy}, {@link Schema}, metadata, and codec. */
+class ConstantAvroDestination<UserT, OutputT>
+ extends DynamicAvroDestinations<UserT, Void, OutputT> {
+ private static class SchemaFunction implements Serializable, Function<String, Schema> {
+ @Nullable
+ @Override
+ public Schema apply(@Nullable String input) {
+ return new Schema.Parser().parse(input);
+ }
+ }
+
+ // Keep this a multiple of 4 so the truncated base64 string does not end on a partial encoded byte.
+ private static final int METADATA_BYTES_MAX_LENGTH = 40;
+ private final FilenamePolicy filenamePolicy;
+ private final Supplier<Schema> schema;
+ private final Map<String, Object> metadata;
+ private final SerializableAvroCodecFactory codec;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
+
+ private class Metadata implements HasDisplayData {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ DisplayData.Type type = DisplayData.inferType(entry.getValue());
+ if (type != null) {
+ builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+ } else {
+ String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+ String repr =
+ base64.length() <= METADATA_BYTES_MAX_LENGTH
+ ? base64
+ : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+ builder.add(DisplayData.item(entry.getKey(), repr));
+ }
+ }
+ }
+ }
+
+ public ConstantAvroDestination(
+ FilenamePolicy filenamePolicy,
+ Schema schema,
+ Map<String, Object> metadata,
+ CodecFactory codec,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ this.filenamePolicy = filenamePolicy;
+ this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString()));
+ this.metadata = metadata;
+ this.codec = new SerializableAvroCodecFactory(codec);
+ this.formatFunction = formatFunction;
+ }
+
+ @Override
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
+ }
+
+ @Override
+ public Void getDestination(UserT element) {
+ return (Void) null;
+ }
+
+ @Override
+ public Void getDefaultDestination() {
+ return (Void) null;
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(Void destination) {
+ return filenamePolicy;
+ }
+
+ @Override
+ public Schema getSchema(Void destination) {
+ return schema.get();
+ }
+
+ @Override
+ public Map<String, Object> getMetadata(Void destination) {
+ return metadata;
+ }
+
+ @Override
+ public CodecFactory getCodec(Void destination) {
+ return codec.getCodec();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ filenamePolicy.populateDisplayData(builder);
+ builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema"));
+ builder.addIfNotDefault(
+ DisplayData.item("codec", codec.getCodec().toString()).withLabel("Avro Compression Codec"),
+ AvroIO.TypedWrite.DEFAULT_SERIALIZABLE_CODEC.toString());
+ builder.include("Metadata", new Metadata());
+ }
+}
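
A note on the Suppliers.compose construction in the constructor above: the destination captures only the schema's JSON string and re-parses it lazily, so the destination stays Serializable without depending on Schema's own serializability. The same pattern in isolation, as a rough sketch with no Beam types:

    import java.io.Serializable;
    import org.apache.avro.Schema;

    class SerializableSchemaHolder implements Serializable {
      // Hold the schema as JSON; re-parse it lazily after deserialization.
      private final String schemaJson;
      private transient Schema schema;

      SerializableSchemaHolder(Schema schema) {
        this.schemaJson = schema.toString();
      }

      Schema get() {
        if (schema == null) {
          schema = new Schema.Parser().parse(schemaJson);
        }
        return schema;
      }
    }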
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 4021609..1f438d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -157,7 +157,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
&& shardTemplate.equals(other.shardTemplate)
&& suffix.equals(other.suffix);
}
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
new file mode 100644
index 0000000..f4e8ee6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link AvroIO}. In addition to dynamic file
+ * destinations, this allows specifying other Avro properties (schema, metadata, codec) per
+ * destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+ extends DynamicDestinations<UserT, DestinationT, OutputT> {
+ /** Returns an Avro schema for a given destination. */
+ public abstract Schema getSchema(DestinationT destination);
+
+ /** Returns Avro file metadata for a given destination. */
+ public Map<String, Object> getMetadata(DestinationT destination) {
+ return ImmutableMap.<String, Object>of();
+ }
+
+ /** Returns an Avro codec for a given destination. */
+ public CodecFactory getCodec(DestinationT destination) {
+ return AvroIO.TypedWrite.DEFAULT_CODEC;
+ }
+}
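
To make the new extension point concrete, a minimal sketch of a user-defined DynamicAvroDestinations that routes records by a per-record key; MyEvent and the toGenericRecord/filenamePolicyFor/schemaFor helpers are purely illustrative and not part of this commit:

    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.DynamicAvroDestinations;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;

    class PerEventTypeDestinations extends DynamicAvroDestinations<MyEvent, String, GenericRecord> {
      @Override
      public GenericRecord formatRecord(MyEvent record) {
        return toGenericRecord(record);         // hypothetical conversion helper
      }

      @Override
      public String getDestination(MyEvent element) {
        return element.getEventType();          // route each record by its event type
      }

      @Override
      public String getDefaultDestination() {
        return "unknown";
      }

      @Override
      public FilenamePolicy getFilenamePolicy(String destination) {
        return filenamePolicyFor(destination);  // hypothetical per-destination policy
      }

      @Override
      public Schema getSchema(String destination) {
        return schemaFor(destination);          // e.g. one schema per event type
      }

      @Override
      public CodecFactory getCodec(String destination) {
        return CodecFactory.snappyCodec();      // per-destination codec override
      }
    }

Depending on the destination type, a destination coder may also need to be supplied; that detail is omitted from the sketch.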
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index d05a01a7..b087bc5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
@@ -28,20 +27,30 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
public class DynamicFileDestinations {
/** Always returns a constant {@link FilenamePolicy}. */
- private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+ private static class ConstantFilenamePolicy<UserT, OutputT>
+ extends DynamicDestinations<UserT, Void, OutputT> {
private final FilenamePolicy filenamePolicy;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
- public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
- this.filenamePolicy = checkNotNull(filenamePolicy);
+ public ConstantFilenamePolicy(
+ FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+ this.filenamePolicy = filenamePolicy;
+ this.formatFunction = formatFunction;
}
@Override
- public Void getDestination(T element) {
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
+ }
+
+ @Override
+ public Void getDestination(UserT element) {
return (Void) null;
}
@@ -71,14 +80,24 @@ public class DynamicFileDestinations {
* A base class for a {@link DynamicDestinations} object that returns differently-configured
* instances of {@link DefaultFilenamePolicy}.
*/
- private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
- SerializableFunction<UserT, Params> destinationFunction;
- Params emptyDestination;
+ private static class DefaultPolicyDestinations<UserT, OutputT>
+ extends DynamicDestinations<UserT, Params, OutputT> {
+ private final SerializableFunction<UserT, Params> destinationFunction;
+ private final Params emptyDestination;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
public DefaultPolicyDestinations(
- SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+ SerializableFunction<UserT, Params> destinationFunction,
+ Params emptyDestination,
+ SerializableFunction<UserT, OutputT> formatFunction) {
this.destinationFunction = destinationFunction;
this.emptyDestination = emptyDestination;
+ this.formatFunction = formatFunction;
+ }
+
+ @Override
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
}
@Override
@@ -104,16 +123,28 @@ public class DynamicFileDestinations {
}
/** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
- public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
- return new ConstantFilenamePolicy<>(filenamePolicy);
+ public static <UserT, OutputT> DynamicDestinations<UserT, Void, OutputT> constant(
+ FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+ return new ConstantFilenamePolicy<>(filenamePolicy, formatFunction);
+ }
+
+ /**
+ * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
+ * UserT and OutputT are the same type and the format function is the identity.
+ */
+ public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
+ FilenamePolicy filenamePolicy) {
+ return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
}
/**
* Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
* configured with the given {@link Params}.
*/
- public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
- SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
- return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+ public static <UserT, OutputT> DynamicDestinations<UserT, Params, OutputT> toDefaultPolicies(
+ SerializableFunction<UserT, Params> destinationFunction,
+ Params emptyDestination,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination, formatFunction);
}
}
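
Finally, a small sketch contrasting the two constant(...) overloads added above, assuming an existing FilenamePolicy named policy; MyEvent and toCsvLine are illustrative only:

    // Identity case: the element type written to the file is the user type itself.
    DynamicDestinations<String, Void, String> plain =
        DynamicFileDestinations.constant(policy);

    // Formatting case: user records are converted to the output type before writing.
    DynamicDestinations<MyEvent, Void, String> formatted =
        DynamicFileDestinations.constant(policy, new SerializableFunction<MyEvent, String>() {
          @Override
          public String apply(MyEvent event) {
            return event.toCsvLine();  // hypothetical formatting of the record
          }
        });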