You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/29 00:49:40 UTC
[1/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.
Repository: beam
Updated Branches:
refs/heads/master 1f2634d23 -> 540fa9b42
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 60088de..1d4ce08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -68,8 +68,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.View;
@@ -178,11 +176,7 @@ public class WriteFilesTest {
"Intimidating pigeon",
"Pedantic gull",
"Frisky finch");
- runWrite(
- inputs,
- IDENTITY_MAP,
- getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+ runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
}
/** Test that WriteFiles with an empty input still produces one shard. */
@@ -193,7 +187,7 @@ public class WriteFilesTest {
Collections.<String>emptyList(),
IDENTITY_MAP,
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+ WriteFiles.to(makeSimpleSink()));
checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
}
@@ -208,7 +202,7 @@ public class WriteFilesTest {
Arrays.asList("one", "two", "three", "four", "five", "six"),
IDENTITY_MAP,
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1));
+ WriteFiles.to(makeSimpleSink()));
}
private ResourceId getBaseOutputDirectory() {
@@ -241,9 +235,7 @@ public class WriteFilesTest {
}
SimpleSink<Void> sink = makeSimpleSink();
- WriteFiles<String, ?, String> write =
- WriteFiles.to(sink, SerializableFunctions.<String>identity())
- .withSharding(new LargestInt());
+ WriteFiles<String, ?, String> write = WriteFiles.to(sink).withSharding(new LargestInt());
p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
.apply(IDENTITY_MAP)
.apply(write);
@@ -264,8 +256,7 @@ public class WriteFilesTest {
Arrays.asList("one", "two", "three", "four", "five", "six"),
IDENTITY_MAP,
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
- .withNumShards(20));
+ WriteFiles.to(makeSimpleSink()).withNumShards(20));
}
/** Test a WriteFiles transform with an empty PCollection. */
@@ -273,11 +264,7 @@ public class WriteFilesTest {
@Category(NeedsRunner.class)
public void testWriteWithEmptyPCollection() throws IOException {
List<String> inputs = new ArrayList<>();
- runWrite(
- inputs,
- IDENTITY_MAP,
- getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+ runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
}
/** Test a WriteFiles with a windowed PCollection. */
@@ -295,7 +282,7 @@ public class WriteFilesTest {
inputs,
new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+ WriteFiles.to(makeSimpleSink()));
}
/** Test a WriteFiles with sessions. */
@@ -314,7 +301,7 @@ public class WriteFilesTest {
inputs,
new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+ WriteFiles.to(makeSimpleSink()));
}
@Test
@@ -328,15 +315,12 @@ public class WriteFilesTest {
inputs,
Window.<String>into(FixedWindows.of(Duration.millis(2))),
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
- .withMaxNumWritersPerBundle(2)
- .withWindowedWrites());
+ WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
}
public void testBuildWrite() {
SimpleSink<Void> sink = makeSimpleSink();
- WriteFiles<String, ?, String> write =
- WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3);
+ WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
assertThat((SimpleSink<Void>) write.getSink(), is(sink));
PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
write.getSharding();
@@ -358,7 +342,7 @@ public class WriteFilesTest {
@Test
public void testDisplayData() {
- DynamicDestinations<String, Void> dynamicDestinations =
+ DynamicDestinations<String, Void, String> dynamicDestinations =
DynamicFileDestinations.constant(
DefaultFilenamePolicy.fromParams(
new Params()
@@ -374,8 +358,7 @@ public class WriteFilesTest {
builder.add(DisplayData.item("foo", "bar"));
}
};
- WriteFiles<String, ?, String> write =
- WriteFiles.to(sink, SerializableFunctions.<String>identity());
+ WriteFiles<String, ?, String> write = WriteFiles.to(sink);
DisplayData displayData = DisplayData.from(write);
@@ -391,9 +374,7 @@ public class WriteFilesTest {
"Must use windowed writes when applying WriteFiles to an unbounded PCollection");
SimpleSink<Void> sink = makeSimpleSink();
- p.apply(Create.of("foo"))
- .setIsBoundedInternal(IsBounded.UNBOUNDED)
- .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()));
+ p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(WriteFiles.to(sink));
p.run();
}
@@ -408,13 +389,13 @@ public class WriteFilesTest {
SimpleSink<Void> sink = makeSimpleSink();
p.apply(Create.of("foo"))
.setIsBoundedInternal(IsBounded.UNBOUNDED)
- .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites());
+ .apply(WriteFiles.to(sink).withWindowedWrites());
p.run();
}
// Test DynamicDestinations class. Expects user values to be string-encoded integers.
// Stores the integer mod 5 as the destination, and uses that in the file prefix.
- static class TestDestinations extends DynamicDestinations<String, Integer> {
+ static class TestDestinations extends DynamicDestinations<String, Integer, String> {
private ResourceId baseOutputDirectory;
TestDestinations(ResourceId baseOutputDirectory) {
@@ -422,6 +403,11 @@ public class WriteFilesTest {
}
@Override
+ public String formatRecord(String record) {
+ return "record_" + record;
+ }
+
+ @Override
public Integer getDestination(String element) {
return Integer.valueOf(element) % 5;
}
@@ -444,14 +430,6 @@ public class WriteFilesTest {
}
}
- // Test format function. Prepend a string to each record before writing.
- static class TestDynamicFormatFunction implements SerializableFunction<String, String> {
- @Override
- public String apply(String input) {
- return "record_" + input;
- }
- }
-
@Test
@Category(NeedsRunner.class)
public void testDynamicDestinationsBounded() throws Exception {
@@ -495,8 +473,7 @@ public class WriteFilesTest {
// If emptyShards==true make numShards larger than the number of elements per destination.
// This will force every destination to generate some empty shards.
int numShards = emptyShards ? 2 * numInputs / 5 : 2;
- WriteFiles<String, Integer, String> writeFiles =
- WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards);
+ WriteFiles<String, Integer, String> writeFiles = WriteFiles.to(sink).withNumShards(numShards);
PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
if (!bounded) {
@@ -521,7 +498,7 @@ public class WriteFilesTest {
@Test
public void testShardedDisplayData() {
- DynamicDestinations<String, Void> dynamicDestinations =
+ DynamicDestinations<String, Void, String> dynamicDestinations =
DynamicFileDestinations.constant(
DefaultFilenamePolicy.fromParams(
new Params()
@@ -537,8 +514,7 @@ public class WriteFilesTest {
builder.add(DisplayData.item("foo", "bar"));
}
};
- WriteFiles<String, ?, String> write =
- WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1);
+ WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(1);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -547,7 +523,7 @@ public class WriteFilesTest {
@Test
public void testCustomShardStrategyDisplayData() {
- DynamicDestinations<String, Void> dynamicDestinations =
+ DynamicDestinations<String, Void, String> dynamicDestinations =
DynamicFileDestinations.constant(
DefaultFilenamePolicy.fromParams(
new Params()
@@ -564,7 +540,7 @@ public class WriteFilesTest {
}
};
WriteFiles<String, ?, String> write =
- WriteFiles.to(sink, SerializableFunctions.<String>identity())
+ WriteFiles.to(sink)
.withSharding(
new PTransform<PCollection<String>, PCollectionView<Integer>>() {
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
@Override
public PDone expand(PCollection<T> input) {
- return input.apply(
- org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+ return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..b663544 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.MimeTypes;
/** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T, Void, T> {
private static final String XML_EXTENSION = ".xml";
private final XmlIO.Write<T> spec;
@@ -46,7 +46,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
}
XmlSink(XmlIO.Write<T> spec) {
- super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+ super(spec.getFilenamePrefix(), DynamicFileDestinations.<T>constant(makeFilenamePolicy(spec)));
this.spec = spec;
}
@@ -77,7 +77,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
}
/** {@link WriteOperation} for XML {@link FileBasedSink}s. */
- protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+ protected static final class XmlWriteOperation<T> extends WriteOperation<Void, T> {
public XmlWriteOperation(XmlSink<T> sink) {
super(sink);
}
@@ -112,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
}
/** A {@link Writer} that can write objects as XML elements. */
- protected static final class XmlWriter<T> extends Writer<T, Void> {
+ protected static final class XmlWriter<T> extends Writer<Void, T> {
final Marshaller marshaller;
private OutputStream os = null;
[4/4] beam git commit: This closes #3541: [BEAM-92] Supports DynamicDestinations in AvroIO.
Posted by jk...@apache.org.
This closes #3541: [BEAM-92] Supports DynamicDestinations in AvroIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/540fa9b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/540fa9b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/540fa9b4
Branch: refs/heads/master
Commit: 540fa9b4246f2e680b6550537c4dda575e5cf71f
Parents: 1f2634d 9f2622f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 17:28:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:29 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 2 +-
.../construction/WriteFilesTranslation.java | 81 ++--
.../construction/PTransformMatchersTest.java | 10 +-
.../construction/WriteFilesTranslationTest.java | 26 +-
.../direct/WriteWithShardingFactory.java | 10 +-
.../direct/WriteWithShardingFactoryTest.java | 8 +-
.../beam/runners/dataflow/DataflowRunner.java | 8 +-
.../runners/dataflow/DataflowRunnerTest.java | 10 +-
.../src/main/proto/beam_runner_api.proto | 2 +
.../java/org/apache/beam/sdk/io/AvroIO.java | 436 +++++++++++++++----
.../java/org/apache/beam/sdk/io/AvroSink.java | 93 ++--
.../beam/sdk/io/ConstantAvroDestination.java | 130 ++++++
.../beam/sdk/io/DefaultFilenamePolicy.java | 1 -
.../beam/sdk/io/DynamicAvroDestinations.java | 46 ++
.../beam/sdk/io/DynamicFileDestinations.java | 59 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 121 +++--
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 23 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 228 ++++++----
.../java/org/apache/beam/sdk/io/TextSink.java | 14 +-
.../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 6 +-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 10 +-
.../org/apache/beam/sdk/io/TextIOWriteTest.java | 23 +-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 74 ++--
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 8 +-
27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
[3/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.
Posted by jk...@apache.org.
[BEAM-92] Supports DynamicDestinations in AvroIO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2622fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2622fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2622fa
Branch: refs/heads/master
Commit: 9f2622fa19da1284222e872fdcd63b086bdc3509
Parents: 1f2634d
Author: Reuven Lax <re...@google.com>
Authored: Thu Jul 6 20:22:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:12 2017 -0700
----------------------------------------------------------------------
.../core/construction/ParDoTranslation.java | 2 +-
.../construction/WriteFilesTranslation.java | 81 ++--
.../construction/PTransformMatchersTest.java | 10 +-
.../construction/WriteFilesTranslationTest.java | 26 +-
.../direct/WriteWithShardingFactory.java | 10 +-
.../direct/WriteWithShardingFactoryTest.java | 8 +-
.../beam/runners/dataflow/DataflowRunner.java | 8 +-
.../runners/dataflow/DataflowRunnerTest.java | 10 +-
.../src/main/proto/beam_runner_api.proto | 2 +
.../java/org/apache/beam/sdk/io/AvroIO.java | 436 +++++++++++++++----
.../java/org/apache/beam/sdk/io/AvroSink.java | 93 ++--
.../beam/sdk/io/ConstantAvroDestination.java | 130 ++++++
.../beam/sdk/io/DefaultFilenamePolicy.java | 1 -
.../beam/sdk/io/DynamicAvroDestinations.java | 46 ++
.../beam/sdk/io/DynamicFileDestinations.java | 59 ++-
.../org/apache/beam/sdk/io/FileBasedSink.java | 121 +++--
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 23 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 228 ++++++----
.../java/org/apache/beam/sdk/io/TextSink.java | 14 +-
.../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
.../apache/beam/sdk/io/FileBasedSinkTest.java | 6 +-
.../java/org/apache/beam/sdk/io/SimpleSink.java | 10 +-
.../org/apache/beam/sdk/io/TextIOWriteTest.java | 23 +-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 74 ++--
.../java/org/apache/beam/sdk/io/xml/XmlIO.java | 4 +-
.../org/apache/beam/sdk/io/xml/XmlSink.java | 8 +-
27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index d7b0e9f..5765c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -484,7 +484,7 @@ public class ParDoTranslation {
});
}
- private static SideInput toProto(PCollectionView<?> view) {
+ public static SideInput toProto(PCollectionView<?> view) {
Builder builder = SideInput.newBuilder();
builder.setAccessPattern(
FunctionSpec.newBuilder()
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index b1d2da4..7954b0e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -19,29 +19,35 @@
package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.service.AutoService;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
/**
* Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi}
@@ -53,28 +59,25 @@ public class WriteFilesTranslation {
public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
"urn:beam:file_based_sink:javasdk:0.1";
- public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
- "urn:beam:file_based_sink_format_function:javasdk:0.1";
-
@VisibleForTesting
static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
+ Map<String, SideInput> sideInputs = Maps.newHashMap();
+ for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) {
+ sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
+ }
return WriteFilesPayload.newBuilder()
.setSink(toProto(transform.getSink()))
- .setFormatFunction(toProto(transform.getFormatFunction()))
.setWindowedWrites(transform.isWindowedWrites())
.setRunnerDeterminedSharding(
transform.getNumShards() == null && transform.getSharding() == null)
+ .putAllSideInputs(sideInputs)
.build();
}
- private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+ private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) {
return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
}
- private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
- return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
- }
-
private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
return SdkFunctionSpec.newBuilder()
.setSpec(
@@ -91,7 +94,7 @@ public class WriteFilesTranslation {
}
@VisibleForTesting
- static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+ static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
checkArgument(
sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
"Cannot extract %s instance from %s with URN %s",
@@ -102,44 +105,44 @@ public class WriteFilesTranslation {
byte[] serializedSink =
sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
- return (FileBasedSink<?, ?>)
+ return (FileBasedSink<?, ?, ?>)
SerializableUtils.deserializeFromByteArray(
serializedSink, FileBasedSink.class.getSimpleName());
}
- @VisibleForTesting
- static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
- SdkFunctionSpec sinkProto) throws IOException {
- checkArgument(
- sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
- "Cannot extract %s instance from %s with URN %s",
- SerializableFunction.class.getSimpleName(),
- FunctionSpec.class.getSimpleName(),
- sinkProto.getSpec().getUrn());
-
- byte[] serializedFunction =
- sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-
- return (SerializableFunction<InputT, OutputT>)
- SerializableUtils.deserializeFromByteArray(
- serializedFunction, FileBasedSink.class.getSimpleName());
- }
-
- public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+ public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
transform)
throws IOException {
- return (FileBasedSink<OutputT, DestinationT>)
+ return (FileBasedSink<UserT, DestinationT, OutputT>)
sinkFromProto(getWriteFilesPayload(transform).getSink());
}
- public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
- AppliedPTransform<
- PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
- transform)
- throws IOException {
- return formatFunctionFromProto(
- getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
+ public static <UserT, DestinationT, OutputT>
+ List<PCollectionView<?>> getDynamicDestinationSideInputs(
+ AppliedPTransform<
+ PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+ transform)
+ throws IOException {
+ SdkComponents sdkComponents = SdkComponents.create();
+ RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
+ List<PCollectionView<?>> views = Lists.newArrayList();
+ Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
+ for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
+ PCollection<?> originalPCollection =
+ checkNotNull(
+ (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
+ "no input with tag %s",
+ entry.getKey());
+ views.add(
+ ParDoTranslation.viewFromProto(
+ entry.getValue(),
+ entry.getKey(),
+ originalPCollection,
+ transformProto,
+ RehydratedComponents.forComponents(sdkComponents.toComponents())));
+ }
+ return views;
}
public static <T> boolean isWindowedWrites(
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 316645b..1862699 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -549,15 +548,14 @@ public class PTransformMatchersTest implements Serializable {
false);
WriteFiles<Integer, Void, Integer> write =
WriteFiles.to(
- new FileBasedSink<Integer, Void>(
+ new FileBasedSink<Integer, Void, Integer>(
StaticValueProvider.of(outputDirectory),
- DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
+ DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
@Override
- public WriteOperation<Integer, Void> createWriteOperation() {
+ public WriteOperation<Void, Integer> createWriteOperation() {
return null;
}
- },
- SerializableFunctions.<Integer>identity());
+ });
assertThat(
PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
is(true));
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 4259ac8..e067fac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -63,12 +62,11 @@ public class WriteFilesTranslationTest {
public static class TestWriteFilesPayloadTranslation {
@Parameters(name = "{index}: {0}")
public static Iterable<WriteFiles<Object, Void, Object>> data() {
- SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
return ImmutableList.of(
- WriteFiles.to(new DummySink(), format),
- WriteFiles.to(new DummySink(), format).withWindowedWrites(),
- WriteFiles.to(new DummySink(), format).withNumShards(17),
- WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
+ WriteFiles.to(new DummySink()),
+ WriteFiles.to(new DummySink()).withWindowedWrites(),
+ WriteFiles.to(new DummySink()).withNumShards(17),
+ WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
}
@Parameter(0)
@@ -87,7 +85,8 @@ public class WriteFilesTranslationTest {
assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
assertThat(
- (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+ (FileBasedSink<String, Void, String>)
+ WriteFilesTranslation.sinkFromProto(payload.getSink()),
equalTo(writeFiles.getSink()));
}
@@ -118,16 +117,17 @@ public class WriteFilesTranslationTest {
* A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
* any issues serializing mocks.
*/
- private static class DummySink extends FileBasedSink<Object, Void> {
+ private static class DummySink extends FileBasedSink<Object, Void, Object> {
DummySink() {
super(
StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
- DynamicFileDestinations.constant(new DummyFilenamePolicy()));
+ DynamicFileDestinations.constant(
+ new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
}
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
return new DummyWriteOperation(this);
}
@@ -152,13 +152,13 @@ public class WriteFilesTranslationTest {
}
}
- private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
- public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
+ private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> {
+ public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) {
super(sink);
}
@Override
- public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
+ public FileBasedSink.Writer<Void, Object> createWriter() throws Exception {
throw new UnsupportedOperationException("Should never be called.");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index ba796ae..3557c5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,10 +24,12 @@ import com.google.common.base.Suppliers;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -61,10 +63,10 @@ class WriteWithShardingFactory<InputT>
AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
transform) {
try {
- WriteFiles<InputT, ?, ?> replacement =
- WriteFiles.to(
- WriteFilesTranslation.getSink(transform),
- WriteFilesTranslation.getFormatFunction(transform));
+ List<PCollectionView<?>> sideInputs =
+ WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+ FileBasedSink sink = WriteFilesTranslation.getSink(transform);
+ WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6dd069c..d0db44e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -143,15 +142,14 @@ public class WriteWithShardingFactoryTest implements Serializable {
PTransform<PCollection<Object>, PDone> original =
WriteFiles.to(
- new FileBasedSink<Object, Void>(
+ new FileBasedSink<Object, Void, Object>(
StaticValueProvider.of(outputDirectory),
DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
- },
- SerializableFunctions.identity());
+ });
@SuppressWarnings("unchecked")
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 762ac9f..f8d2c3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -1501,10 +1502,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
try {
+ List<PCollectionView<?>> sideInputs =
+ WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+ FileBasedSink sink = WriteFilesTranslation.getSink(transform);
WriteFiles<UserT, DestinationT, OutputT> replacement =
- WriteFiles.<UserT, DestinationT, OutputT>to(
- WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
- WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
+ WriteFiles.to(sink).withSideInputs(sideInputs);
if (WriteFilesTranslation.isWindowedWrites(transform)) {
replacement = replacement.withWindowedWrites();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7556a28..9db73c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1271,8 +1271,7 @@ public class DataflowRunnerTest implements Serializable {
StreamingShardedWriteFactory<Object, Void, Object> factory =
new StreamingShardedWriteFactory<>(p.getOptions());
- WriteFiles<Object, Void, Object> original =
- WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
+ WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
originalApplication =
@@ -1290,7 +1289,7 @@ public class DataflowRunnerTest implements Serializable {
assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
}
- private static class TestSink extends FileBasedSink<Object, Void> {
+ private static class TestSink extends FileBasedSink<Object, Void, Object> {
@Override
public void validate(PipelineOptions options) {}
@@ -1315,11 +1314,12 @@ public class DataflowRunnerTest implements Serializable {
int shardNumber, int numShards, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("should not be called");
}
- }));
+ },
+ SerializableFunctions.identity()));
}
@Override
- public WriteOperation<Object, Void> createWriteOperation() {
+ public WriteOperation<Void, Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 42e2601..9afb565 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -375,6 +375,8 @@ message WriteFilesPayload {
bool windowed_writes = 3;
bool runner_determined_sharding = 4;
+
+ map<string, SideInput> side_inputs = 5;
}
// A coder, the binary format for serialization and deserialization of data in
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 27c9073..824f725 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import com.google.common.io.BaseEncoding;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
@@ -51,7 +49,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -161,6 +158,51 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* .withSuffix(".avro"));
* }</pre>
*
+ * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ * extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ * private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ * public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ * this.userToSchemaMap = userToSchemaMap;
+ * }
+ * public GenericRecord formatRecord(UserEvent record) {
+ * return formatUserRecord(record, getSchema(record.getUserId()));
+ * }
+ * public Schema getSchema(Integer userId) {
+ * return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ * }
+ * public Integer getDestination(UserEvent record) {
+ * return record.getUserId();
+ * }
+ * public Integer getDefaultDestination() {
+ * return 0;
+ * }
+ * public FilenamePolicy getFilenamePolicy(Integer userId) {
+ * return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ * + userId + "/events"));
+ * }
+ * public List<PCollectionView<?>> getSideInputs() {
+ * return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ * }
+ * }
+ * PCollection<UserEvent> events = ...;
+ * PCollectionView<Map<Integer, String>> schemaMap = events.apply(
+ * "ComputeSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
+ * .to(new UserDynamicAvroDestinations(schemaMap)));
+ * }</pre>
+ *
* <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
* org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
* overridden using {@link AvroIO.Write#withCodec}.
@@ -256,18 +298,53 @@ public class AvroIO {
* pattern).
*/
public static <T> Write<T> write(Class<T> recordClass) {
- return AvroIO.<T>defaultWriteBuilder()
- .setRecordClass(recordClass)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .build();
+ return new Write<>(
+ AvroIO.<T, T>defaultWriteBuilder()
+ .setGenericRecords(false)
+ .setSchema(ReflectData.get().getSchema(recordClass))
+ .build());
}
/** Writes Avro records of the specified schema. */
public static Write<GenericRecord> writeGenericRecords(Schema schema) {
- return AvroIO.<GenericRecord>defaultWriteBuilder()
- .setRecordClass(GenericRecord.class)
- .setSchema(schema)
- .build();
+ return new Write<>(
+ AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+ .setGenericRecords(true)
+ .setSchema(schema)
+ .build());
+ }
+
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+ * matching a sharding pattern), with each element of the input collection encoded into its own
+ * record of type OutputT.
+ *
+ * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+ * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+ * that will be written to the file must be specified. If using a custom {@link
+ * DynamicAvroDestinations} object this is done using {@link
+ * DynamicAvroDestinations#formatRecord}, otherwise the {@link
+ * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function.
+ *
+ * <p>The advantage of using a custom type is that it allows a user-provided {@link
+ * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)} to
+ * examine the custom type when choosing a destination.
+ *
+ * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+ * instead.
+ */
+ public static <UserT, OutputT> TypedWrite<UserT, OutputT> writeCustomType() {
+ return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+ }
+
+ /**
+ * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+ * {@link GenericRecord}. A schema must be specified either in {@link
+ * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
+ * TypedWrite#withSchema(Schema)}.
+ */
+ public static <UserT> TypedWrite<UserT, GenericRecord> writeCustomTypeToGenericRecords() {
+ return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
}
/**
@@ -277,12 +354,12 @@ public class AvroIO {
return writeGenericRecords(new Schema.Parser().parse(schema));
}
- private static <T> Write.Builder<T> defaultWriteBuilder() {
- return new AutoValue_AvroIO_Write.Builder<T>()
+ private static <UserT, OutputT> TypedWrite.Builder<UserT, OutputT> defaultWriteBuilder() {
+ return new AutoValue_AvroIO_TypedWrite.Builder<UserT, OutputT>()
.setFilenameSuffix(null)
.setShardTemplate(null)
.setNumShards(0)
- .setCodec(Write.DEFAULT_CODEC)
+ .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
.setMetadata(ImmutableMap.<String, Object>of())
.setWindowedWrites(false);
}
@@ -572,15 +649,18 @@ public class AvroIO {
}
}
- /////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
- public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
- private static final SerializableAvroCodecFactory DEFAULT_CODEC =
- new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
- // This should be a multiple of 4 to not get a partial encoded byte.
- private static final int METADATA_BYTES_MAX_LENGTH = 40;
+ public abstract static class TypedWrite<UserT, OutputT>
+ extends PTransform<PCollection<UserT>, PDone> {
+ static final CodecFactory DEFAULT_CODEC = CodecFactory.deflateCodec(6);
+ static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC =
+ new SerializableAvroCodecFactory(DEFAULT_CODEC);
+
+ @Nullable
+ abstract SerializableFunction<UserT, OutputT> getFormatFunction();
@Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
@Nullable abstract String getShardTemplate();
@@ -590,11 +670,16 @@ public class AvroIO {
abstract ValueProvider<ResourceId> getTempDirectory();
abstract int getNumShards();
- @Nullable abstract Class<T> getRecordClass();
+
+ abstract boolean getGenericRecords();
+
@Nullable abstract Schema getSchema();
abstract boolean getWindowedWrites();
@Nullable abstract FilenamePolicy getFilenamePolicy();
+ @Nullable
+ abstract DynamicAvroDestinations<UserT, ?, OutputT> getDynamicDestinations();
+
/**
* The codec used to encode the blocks in the Avro file. String value drawn from those in
* https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -603,25 +688,39 @@ public class AvroIO {
/** Avro file metadata. */
abstract ImmutableMap<String, Object> getMetadata();
- abstract Builder<T> toBuilder();
+ abstract Builder<UserT, OutputT> toBuilder();
@AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
- abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+ abstract static class Builder<UserT, OutputT> {
+ abstract Builder<UserT, OutputT> setFormatFunction(
+ SerializableFunction<UserT, OutputT> formatFunction);
- abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+ abstract Builder<UserT, OutputT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
- abstract Builder<T> setNumShards(int numShards);
- abstract Builder<T> setShardTemplate(String shardTemplate);
- abstract Builder<T> setRecordClass(Class<T> recordClass);
- abstract Builder<T> setSchema(Schema schema);
- abstract Builder<T> setWindowedWrites(boolean windowedWrites);
- abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
- abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
- abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+ abstract Builder<UserT, OutputT> setFilenameSuffix(String filenameSuffix);
+
+ abstract Builder<UserT, OutputT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+ abstract Builder<UserT, OutputT> setNumShards(int numShards);
+
+ abstract Builder<UserT, OutputT> setShardTemplate(String shardTemplate);
+
+ abstract Builder<UserT, OutputT> setGenericRecords(boolean genericRecords);
- abstract Write<T> build();
+ abstract Builder<UserT, OutputT> setSchema(Schema schema);
+
+ abstract Builder<UserT, OutputT> setWindowedWrites(boolean windowedWrites);
+
+ abstract Builder<UserT, OutputT> setFilenamePolicy(FilenamePolicy filenamePolicy);
+
+ abstract Builder<UserT, OutputT> setCodec(SerializableAvroCodecFactory codec);
+
+ abstract Builder<UserT, OutputT> setMetadata(ImmutableMap<String, Object> metadata);
+
+ abstract Builder<UserT, OutputT> setDynamicDestinations(
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations);
+
+ abstract TypedWrite<UserT, OutputT> build();
}
/**
@@ -635,7 +734,7 @@ public class AvroIO {
* common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
* using {@link #to(FilenamePolicy)}.
*/
- public Write<T> to(String outputPrefix) {
+ public TypedWrite<UserT, OutputT> to(String outputPrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
}
@@ -658,14 +757,12 @@ public class AvroIO {
* infer a directory for temporary files.
*/
@Experimental(Kind.FILESYSTEM)
- public Write<T> to(ResourceId outputPrefix) {
+ public TypedWrite<UserT, OutputT> to(ResourceId outputPrefix) {
return toResource(StaticValueProvider.of(outputPrefix));
}
- /**
- * Like {@link #to(String)}.
- */
- public Write<T> to(ValueProvider<String> outputPrefix) {
+ /** Like {@link #to(String)}. */
+ public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
return toResource(NestedValueProvider.of(outputPrefix,
new SerializableFunction<String, ResourceId>() {
@Override
@@ -675,11 +772,9 @@ public class AvroIO {
}));
}
- /**
- * Like {@link #to(ResourceId)}.
- */
+ /** Like {@link #to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
- public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+ public TypedWrite<UserT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) {
return toBuilder().setFilenamePrefix(outputPrefix).build();
}
@@ -687,16 +782,52 @@ public class AvroIO {
* Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
* directory for temporary files must be specified using {@link #withTempDirectory}.
*/
- public Write<T> to(FilenamePolicy filenamePolicy) {
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
+ /**
+ * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These
+ * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+ * temporary files must be specified using {@link #withTempDirectory}.
+ */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> to(
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations) {
+ return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+ }
+
+ /**
+ * Sets the output schema. Can only be used when the output type is {@link GenericRecord}
+ * and when not using {@link #to(DynamicAvroDestinations)}.
+ */
+ public TypedWrite<UserT, OutputT> withSchema(Schema schema) {
+ return toBuilder().setSchema(schema).build();
+ }
+
+ /**
+ * Specifies a format function to convert {@code UserT} to the output type. If {@link
+ * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be
+ * used instead.
+ */
+ public TypedWrite<UserT, OutputT> withFormatFunction(
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return toBuilder().setFormatFunction(formatFunction).build();
+ }
+
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
- public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ public TypedWrite<UserT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
return toBuilder().setTempDirectory(tempDirectory).build();
}
+ /** Set the base directory used to generate temporary files. */
+ @Experimental(Kind.FILESYSTEM)
+ public TypedWrite<UserT, OutputT> withTempDirectory(ResourceId tempDirectory) {
+ return withTempDirectory(StaticValueProvider.of(tempDirectory));
+ }
+
/**
* Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
* used when using one of the default filename-prefix to() overrides.
@@ -704,7 +835,7 @@ public class AvroIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write<T> withShardNameTemplate(String shardTemplate) {
+ public TypedWrite<UserT, OutputT> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
@@ -715,7 +846,7 @@ public class AvroIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public Write<T> withSuffix(String filenameSuffix) {
+ public TypedWrite<UserT, OutputT> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
@@ -729,7 +860,7 @@ public class AvroIO {
*
* @param numShards the number of shards to use, or 0 to let the system decide.
*/
- public Write<T> withNumShards(int numShards) {
+ public TypedWrite<UserT, OutputT> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
@@ -744,7 +875,7 @@ public class AvroIO {
*
* <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public Write<T> withoutSharding() {
+ public TypedWrite<UserT, OutputT> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
@@ -754,12 +885,12 @@ public class AvroIO {
* <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
* {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
*/
- public Write<T> withWindowedWrites() {
+ public TypedWrite<UserT, OutputT> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
/** Writes to Avro file(s) compressed using specified codec. */
- public Write<T> withCodec(CodecFactory codec) {
+ public TypedWrite<UserT, OutputT> withCodec(CodecFactory codec) {
return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
}
@@ -768,7 +899,7 @@ public class AvroIO {
*
* <p>Supported value types are String, Long, and byte[].
*/
- public Write<T> withMetadata(Map<String, Object> metadata) {
+ public TypedWrite<UserT, OutputT> withMetadata(Map<String, Object> metadata) {
Map<String, String> badKeys = Maps.newLinkedHashMap();
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object v = entry.getValue();
@@ -783,18 +914,31 @@ public class AvroIO {
return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
}
- DynamicDestinations<T, Void> resolveDynamicDestinations() {
- FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
- if (usedFilenamePolicy == null) {
- usedFilenamePolicy =
- DefaultFilenamePolicy.fromStandardParameters(
- getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+ DynamicAvroDestinations<UserT, ?, OutputT> resolveDynamicDestinations() {
+ DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations = getDynamicDestinations();
+ if (dynamicDestinations == null) {
+ FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+ if (usedFilenamePolicy == null) {
+ usedFilenamePolicy =
+ DefaultFilenamePolicy.fromStandardParameters(
+ getFilenamePrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getWindowedWrites());
+ }
+ dynamicDestinations =
+ constantDestinations(
+ usedFilenamePolicy,
+ getSchema(),
+ getMetadata(),
+ getCodec().getCodec(),
+ getFormatFunction());
}
- return DynamicFileDestinations.constant(usedFilenamePolicy);
+ return dynamicDestinations;
}
@Override
- public PDone expand(PCollection<T> input) {
+ public PDone expand(PCollection<UserT> input) {
checkArgument(
getFilenamePrefix() != null || getTempDirectory() != null,
"Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
@@ -805,24 +949,25 @@ public class AvroIO {
"shardTemplate and filenameSuffix should only be used with the default "
+ "filename policy");
}
+ if (getDynamicDestinations() != null) {
+ checkArgument(
+ getFormatFunction() == null,
+ "A format function should not be specified "
+ + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+ }
+
return expandTyped(input, resolveDynamicDestinations());
}
public <DestinationT> PDone expandTyped(
- PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ PCollection<UserT> input,
+ DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations) {
ValueProvider<ResourceId> tempDirectory = getTempDirectory();
if (tempDirectory == null) {
tempDirectory = getFilenamePrefix();
}
- WriteFiles<T, DestinationT, T> write =
- WriteFiles.to(
- new AvroSink<>(
- tempDirectory,
- dynamicDestinations,
- AvroCoder.of(getRecordClass(), getSchema()),
- getCodec(),
- getMetadata()),
- SerializableFunctions.<T>identity());
+ WriteFiles<UserT, DestinationT, OutputT> write =
+ WriteFiles.to(new AvroSink<>(tempDirectory, dynamicDestinations, getGenericRecords()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -845,33 +990,11 @@ public class AvroIO {
: getTempDirectory().toString();
}
builder
- .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
.addIfNotDefault(
DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
- .addIfNotDefault(
- DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
- DEFAULT_CODEC.toString())
.addIfNotNull(
DisplayData.item("tempDirectory", tempDirectory)
.withLabel("Directory for temporary files"));
- builder.include("Metadata", new Metadata());
- }
-
- private class Metadata implements HasDisplayData {
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
- DisplayData.Type type = DisplayData.inferType(entry.getValue());
- if (type != null) {
- builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
- } else {
- String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
- String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
- ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
- builder.add(DisplayData.item(entry.getKey(), repr));
- }
- }
- }
}
@Override
@@ -880,6 +1003,131 @@ public class AvroIO {
}
}
+ /**
+ * This class is used as the default return value of {@link AvroIO#write}.
+ *
+ * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}.
+ * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+ */
+ public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+ @VisibleForTesting TypedWrite<T, T> inner;
+
+ Write(TypedWrite<T, T> inner) {
+ this.inner = inner;
+ }
+
+ /** See {@link TypedWrite#to(String)}. */
+ public Write<T> to(String outputPrefix) {
+ return new Write<>(
+ inner
+ .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
+ .withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(ResourceId)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> to(ResourceId outputPrefix) {
+ return new Write<T>(
+ inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(ValueProvider)}. */
+ public Write<T> to(ValueProvider<String> outputPrefix) {
+ return new Write<>(
+ inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#toResource(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+ return new Write<>(
+ inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(FilenamePolicy)}. */
+ public Write<T> to(FilenamePolicy filenamePolicy) {
+ return new Write<>(
+ inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity()));
+ }
+
+ /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */
+ public Write to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
+ return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
+ }
+
+ /** See {@link TypedWrite#withSchema}. */
+ public Write withSchema(Schema schema) {
+ return new Write<>(inner.withSchema(schema));
+ }
+ /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+ @Experimental(Kind.FILESYSTEM)
+ public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ return new Write<>(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+ public Write<T> withTempDirectory(ResourceId tempDirectory) {
+ return new Write<>(inner.withTempDirectory(tempDirectory));
+ }
+
+ /** See {@link TypedWrite#withShardNameTemplate}. */
+ public Write<T> withShardNameTemplate(String shardTemplate) {
+ return new Write<>(inner.withShardNameTemplate(shardTemplate));
+ }
+
+ /** See {@link TypedWrite#withSuffix}. */
+ public Write<T> withSuffix(String filenameSuffix) {
+ return new Write<>(inner.withSuffix(filenameSuffix));
+ }
+
+ /** See {@link TypedWrite#withNumShards}. */
+ public Write<T> withNumShards(int numShards) {
+ return new Write<>(inner.withNumShards(numShards));
+ }
+
+ /** See {@link TypedWrite#withoutSharding}. */
+ public Write<T> withoutSharding() {
+ return new Write<>(inner.withoutSharding());
+ }
+
+ /** See {@link TypedWrite#withWindowedWrites}. */
+ public Write withWindowedWrites() {
+ return new Write<T>(inner.withWindowedWrites());
+ }
+
+ /** See {@link TypedWrite#withCodec}. */
+ public Write<T> withCodec(CodecFactory codec) {
+ return new Write<>(inner.withCodec(codec));
+ }
+
+ /** See {@link TypedWrite#withMetadata}. */
+ public Write withMetadata(Map<String, Object> metadata) {
+ return new Write<>(inner.withMetadata(metadata));
+ }
+
+ @Override
+ public PDone expand(PCollection<T> input) {
+ return inner.expand(input);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ inner.populateDisplayData(builder);
+ }
+ }
+
+ /**
+ * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy},
+ * schema, metadata, and codec.
+ */
+ public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
+ FilenamePolicy filenamePolicy,
+ Schema schema,
+ Map<String, Object> metadata,
+ CodecFactory codec,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
+ }
/////////////////////////////////////////////////////////////////////////////
/** Disallow construction of utility class. */
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index c78870b..acd3ea6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -17,93 +17,90 @@
*/
package org.apache.beam.sdk.io;
-import com.google.common.collect.ImmutableMap;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
/** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, DestinationT, OutputT> {
+ private final DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations;
+ private final boolean genericRecords;
AvroSink(
ValueProvider<ResourceId> outputPrefix,
- DynamicDestinations<T, DestinationT> dynamicDestinations,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
+ boolean genericRecords) {
// Avro handles compression internally using the codec.
super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = dynamicDestinations;
+ this.genericRecords = genericRecords;
}
@Override
- public WriteOperation<T, DestinationT> createWriteOperation() {
- return new AvroWriteOperation<>(this, coder, codec, metadata);
+ public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+ return (DynamicAvroDestinations<UserT, DestinationT, OutputT>) super.getDynamicDestinations();
+ }
+
+ @Override
+ public WriteOperation<DestinationT, OutputT> createWriteOperation() {
+ return new AvroWriteOperation<>(this, genericRecords);
}
/** A {@link WriteOperation WriteOperation} for Avro files. */
- private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
- private final AvroCoder<T> coder;
- private final SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+ private static class AvroWriteOperation<DestinationT, OutputT>
+ extends WriteOperation<DestinationT, OutputT> {
+ private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+ private final boolean genericRecords;
- private AvroWriteOperation(
- AvroSink<T, DestinationT> sink,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
super(sink);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = sink.getDynamicDestinations();
+ this.genericRecords = genericRecords;
}
@Override
- public Writer<T, DestinationT> createWriter() throws Exception {
- return new AvroWriter<>(this, coder, codec, metadata);
+ public Writer<DestinationT, OutputT> createWriter() throws Exception {
+ return new AvroWriter<>(this, dynamicDestinations, genericRecords);
}
}
/** A {@link Writer Writer} for Avro files. */
- private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
- private final AvroCoder<T> coder;
- private DataFileWriter<T> dataFileWriter;
- private SerializableAvroCodecFactory codec;
- private final ImmutableMap<String, Object> metadata;
+ private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT, OutputT> {
+ private DataFileWriter<OutputT> dataFileWriter;
+ private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+ private final boolean genericRecords;
public AvroWriter(
- WriteOperation<T, DestinationT> writeOperation,
- AvroCoder<T> coder,
- SerializableAvroCodecFactory codec,
- ImmutableMap<String, Object> metadata) {
+ WriteOperation<DestinationT, OutputT> writeOperation,
+ DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations,
+ boolean genericRecords) {
super(writeOperation, MimeTypes.BINARY);
- this.coder = coder;
- this.codec = codec;
- this.metadata = metadata;
+ this.dynamicDestinations = dynamicDestinations;
+ this.genericRecords = genericRecords;
}
@SuppressWarnings("deprecation") // uses internal test functionality.
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
- DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
- ? new GenericDatumWriter<T>(coder.getSchema())
- : new ReflectDatumWriter<T>(coder.getSchema());
+ DestinationT destination = getDestination();
+ CodecFactory codec = dynamicDestinations.getCodec(destination);
+ Schema schema = dynamicDestinations.getSchema(destination);
+ Map<String, Object> metadata = dynamicDestinations.getMetadata(destination);
- dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+ DatumWriter<OutputT> datumWriter =
+ genericRecords
+ ? new GenericDatumWriter<OutputT>(schema)
+ : new ReflectDatumWriter<OutputT>(schema);
+ dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec);
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
Object v = entry.getValue();
if (v instanceof String) {
@@ -118,11 +115,11 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
+ v.getClass().getSimpleName());
}
}
- dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+ dataFileWriter.create(schema, Channels.newOutputStream(channel));
}
@Override
- public void write(T value) throws Exception {
+ public void write(OutputT value) throws Exception {
dataFileWriter.append(value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
new file mode 100644
index 0000000..b006e26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
+/**
+ * Always returns a constant {@link FilenamePolicy}, {@link Schema}, metadata, and codec.
+ *
+ * <p>The destination type is {@link Void}: both {@link #getDestination} and {@link
+ * #getDefaultDestination} return {@code null}, and every per-destination getter ignores its
+ * argument. Records are converted with the format function supplied at construction.
+ *
+ * <p>The schema is stored as its string form and parsed back through a {@link Supplier}, and the
+ * codec is stored inside a {@link SerializableAvroCodecFactory} (presumably so that this object
+ * can be serialized; confirm against those types).
+ */
+class ConstantAvroDestination<UserT, OutputT>
+ extends DynamicAvroDestinations<UserT, Void, OutputT> {
+ private static class SchemaFunction implements Serializable, Function<String, Schema> {
+ @Nullable
+ @Override
+ public Schema apply(@Nullable String input) {
+ return new Schema.Parser().parse(input); // Parses the schema's string form into a Schema.
+ }
+ }
+
+ // Max base64 characters shown per byte[] metadata value; keep it a multiple of 4 (whole groups).
+ private static final int METADATA_BYTES_MAX_LENGTH = 40;
+ private final FilenamePolicy filenamePolicy;
+ private final Supplier<Schema> schema; // Yields the schema by parsing its stored string form.
+ private final Map<String, Object> metadata; // Stored as given; not copied.
+ private final SerializableAvroCodecFactory codec;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
+
+ private class Metadata implements HasDisplayData { // Exposes the metadata map as display data.
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+ DisplayData.Type type = DisplayData.inferType(entry.getValue());
+ if (type != null) {
+ builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+ } else { // No inferred type: the value is cast to byte[] and shown as truncated base64.
+ String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+ String repr =
+ base64.length() <= METADATA_BYTES_MAX_LENGTH
+ ? base64
+ : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+ builder.add(DisplayData.item(entry.getKey(), repr));
+ }
+ }
+ }
+ }
+
+ public ConstantAvroDestination(
+ FilenamePolicy filenamePolicy,
+ Schema schema,
+ Map<String, Object> metadata,
+ CodecFactory codec,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ this.filenamePolicy = filenamePolicy;
+ this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString()));
+ this.metadata = metadata;
+ this.codec = new SerializableAvroCodecFactory(codec);
+ this.formatFunction = formatFunction;
+ }
+
+ @Override
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
+ }
+
+ @Override
+ public Void getDestination(UserT element) {
+ return (Void) null; // There is only one destination, represented by null.
+ }
+
+ @Override
+ public Void getDefaultDestination() {
+ return (Void) null;
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(Void destination) {
+ return filenamePolicy;
+ }
+
+ @Override
+ public Schema getSchema(Void destination) {
+ return schema.get();
+ }
+
+ @Override
+ public Map<String, Object> getMetadata(Void destination) {
+ return metadata;
+ }
+
+ @Override
+ public CodecFactory getCodec(Void destination) {
+ return codec.getCodec();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ filenamePolicy.populateDisplayData(builder);
+ builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema"));
+ builder.addIfNotDefault( // The codec item is compared against the default codec's string form.
+ DisplayData.item("codec", codec.getCodec().toString()).withLabel("Avro Compression Codec"),
+ AvroIO.TypedWrite.DEFAULT_SERIALIZABLE_CODEC.toString());
+ builder.include("Metadata", new Metadata());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 4021609..1f438d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -157,7 +157,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
&& shardTemplate.equals(other.shardTemplate)
&& suffix.equals(other.suffix);
}
-
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
new file mode 100644
index 0000000..f4e8ee6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link AvroIO}. In addition to dynamic file
+ * destinations, this allows specifying other AVRO properties (schema, metadata, codec) per
+ * destination.
+ *
+ * <p>Of the methods declared in this class, only {@link #getSchema} is abstract; {@link
+ * #getMetadata} and {@link #getCodec} have default implementations that subclasses may override.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+ extends DynamicDestinations<UserT, DestinationT, OutputT> {
+ /** Return an AVRO schema for a given destination. */
+ public abstract Schema getSchema(DestinationT destination);
+
+ /**
+ * Return AVRO file metadata for a given destination. The default implementation returns an
+ * empty map.
+ */
+ public Map<String, Object> getMetadata(DestinationT destination) {
+ return ImmutableMap.<String, Object>of();
+ }
+
+ /**
+ * Return an AVRO codec for a given destination. The default implementation returns {@code
+ * AvroIO.TypedWrite.DEFAULT_CODEC}.
+ */
+ public CodecFactory getCodec(DestinationT destination) {
+ return AvroIO.TypedWrite.DEFAULT_CODEC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index d05a01a7..b087bc5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import javax.annotation.Nullable;
@@ -28,20 +27,30 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
public class DynamicFileDestinations {
/** Always returns a constant {@link FilenamePolicy}. */
- private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+ private static class ConstantFilenamePolicy<UserT, OutputT>
+ extends DynamicDestinations<UserT, Void, OutputT> {
private final FilenamePolicy filenamePolicy;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
- public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
- this.filenamePolicy = checkNotNull(filenamePolicy);
+ public ConstantFilenamePolicy(
+ FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+ this.filenamePolicy = filenamePolicy;
+ this.formatFunction = formatFunction;
}
@Override
- public Void getDestination(T element) {
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
+ }
+
+ @Override
+ public Void getDestination(UserT element) {
return (Void) null;
}
@@ -71,14 +80,24 @@ public class DynamicFileDestinations {
* A base class for a {@link DynamicDestinations} object that returns differently-configured
* instances of {@link DefaultFilenamePolicy}.
*/
- private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
- SerializableFunction<UserT, Params> destinationFunction;
- Params emptyDestination;
+ private static class DefaultPolicyDestinations<UserT, OutputT>
+ extends DynamicDestinations<UserT, Params, OutputT> {
+ private final SerializableFunction<UserT, Params> destinationFunction;
+ private final Params emptyDestination;
+ private final SerializableFunction<UserT, OutputT> formatFunction;
public DefaultPolicyDestinations(
- SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+ SerializableFunction<UserT, Params> destinationFunction,
+ Params emptyDestination,
+ SerializableFunction<UserT, OutputT> formatFunction) {
this.destinationFunction = destinationFunction;
this.emptyDestination = emptyDestination;
+ this.formatFunction = formatFunction;
+ }
+
+ @Override
+ public OutputT formatRecord(UserT record) {
+ return formatFunction.apply(record);
}
@Override
@@ -104,16 +123,28 @@ public class DynamicFileDestinations {
}
/** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
- public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
- return new ConstantFilenamePolicy<>(filenamePolicy);
+ public static <UserT, OutputT> DynamicDestinations<UserT, Void, OutputT> constant(
+ FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+ return new ConstantFilenamePolicy<>(filenamePolicy, formatFunction);
+ }
+
+ /**
+ * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
+ * UserT and OutputT are the same type and the format function is the identity.
+ */
+ public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
+ FilenamePolicy filenamePolicy) {
+ return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
}
/**
* Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
* configured with the given {@link Params}.
*/
- public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
- SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
- return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+ public static <UserT, OutputT> DynamicDestinations<UserT, Params, OutputT> toDefaultPolicies(
+ SerializableFunction<UserT, Params> destinationFunction,
+ Params emptyDestination,
+ SerializableFunction<UserT, OutputT> formatFunction) {
+ return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination, formatFunction);
}
}
[2/4] beam git commit: [BEAM-92] Supports DynamicDestinations in
AvroIO.
Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 3bf5d5b..4e2b61c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -27,8 +27,10 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import java.io.IOException;
import java.io.InputStream;
@@ -40,7 +42,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -67,6 +68,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -95,9 +98,9 @@ import org.slf4j.LoggerFactory;
* <p>The process of writing to file-based sink is as follows:
*
* <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
* </ol>
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
@@ -121,7 +124,8 @@ import org.slf4j.LoggerFactory;
* @param <OutputT> the type of values written to the sink.
*/
@Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<UserT, DestinationT, OutputT>
+ implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
/** Directly supported file output compression types. */
@@ -199,7 +203,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
}
- private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+ private final DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations;
/**
* The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
@@ -215,8 +219,54 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* destination type into an instance of {@link FilenamePolicy}.
*/
@Experimental(Kind.FILESYSTEM)
- public abstract static class DynamicDestinations<UserT, DestinationT>
+ public abstract static class DynamicDestinations<UserT, DestinationT, OutputT>
implements HasDisplayData, Serializable {
+ interface SideInputAccessor { // Abstraction over how the value of a side input view is read.
+ <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
+ }
+
+ // Transient: the accessor is installed at processing time and the only implementation here
+ // (SideInputAccessorViaProcessContext) is not Serializable, while this class is.
+ private transient SideInputAccessor sideInputAccessor;
+
+ /** A {@link SideInputAccessor} that reads side inputs through a {@link DoFn.ProcessContext}. */
+ static class SideInputAccessorViaProcessContext implements SideInputAccessor {
+ // Assigned once in the constructor and never changed afterwards.
+ private final DoFn<?, ?>.ProcessContext processContext;
+
+ /** Wraps the given process context; every lookup is forwarded to it. */
+ SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext processContext) {
+ this.processContext = processContext;
+ }
+
+ @Override
+ public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+ return processContext.sideInput(view);
+ }
+ }
+
+ /**
+ * Override to specify that this object needs access to one or more side inputs. These side
+ * inputs must be globally windowed, as they will be accessed from the global window.
+ *
+ * <p>The default implementation returns an empty list.
+ */
+ public List<PCollectionView<?>> getSideInputs() {
+ return ImmutableList.of();
+ }
+
+ /**
+ * Returns the value of a given side input. The view must be present in {@link
+ * #getSideInputs()}.
+ *
+ * @throws NullPointerException if no side input accessor has been set on this object yet
+ */
+ protected final <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+ // Same exception type as before, but with a message explaining what went wrong.
+ checkNotNull(sideInputAccessor, "sideInput() was called before a side input accessor was set");
+ return sideInputAccessor.sideInput(view);
+ }
+
+ final void setSideInputAccessor(SideInputAccessor sideInputAccessor) { // Package-private hook.
+ this.sideInputAccessor = sideInputAccessor; // Later sideInput(view) calls go through this.
+ }
+
+ final void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+ this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
+ }
+
+ /**
+ * Convert an input record type into the output type.
+ *
+ * @param record the user-provided record
+ * @return the corresponding value of the output type
+ */
+ public abstract OutputT formatRecord(UserT record);
+
/**
* Returns an object that represents at a high level the destination being written to. May not
* return null. A destination must have deterministic hash and equality methods defined.
@@ -256,12 +306,13 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
return destinationCoder;
}
// If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
- @Nullable TypeDescriptor<DestinationT> descriptor =
+ @Nullable
+ TypeDescriptor<DestinationT> descriptor =
extractFromTypeParameters(
this,
DynamicDestinations.class,
new TypeVariableExtractor<
- DynamicDestinations<UserT, DestinationT>, DestinationT>() {});
+ DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
checkArgument(
descriptor != null,
"Unable to infer a coder for DestinationT, "
@@ -323,7 +374,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> tempDirectoryProvider,
- DynamicDestinations<?, DestinationT> dynamicDestinations) {
+ DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
}
@@ -331,7 +382,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
@Experimental(Kind.FILESYSTEM)
public FileBasedSink(
ValueProvider<ResourceId> tempDirectoryProvider,
- DynamicDestinations<?, DestinationT> dynamicDestinations,
+ DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
WritableByteChannelFactory writableByteChannelFactory) {
this.tempDirectoryProvider =
NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
@@ -341,8 +392,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
/** Return the {@link DynamicDestinations} used. */
@SuppressWarnings("unchecked")
- public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
- return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
+ public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+ return (DynamicDestinations<UserT, DestinationT, OutputT>) dynamicDestinations;
}
/**
@@ -357,7 +408,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
public void validate(PipelineOptions options) {}
/** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
- public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
+ public abstract WriteOperation<DestinationT, OutputT> createWriteOperation();
public void populateDisplayData(DisplayData.Builder builder) {
getDynamicDestinations().populateDisplayData(builder);
@@ -371,11 +422,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* written,
*
* <ol>
- * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
- * output bundles.
- * <li>During finalize, these temporary files are copied to final output locations and named
- * according to a file naming template.
- * <li>Finally, any temporary files that were created during the write are removed.
+ * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+ * output bundles.
+ * <li>During finalize, these temporary files are copied to final output locations and named
+ * according to a file naming template.
+ * <li>Finally, any temporary files that were created during the write are removed.
* </ol>
*
* <p>Subclass implementations of WriteOperation must implement {@link
@@ -400,9 +451,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* @param <OutputT> the type of values written to the sink.
*/
- public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
+ public abstract static class WriteOperation<DestinationT, OutputT> implements Serializable {
/** The Sink that this WriteOperation will write to. */
- protected final FileBasedSink<OutputT, DestinationT> sink;
+ protected final FileBasedSink<?, DestinationT, OutputT> sink;
/** Directory for temporary output files. */
protected final ValueProvider<ResourceId> tempDirectory;
@@ -428,7 +479,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* @param sink the FileBasedSink that will be used to configure this write operation.
*/
- public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
+ public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink) {
this(
sink,
NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
@@ -463,12 +514,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* @param tempDirectory the base directory to be used for temporary output files.
*/
@Experimental(Kind.FILESYSTEM)
- public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
+ public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink, ResourceId tempDirectory) {
this(sink, StaticValueProvider.of(tempDirectory));
}
private WriteOperation(
- FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
+ FileBasedSink<?, DestinationT, OutputT> sink, ValueProvider<ResourceId> tempDirectory) {
this.sink = sink;
this.tempDirectory = tempDirectory;
this.windowedWrites = false;
@@ -478,7 +529,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* Clients must implement to return a subclass of {@link Writer}. This method must not mutate
* the state of the object.
*/
- public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
+ public abstract Writer<DestinationT, OutputT> createWriter() throws Exception;
/** Indicates that the operation will be performing windowed writes. */
public void setWindowedWrites(boolean windowedWrites) {
@@ -533,7 +584,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
protected final Map<ResourceId, ResourceId> buildOutputFilenames(
Iterable<FileResult<DestinationT>> writerResults) {
int numShards = Iterables.size(writerResults);
- Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+ Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap();
// Either all results have a shard number set (if the sink is configured with a fixed
// number of shards), or they all don't (otherwise).
@@ -597,7 +648,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
"Only generated %s distinct file names for %s files.",
numDistinctShards,
outputFilenames.size());
-
return outputFilenames;
}
@@ -691,7 +741,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/** Returns the FileBasedSink for this write operation. */
- public FileBasedSink<OutputT, DestinationT> getSink() {
+ public FileBasedSink<?, DestinationT, OutputT> getSink() {
return sink;
}
@@ -727,10 +777,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
*
* @param <OutputT> the type of values to write.
*/
- public abstract static class Writer<OutputT, DestinationT> {
+ public abstract static class Writer<DestinationT, OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
- private final WriteOperation<OutputT, DestinationT> writeOperation;
+ private final WriteOperation<DestinationT, OutputT> writeOperation;
/** Unique id for this output bundle. */
private String id;
@@ -757,7 +807,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
private final String mimeType;
/** Construct a new {@link Writer} that will produce files of the given MIME type. */
- public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
+ public Writer(WriteOperation<DestinationT, OutputT> writeOperation, String mimeType) {
checkNotNull(writeOperation);
this.writeOperation = writeOperation;
this.mimeType = mimeType;
@@ -930,9 +980,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/** Return the WriteOperation that this Writer belongs to. */
- public WriteOperation<OutputT, DestinationT> getWriteOperation() {
+ public WriteOperation<DestinationT, OutputT> getWriteOperation() {
return writeOperation;
}
+
+ /** Return the user destination object for this writer. */
+ public DestinationT getDestination() {
+ return destination;
+ }
}
/**
@@ -987,7 +1042,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
@Experimental(Kind.FILESYSTEM)
public ResourceId getDestinationFile(
- DynamicDestinations<?, DestinationT> dynamicDestinations,
+ DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
int numShards,
OutputFileHints outputFileHints) {
checkArgument(getShard() != UNKNOWN_SHARDNUM);
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6e7b243..29b3e29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -357,10 +356,12 @@ public class TFRecordIO {
checkState(getOutputPrefix() != null,
"need to set the output prefix of a TFRecordIO.Write transform");
WriteFiles<byte[], Void, byte[]> write =
- WriteFiles.<byte[], Void, byte[]>to(
+ WriteFiles.to(
new TFRecordSink(
- getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
- SerializableFunctions.<byte[]>identity());
+ getOutputPrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getCompressionType()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -548,7 +549,7 @@ public class TFRecordIO {
/** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
@VisibleForTesting
- static class TFRecordSink extends FileBasedSink<byte[], Void> {
+ static class TFRecordSink extends FileBasedSink<byte[], Void, byte[]> {
@VisibleForTesting
TFRecordSink(
ValueProvider<ResourceId> outputPrefix,
@@ -557,7 +558,7 @@ public class TFRecordIO {
TFRecordIO.CompressionType compressionType) {
super(
outputPrefix,
- DynamicFileDestinations.constant(
+ DynamicFileDestinations.<byte[]>constant(
DefaultFilenamePolicy.fromStandardParameters(
outputPrefix, shardTemplate, suffix, false)),
writableByteChannelFactory(compressionType));
@@ -571,7 +572,7 @@ public class TFRecordIO {
}
@Override
- public WriteOperation<byte[], Void> createWriteOperation() {
+ public WriteOperation<Void, byte[]> createWriteOperation() {
return new TFRecordWriteOperation(this);
}
@@ -591,23 +592,23 @@ public class TFRecordIO {
}
/** A {@link WriteOperation WriteOperation} for TFRecord files. */
- private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
+ private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> {
private TFRecordWriteOperation(TFRecordSink sink) {
super(sink);
}
@Override
- public Writer<byte[], Void> createWriter() throws Exception {
+ public Writer<Void, byte[]> createWriter() throws Exception {
return new TFRecordWriter(this);
}
}
/** A {@link Writer Writer} for TFRecord files. */
- private static class TFRecordWriter extends Writer<byte[], Void> {
+ private static class TFRecordWriter extends Writer<Void, byte[]> {
private WritableByteChannel outChannel;
private TFRecordCodec codec;
- private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
+ private TFRecordWriter(WriteOperation<Void, byte[]> writeOperation) {
super(writeOperation, MimeTypes.BINARY);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 765a842..312dc07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -52,8 +56,8 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
* instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
- * file(s) to be read. Alternatively, if the filenames to be read are themselves in a
- * {@link PCollection}, apply {@link TextIO#readAll()}.
+ * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
+ * PCollection}, apply {@link TextIO#readAll()}.
*
* <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
* corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
@@ -70,8 +74,8 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
* thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
*
* <p>Example 2: reading a PCollection of filenames.
*
@@ -121,9 +125,9 @@ import org.apache.beam.sdk.values.PDone;
* allows you to convert any input value into a custom destination object, and map that destination
* object to a {@link FilenamePolicy}. This allows using different filename policies (or more
* commonly, differently-configured instances of the same policy) based on the input record. Often
- * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which
- * allows your {@link DynamicDestinations} object to examine the input type and takes a format
- * function to convert that type to a string for writing.
+ * this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link
+ * DynamicDestinations} object to examine the input type and takes a format function to convert that
+ * type to a string for writing.
*
* <p>A convenience shortcut is provided for the case where the default naming policy is used, but
* different configurations of this policy are wanted based on the input record. Default naming
@@ -189,20 +193,23 @@ public class TextIO {
* line.
*
* <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type
- * {@link T}, along with a format function that converts the input type {@link T} to the String
- * that will be written to the file. The advantage of this is it allows a user-provided {@link
+ * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the String that
+ * will be written to the file must be specified. If using a custom {@link DynamicDestinations}
+ * object this is done using {@link DynamicDestinations#formatRecord}, otherwise the {@link
+ * TypedWrite#withFormatFunction} can be used to specify a format function.
+ *
+ * <p>The advantage of using a custom type is that it allows a user-provided {@link
* DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the
- * user's custom type when choosing a destination.
+ * custom type when choosing a destination.
*/
- public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) {
- return new AutoValue_TextIO_TypedWrite.Builder<T>()
+ public static <UserT> TypedWrite<UserT> writeCustomType() {
+ return new AutoValue_TextIO_TypedWrite.Builder<UserT>()
.setFilenamePrefix(null)
.setTempDirectory(null)
.setShardTemplate(null)
.setFilenameSuffix(null)
.setFilenamePolicy(null)
.setDynamicDestinations(null)
- .setFormatFunction(formatFunction)
.setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
.setWindowedWrites(false)
.setNumShards(0)
@@ -417,11 +424,11 @@ public class TextIO {
}
}
- /////////////////////////////////////////////////////////////////////////////
+ // ///////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
@AutoValue
- public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> {
+ public abstract static class TypedWrite<UserT> extends PTransform<PCollection<UserT>, PDone> {
/** The prefix of each file written, combined with suffix and shardTemplate. */
@Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
@@ -449,10 +456,19 @@ public class TextIO {
/** Allows for value-dependent {@link DynamicDestinations} to be vended. */
@Nullable
- abstract DynamicDestinations<T, ?> getDynamicDestinations();
+ abstract DynamicDestinations<UserT, ?, String> getDynamicDestinations();
+
+ @Nullable
+ /** A destination function for using {@link DefaultFilenamePolicy} */
+ abstract SerializableFunction<UserT, Params> getDestinationFunction();
- /** A function that converts T to a String, for writing to the file. */
- abstract SerializableFunction<T, String> getFormatFunction();
+ @Nullable
+ /** A default destination for empty PCollections. */
+ abstract Params getEmptyDestination();
+
+ /** A function that converts UserT to a String, for writing to the file. */
+ @Nullable
+ abstract SerializableFunction<UserT, String> getFormatFunction();
/** Whether to write windowed output files. */
abstract boolean getWindowedWrites();
@@ -463,37 +479,42 @@ public class TextIO {
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();
- abstract Builder<T> toBuilder();
+ abstract Builder<UserT> toBuilder();
@AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+ abstract static class Builder<UserT> {
+ abstract Builder<UserT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+
+ abstract Builder<UserT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
- abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+ abstract Builder<UserT> setShardTemplate(@Nullable String shardTemplate);
- abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
+ abstract Builder<UserT> setFilenameSuffix(@Nullable String filenameSuffix);
- abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
+ abstract Builder<UserT> setHeader(@Nullable String header);
- abstract Builder<T> setHeader(@Nullable String header);
+ abstract Builder<UserT> setFooter(@Nullable String footer);
- abstract Builder<T> setFooter(@Nullable String footer);
+ abstract Builder<UserT> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
- abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
+ abstract Builder<UserT> setDynamicDestinations(
+ @Nullable DynamicDestinations<UserT, ?, String> dynamicDestinations);
- abstract Builder<T> setDynamicDestinations(
- @Nullable DynamicDestinations<T, ?> dynamicDestinations);
+ abstract Builder<UserT> setDestinationFunction(
+ @Nullable SerializableFunction<UserT, Params> destinationFunction);
- abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction);
+ abstract Builder<UserT> setEmptyDestination(Params emptyDestination);
- abstract Builder<T> setNumShards(int numShards);
+ abstract Builder<UserT> setFormatFunction(SerializableFunction<UserT, String> formatFunction);
- abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+ abstract Builder<UserT> setNumShards(int numShards);
- abstract Builder<T> setWritableByteChannelFactory(
+ abstract Builder<UserT> setWindowedWrites(boolean windowedWrites);
+
+ abstract Builder<UserT> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);
- abstract TypedWrite<T> build();
+ abstract TypedWrite<UserT> build();
}
/**
@@ -513,18 +534,18 @@ public class TextIO {
* <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
* infer a directory for temporary files.
*/
- public TypedWrite<T> to(String filenamePrefix) {
+ public TypedWrite<UserT> to(String filenamePrefix) {
return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
}
/** Like {@link #to(String)}. */
@Experimental(Kind.FILESYSTEM)
- public TypedWrite<T> to(ResourceId filenamePrefix) {
+ public TypedWrite<UserT> to(ResourceId filenamePrefix) {
return toResource(StaticValueProvider.of(filenamePrefix));
}
/** Like {@link #to(String)}. */
- public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
+ public TypedWrite<UserT> to(ValueProvider<String> outputPrefix) {
return toResource(NestedValueProvider.of(outputPrefix,
new SerializableFunction<String, ResourceId>() {
@Override
@@ -538,7 +559,7 @@ public class TextIO {
* Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
* directory for temporary files must be specified using {@link #withTempDirectory}.
*/
- public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
+ public TypedWrite<UserT> to(FilenamePolicy filenamePolicy) {
return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
@@ -547,7 +568,7 @@ public class TextIO {
* objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
* temporary files must be specified using {@link #withTempDirectory}.
*/
- public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+ public TypedWrite<UserT> to(DynamicDestinations<UserT, ?, String> dynamicDestinations) {
return toBuilder().setDynamicDestinations(dynamicDestinations).build();
}
@@ -558,26 +579,39 @@ public class TextIO {
* emptyDestination parameter specified where empty files should be written for when the written
* {@link PCollection} is empty.
*/
- public TypedWrite<T> to(
- SerializableFunction<T, Params> destinationFunction, Params emptyDestination) {
- return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination));
+ public TypedWrite<UserT> to(
+ SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+ return toBuilder()
+ .setDestinationFunction(destinationFunction)
+ .setEmptyDestination(emptyDestination)
+ .build();
}
/** Like {@link #to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
- public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
+ public TypedWrite<UserT> toResource(ValueProvider<ResourceId> filenamePrefix) {
return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
+ /**
+ * Specifies a format function to convert {@link UserT} to the output type. If {@link
+ * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be
+ * used instead.
+ */
+ public TypedWrite<UserT> withFormatFunction(
+ SerializableFunction<UserT, String> formatFunction) {
+ return toBuilder().setFormatFunction(formatFunction).build();
+ }
+
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
- public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+ public TypedWrite<UserT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
return toBuilder().setTempDirectory(tempDirectory).build();
}
/** Set the base directory used to generate temporary files. */
@Experimental(Kind.FILESYSTEM)
- public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
+ public TypedWrite<UserT> withTempDirectory(ResourceId tempDirectory) {
return withTempDirectory(StaticValueProvider.of(tempDirectory));
}
@@ -589,7 +623,7 @@ public class TextIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
+ public TypedWrite<UserT> withShardNameTemplate(String shardTemplate) {
return toBuilder().setShardTemplate(shardTemplate).build();
}
@@ -601,7 +635,7 @@ public class TextIO {
* <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
* used.
*/
- public TypedWrite<T> withSuffix(String filenameSuffix) {
+ public TypedWrite<UserT> withSuffix(String filenameSuffix) {
return toBuilder().setFilenameSuffix(filenameSuffix).build();
}
@@ -615,7 +649,7 @@ public class TextIO {
*
* @param numShards the number of shards to use, or 0 to let the system decide.
*/
- public TypedWrite<T> withNumShards(int numShards) {
+ public TypedWrite<UserT> withNumShards(int numShards) {
checkArgument(numShards >= 0);
return toBuilder().setNumShards(numShards).build();
}
@@ -629,7 +663,7 @@ public class TextIO {
*
* <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public TypedWrite<T> withoutSharding() {
+ public TypedWrite<UserT> withoutSharding() {
return withNumShards(1).withShardNameTemplate("");
}
@@ -638,7 +672,7 @@ public class TextIO {
*
* <p>A {@code null} value will clear any previously configured header.
*/
- public TypedWrite<T> withHeader(@Nullable String header) {
+ public TypedWrite<UserT> withHeader(@Nullable String header) {
return toBuilder().setHeader(header).build();
}
@@ -647,7 +681,7 @@ public class TextIO {
*
* <p>A {@code null} value will clear any previously configured footer.
*/
- public TypedWrite<T> withFooter(@Nullable String footer) {
+ public TypedWrite<UserT> withFooter(@Nullable String footer) {
return toBuilder().setFooter(footer).build();
}
@@ -658,7 +692,7 @@ public class TextIO {
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
*/
- public TypedWrite<T> withWritableByteChannelFactory(
+ public TypedWrite<UserT> withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
}
@@ -669,36 +703,58 @@ public class TextIO {
* <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
* {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
*/
- public TypedWrite<T> withWindowedWrites() {
+ public TypedWrite<UserT> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
- private DynamicDestinations<T, ?> resolveDynamicDestinations() {
- DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+ private DynamicDestinations<UserT, ?, String> resolveDynamicDestinations() {
+ DynamicDestinations<UserT, ?, String> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
- FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
- if (usedFilenamePolicy == null) {
- usedFilenamePolicy =
- DefaultFilenamePolicy.fromStandardParameters(
- getFilenamePrefix(),
- getShardTemplate(),
- getFilenameSuffix(),
- getWindowedWrites());
+ if (getDestinationFunction() != null) {
+ dynamicDestinations =
+ DynamicFileDestinations.toDefaultPolicies(
+ getDestinationFunction(), getEmptyDestination(), getFormatFunction());
+ } else {
+ FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+ if (usedFilenamePolicy == null) {
+ usedFilenamePolicy =
+ DefaultFilenamePolicy.fromStandardParameters(
+ getFilenamePrefix(),
+ getShardTemplate(),
+ getFilenameSuffix(),
+ getWindowedWrites());
+ }
+ dynamicDestinations =
+ DynamicFileDestinations.constant(usedFilenamePolicy, getFormatFunction());
}
- dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy);
}
return dynamicDestinations;
}
@Override
- public PDone expand(PCollection<T> input) {
+ public PDone expand(PCollection<UserT> input) {
checkState(
getFilenamePrefix() != null || getTempDirectory() != null,
"Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
+ "transform.");
- checkState(
- getFilenamePolicy() == null || getDynamicDestinations() == null,
- "Cannot specify both a filename policy and dynamic destinations");
+
+ List<?> allToArgs =
+ Lists.newArrayList(
+ getFilenamePolicy(),
+ getDynamicDestinations(),
+ getFilenamePrefix(),
+ getDestinationFunction());
+ checkArgument(
+ 1 == Iterables.size(Iterables.filter(allToArgs, Predicates.notNull())),
+ "Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
+ + "function must be set");
+
+ if (getDynamicDestinations() != null) {
+ checkArgument(
+ getFormatFunction() == null,
+ "A format function should not be specified "
+ + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+ }
if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
checkState(
getShardTemplate() == null && getFilenameSuffix() == null,
@@ -709,20 +765,20 @@ public class TextIO {
}
public <DestinationT> PDone expandTyped(
- PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+ PCollection<UserT> input,
+ DynamicDestinations<UserT, DestinationT, String> dynamicDestinations) {
ValueProvider<ResourceId> tempDirectory = getTempDirectory();
if (tempDirectory == null) {
tempDirectory = getFilenamePrefix();
}
- WriteFiles<T, DestinationT, String> write =
+ WriteFiles<UserT, DestinationT, String> write =
WriteFiles.to(
new TextSink<>(
tempDirectory,
dynamicDestinations,
getHeader(),
getFooter(),
- getWritableByteChannelFactory()),
- getFormatFunction());
+ getWritableByteChannelFactory()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -774,7 +830,7 @@ public class TextIO {
@VisibleForTesting TypedWrite<String> inner;
Write() {
- this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
+ this(TextIO.<String>writeCustomType());
}
Write(TypedWrite<String> inner) {
@@ -783,43 +839,53 @@ public class TextIO {
/** See {@link TypedWrite#to(String)}. */
public Write to(String filenamePrefix) {
- return new Write(inner.to(filenamePrefix));
+ return new Write(
+ inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#to(ResourceId)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(ResourceId filenamePrefix) {
- return new Write(inner.to(filenamePrefix));
+ return new Write(
+ inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#to(ValueProvider)}. */
public Write to(ValueProvider<String> outputPrefix) {
- return new Write(inner.to(outputPrefix));
+ return new Write(
+ inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#toResource(ValueProvider)}. */
@Experimental(Kind.FILESYSTEM)
public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
- return new Write(inner.toResource(filenamePrefix));
+ return new Write(
+ inner
+ .toResource(filenamePrefix)
+ .withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#to(FilenamePolicy)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(FilenamePolicy filenamePolicy) {
- return new Write(inner.to(filenamePolicy));
+ return new Write(
+ inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#to(DynamicDestinations)}. */
@Experimental(Kind.FILESYSTEM)
- public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
- return new Write(inner.to(dynamicDestinations));
+ public Write to(DynamicDestinations<String, ?, String> dynamicDestinations) {
+ return new Write(inner.to(dynamicDestinations).withFormatFunction(null));
}
/** See {@link TypedWrite#to(SerializableFunction, Params)}. */
@Experimental(Kind.FILESYSTEM)
public Write to(
SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
- return new Write(inner.to(destinationFunction, emptyDestination));
+ return new Write(
+ inner
+ .to(destinationFunction, emptyDestination)
+ .withFormatFunction(SerializableFunctions.<String>identity()));
}
/** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index b57b28c..387e0ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.util.MimeTypes;
* '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
* last) is terminated.
*/
-class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> {
+class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, String> {
@Nullable private final String header;
@Nullable private final String footer;
TextSink(
ValueProvider<ResourceId> baseOutputFilename,
- DynamicDestinations<UserT, DestinationT> dynamicDestinations,
+ DynamicDestinations<UserT, DestinationT, String> dynamicDestinations,
@Nullable String header,
@Nullable String footer,
WritableByteChannelFactory writableByteChannelFactory) {
@@ -50,13 +50,13 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT>
}
@Override
- public WriteOperation<String, DestinationT> createWriteOperation() {
+ public WriteOperation<DestinationT, String> createWriteOperation() {
return new TextWriteOperation<>(this, header, footer);
}
/** A {@link WriteOperation WriteOperation} for text files. */
private static class TextWriteOperation<DestinationT>
- extends WriteOperation<String, DestinationT> {
+ extends WriteOperation<DestinationT, String> {
@Nullable private final String header;
@Nullable private final String footer;
@@ -67,20 +67,20 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT>
}
@Override
- public Writer<String, DestinationT> createWriter() throws Exception {
+ public Writer<DestinationT, String> createWriter() throws Exception {
return new TextWriter<>(this, header, footer);
}
}
/** A {@link Writer Writer} for text files. */
- private static class TextWriter<DestinationT> extends Writer<String, DestinationT> {
+ private static class TextWriter<DestinationT> extends Writer<DestinationT, String> {
private static final String NEWLINE = "\n";
@Nullable private final String header;
@Nullable private final String footer;
private OutputStreamWriter out;
public TextWriter(
- WriteOperation<String, DestinationT> writeOperation,
+ WriteOperation<DestinationT, String> writeOperation,
@Nullable String header,
@Nullable String footer) {
super(writeOperation, MimeTypes.TEXT);
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index d8d7478..85c5652 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -76,7 +75,9 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
@@ -121,9 +122,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
static final int UNKNOWN_SHARDNUM = -1;
- private FileBasedSink<OutputT, DestinationT> sink;
- private SerializableFunction<UserT, OutputT> formatFunction;
- private WriteOperation<OutputT, DestinationT> writeOperation;
+ private FileBasedSink<UserT, DestinationT, OutputT> sink;
+ private WriteOperation<DestinationT, OutputT> writeOperation;
// This allows the number of shards to be dynamically computed based on the input
// PCollection.
@Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
@@ -133,37 +133,44 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private final ValueProvider<Integer> numShardsProvider;
private final boolean windowedWrites;
private int maxNumWritersPerBundle;
+ // This is the set of side inputs used by this transform. This is usually populated by the user's
+ // DynamicDestinations object.
+ private final List<PCollectionView<?>> sideInputs;
/**
* Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
* the runner control how many different shards are produced.
*/
public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
- FileBasedSink<OutputT, DestinationT> sink,
- SerializableFunction<UserT, OutputT> formatFunction) {
+ FileBasedSink<UserT, DestinationT, OutputT> sink) {
checkNotNull(sink, "sink");
return new WriteFiles<>(
sink,
- formatFunction,
null /* runner-determined sharding */,
null,
false,
- DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+ DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
+ sink.getDynamicDestinations().getSideInputs());
}
private WriteFiles(
- FileBasedSink<OutputT, DestinationT> sink,
- SerializableFunction<UserT, OutputT> formatFunction,
+ FileBasedSink<UserT, DestinationT, OutputT> sink,
@Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
@Nullable ValueProvider<Integer> numShardsProvider,
boolean windowedWrites,
- int maxNumWritersPerBundle) {
+ int maxNumWritersPerBundle,
+ List<PCollectionView<?>> sideInputs) {
this.sink = sink;
- this.formatFunction = checkNotNull(formatFunction);
this.computeNumShards = computeNumShards;
this.numShardsProvider = numShardsProvider;
this.windowedWrites = windowedWrites;
this.maxNumWritersPerBundle = maxNumWritersPerBundle;
+ this.sideInputs = sideInputs;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return PCollectionViews.toAdditionalInputs(sideInputs);
}
@Override
@@ -207,15 +214,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
/** Returns the {@link FileBasedSink} associated with this PTransform. */
- public FileBasedSink<OutputT, DestinationT> getSink() {
+ public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
return sink;
}
- /** Returns the the format function that maps the user type to the record written to files. */
- public SerializableFunction<UserT, OutputT> getFormatFunction() {
- return formatFunction;
- }
-
/**
* Returns whether or not to perform windowed writes.
*/
@@ -266,11 +268,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
ValueProvider<Integer> numShardsProvider) {
return new WriteFiles<>(
sink,
- formatFunction,
computeNumShards,
numShardsProvider,
windowedWrites,
- maxNumWritersPerBundle);
+ maxNumWritersPerBundle,
+ sideInputs);
}
/** Set the maximum number of writers created in a bundle before spilling to shuffle. */
@@ -278,11 +280,22 @@ public class WriteFiles<UserT, DestinationT, OutputT>
int maxNumWritersPerBundle) {
return new WriteFiles<>(
sink,
- formatFunction,
computeNumShards,
numShardsProvider,
windowedWrites,
- maxNumWritersPerBundle);
+ maxNumWritersPerBundle,
+ sideInputs);
+ }
+
+ public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
+ List<PCollectionView<?>> sideInputs) {
+ return new WriteFiles<>(
+ sink,
+ computeNumShards,
+ numShardsProvider,
+ windowedWrites,
+ maxNumWritersPerBundle,
+ sideInputs);
}
/**
@@ -297,7 +310,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
checkNotNull(
sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
return new WriteFiles<>(
- sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
+ sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
}
/**
@@ -305,8 +318,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* runner-determined sharding.
*/
public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
- return new WriteFiles<>(
- sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
+ return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
}
/**
@@ -323,7 +335,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
*/
public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
return new WriteFiles<>(
- sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+ sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
}
private static class WriterKey<DestinationT> {
@@ -374,7 +386,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private final Coder<DestinationT> destinationCoder;
private final boolean windowedWrites;
- private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
+ private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
private int spilledShardNum = UNKNOWN_SHARDNUM;
WriteBundles(
@@ -394,6 +406,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
PaneInfo paneInfo = c.pane();
// If we are doing windowed writes, we need to ensure that we have separate files for
// data in different windows/panes. Similar for dynamic writes, make sure that different
@@ -402,7 +415,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// the map will only have a single element.
DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
- Writer<OutputT, DestinationT> writer = writers.get(key);
+ Writer<DestinationT, OutputT> writer = writers.get(key);
if (writer == null) {
if (writers.size() <= maxNumWritersPerBundle) {
String uuid = UUID.randomUUID().toString();
@@ -436,14 +449,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
return;
}
}
- writeOrClose(writer, formatFunction.apply(c.element()));
+ writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element()));
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
- for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
+ for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
writers.entrySet()) {
- Writer<OutputT, DestinationT> writer = entry.getValue();
+ Writer<DestinationT, OutputT> writer = entry.getValue();
FileResult<DestinationT> result;
try {
result = writer.close();
@@ -478,13 +491,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
// Since we key by a 32-bit hash of the destination, there might be multiple destinations
// in this iterable. The number of destinations is generally very small (1000s or less), so
// there will rarely be hash collisions.
- Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
+ Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
for (UserT input : c.element().getValue()) {
DestinationT destination = sink.getDynamicDestinations().getDestination(input);
- Writer<OutputT, DestinationT> writer = writers.get(destination);
+ Writer<DestinationT, OutputT> writer = writers.get(destination);
if (writer == null) {
LOG.debug("Opening writer for write operation {}", writeOperation);
writer = writeOperation.createWriter();
@@ -501,12 +515,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
LOG.debug("Done opening writer");
writers.put(destination, writer);
}
- writeOrClose(writer, formatFunction.apply(input));
- }
+ writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
+ }
// Close all writers.
- for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
- Writer<OutputT, DestinationT> writer = entry.getValue();
+ for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
+ Writer<DestinationT, OutputT> writer = entry.getValue();
FileResult<DestinationT> result;
try {
// Close the writer; if this throws let the error propagate.
@@ -526,8 +540,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- private static <OutputT, DestinationT> void writeOrClose(
- Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
+ private static <DestinationT, OutputT> void writeOrClose(
+ Writer<DestinationT, OutputT> writer, OutputT t) throws Exception {
try {
writer.write(t);
} catch (Exception e) {
@@ -678,6 +692,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
input.apply(
writeName,
ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+ .withSideInputs(sideInputs)
.withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
PCollection<FileResult<DestinationT>> writtenBundleFiles =
writeTuple
@@ -692,17 +707,18 @@ public class WriteFiles<UserT, DestinationT, OutputT>
.apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
.apply(
"WriteUnwritten",
- ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+ ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
+ .withSideInputs(sideInputs))
.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
results =
PCollectionList.of(writtenBundleFiles)
.and(writtenGroupedFiles)
.apply(Flatten.<FileResult<DestinationT>>pCollections());
} else {
- List<PCollectionView<?>> sideInputs = Lists.newArrayList();
+ List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList();
if (computeNumShards != null) {
numShardsView = input.apply(computeNumShards);
- sideInputs.add(numShardsView);
+ shardingSideInputs.add(numShardsView);
} else {
numShardsView = null;
}
@@ -715,7 +731,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
numShardsView,
(numShardsView != null) ? null : numShardsProvider,
destinationCoder))
- .withSideInputs(sideInputs))
+ .withSideInputs(shardingSideInputs))
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
.apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
shardedWindowCoder =
@@ -728,7 +744,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
results =
sharded.apply(
"WriteShardedBundles",
- ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+ ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))
+ .withSideInputs(sideInputs));
}
results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
@@ -773,11 +790,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
} else {
final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
results.apply(View.<FileResult<DestinationT>>asIterable());
- ImmutableList.Builder<PCollectionView<?>> sideInputs =
+ ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs =
ImmutableList.<PCollectionView<?>>builder().add(resultsView);
if (numShardsView != null) {
- sideInputs.add(numShardsView);
+ finalizeSideInputs.add(numShardsView);
}
+ finalizeSideInputs.addAll(sideInputs);
// Finalize the write in another do-once ParDo on the singleton collection containing the
// Writer. The results from the per-bundle writes are given as an Iterable side input.
@@ -794,7 +812,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
new DoFn<Void, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
+ sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
// We must always output at least 1 shard, and honor user-specified numShards
// if
// set.
@@ -827,7 +845,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
writeOperation.removeTemporaryFiles(tempFiles);
}
})
- .withSideInputs(sideInputs.build()));
+ .withSideInputs(finalizeSideInputs.build()));
}
return PDone.in(input.getPipeline());
}
@@ -857,7 +875,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
minShardsNeeded,
destination);
for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+ Writer<DestinationT, OutputT> writer = writeOperation.createWriter();
// Currently this code path is only called in the unwindowed case.
writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
FileResult<DestinationT> emptyWrite = writer.close();
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 154ff5a..a96b6be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,9 +30,11 @@ import static org.junit.Assert.assertTrue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
@@ -41,6 +43,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
@@ -48,6 +51,7 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
@@ -55,6 +59,7 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -68,6 +73,7 @@ import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -77,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -535,17 +542,147 @@ public class AvroIOTest {
assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
}
+ private static final String SCHEMA_TEMPLATE_STRING =
+ "{\"namespace\": \"example.avro\",\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"TestTemplateSchema$$\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"$$full\", \"type\": \"string\"},\n"
+ + " {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n"
+ + " ]\n"
+ + "}";
+
+ private static String schemaFromPrefix(String prefix) {
+ return SCHEMA_TEMPLATE_STRING.replace("$$", prefix);
+ }
+
+ private static GenericRecord createRecord(String record, String prefix, Schema schema) {
+ GenericRecord genericRecord = new GenericData.Record(schema);
+ genericRecord.put(prefix + "full", record);
+ genericRecord.put(prefix + "suffix", record.substring(1));
+ return genericRecord;
+ }
+
+ private static class TestDynamicDestinations
+ extends DynamicAvroDestinations<String, String, GenericRecord> {
+ ResourceId baseDir;
+ PCollectionView<Map<String, String>> schemaView;
+
+ TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, String>> schemaView) {
+ this.baseDir = baseDir;
+ this.schemaView = schemaView;
+ }
+
+ @Override
+ public Schema getSchema(String destination) {
+ // Return a per-destination schema.
+ String schema = sideInput(schemaView).get(destination);
+ return new Schema.Parser().parse(schema);
+ }
+
+ @Override
+ public List<PCollectionView<?>> getSideInputs() {
+ return ImmutableList.<PCollectionView<?>>of(schemaView);
+ }
+
+ @Override
+ public GenericRecord formatRecord(String record) {
+ String prefix = record.substring(0, 1);
+ return createRecord(record, prefix, getSchema(prefix));
+ }
+
+ @Override
+ public String getDestination(String element) {
+ // Destination is based on first character of string.
+ return element.substring(0, 1);
+ }
+
+ @Override
+ public String getDefaultDestination() {
+ return "";
+ }
+
+ @Override
+ public FilenamePolicy getFilenamePolicy(String destination) {
+ return DefaultFilenamePolicy.fromStandardParameters(
+ StaticValueProvider.of(
+ baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+ null,
+ null,
+ false);
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDynamicDestinations() throws Exception {
+ ResourceId baseDir =
+ FileSystems.matchNewResource(
+ Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
+ .toString(),
+ true);
+
+ List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+ List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size());
+ Map<String, String> schemaMap = Maps.newHashMap();
+ for (String element : elements) {
+ String prefix = element.substring(0, 1);
+ String jsonSchema = schemaFromPrefix(prefix);
+ schemaMap.put(prefix, jsonSchema);
+ expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
+ }
+ PCollectionView<Map<String, String>> schemaView =
+ writePipeline
+ .apply("createSchemaView", Create.of(schemaMap))
+ .apply(View.<String, String>asMap());
+
+ PCollection<String> input =
+ writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
+ input.apply(
+ AvroIO.<String>writeCustomTypeToGenericRecords()
+ .to(new TestDynamicDestinations(baseDir, schemaView))
+ .withoutSharding()
+ .withTempDirectory(baseDir));
+ writePipeline.run();
+
+ // Validate that the data written matches the expected elements, in any order.
+
+ List<String> prefixes = Lists.newArrayList();
+ for (String element : elements) {
+ prefixes.add(element.substring(0, 1));
+ }
+ prefixes = ImmutableSet.copyOf(prefixes).asList();
+
+ List<GenericRecord> actualElements = new ArrayList<>();
+ for (String prefix : prefixes) {
+ File expectedFile =
+ new File(
+ baseDir
+ .resolve(
+ "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+ .toString());
+ assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists());
+ Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix));
+ try (DataFileReader<GenericRecord> reader =
+ new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) {
+ Iterators.addAll(actualElements, reader);
+ }
+ expectedFile.delete();
+ }
+ assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+ }
+
@Test
public void testWriteWithDefaultCodec() throws Exception {
AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
- assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
+ assertEquals(CodecFactory.deflateCodec(6).toString(), write.inner.getCodec().toString());
}
@Test
public void testWriteWithCustomCodec() throws Exception {
AvroIO.Write<String> write =
AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
- assertEquals(SNAPPY_CODEC, write.getCodec().toString());
+ assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
}
@Test
@@ -556,7 +693,7 @@ public class AvroIOTest {
assertEquals(
CodecFactory.deflateCodec(9).toString(),
- SerializableUtils.clone(write.getCodec()).getCodec().toString());
+ SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
}
@Test
@@ -567,7 +704,7 @@ public class AvroIOTest {
assertEquals(
CodecFactory.xzCodec(9).toString(),
- SerializableUtils.clone(write.getCodec()).getCodec().toString());
+ SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
}
@Test
@@ -618,7 +755,8 @@ public class AvroIOTest {
String shardNameTemplate =
firstNonNull(
- write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+ write.inner.getShardTemplate(),
+ DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
}
@@ -710,7 +848,13 @@ public class AvroIOTest {
assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
- assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+ assertThat(
+ displayData,
+ hasDisplayItem(
+ "schema",
+ "{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
+ + ".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
+ + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index a6ad746..ff30e33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -231,7 +231,7 @@ public class FileBasedSinkTest {
SimpleSink.makeSimpleSink(
getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
- WriteOperation<String, Void> writeOp =
+ WriteOperation<Void, String> writeOp =
new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
List<File> temporaryFiles = new ArrayList<>();
@@ -482,11 +482,11 @@ public class FileBasedSinkTest {
public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
final String testUid = "testId";
ResourceId root = getBaseOutputDirectory();
- WriteOperation<String, Void> writeOp =
+ WriteOperation<Void, String> writeOp =
SimpleSink.makeSimpleSink(
root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
.createWriteOperation();
- final Writer<String, Void> writer = writeOp.createWriter();
+ final Writer<Void, String> writer = writeOp.createWriter();
final ResourceId expectedFile =
writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 9196178..382898d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -28,10 +28,10 @@ import org.apache.beam.sdk.util.MimeTypes;
/**
* A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
*/
-class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, String> {
public SimpleSink(
ResourceId tempDirectory,
- DynamicDestinations<String, DestinationT> dynamicDestinations,
+ DynamicDestinations<String, DestinationT, String> dynamicDestinations,
WritableByteChannelFactory writableByteChannelFactory) {
super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
}
@@ -50,7 +50,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
String shardTemplate,
String suffix,
WritableByteChannelFactory writableByteChannelFactory) {
- DynamicDestinations<String, Void> dynamicDestinations =
+ DynamicDestinations<String, Void, String> dynamicDestinations =
DynamicFileDestinations.constant(
DefaultFilenamePolicy.fromParams(
new Params()
@@ -67,7 +67,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
}
static final class SimpleWriteOperation<DestinationT>
- extends WriteOperation<String, DestinationT> {
+ extends WriteOperation<DestinationT, String> {
public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
super(sink, tempOutputDirectory);
}
@@ -82,7 +82,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
}
}
- static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
+ static final class SimpleWriter<DestinationT> extends Writer<DestinationT, String> {
static final String HEADER = "header";
static final String FOOTER = "footer";
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index a73ed7d..7f80c26 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -110,7 +110,8 @@ public class TextIOWriteTest {
});
}
- static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String, String> {
+ static class TestDynamicDestinations
+ extends FileBasedSink.DynamicDestinations<String, String, String> {
ResourceId baseDir;
TestDynamicDestinations(ResourceId baseDir) {
@@ -118,6 +119,11 @@ public class TextIOWriteTest {
}
@Override
+ public String formatRecord(String record) {
+ return record;
+ }
+
+ @Override
public String getDestination(String element) {
// Destination is based on first character of string.
return element.substring(0, 1);
@@ -169,10 +175,7 @@ public class TextIOWriteTest {
List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
- input.apply(
- TextIO.write()
- .to(new TestDynamicDestinations(baseDir))
- .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+ input.apply(TextIO.write().to(new TestDynamicDestinations(baseDir)).withTempDirectory(baseDir));
p.run();
assertOutputFiles(
@@ -268,8 +271,14 @@ public class TextIOWriteTest {
new UserWriteType("caab", "sixth"));
PCollection<UserWriteType> input = p.apply(Create.of(elements));
input.apply(
- TextIO.writeCustomType(new SerializeUserWrite())
- .to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params())
+ TextIO.<UserWriteType>writeCustomType()
+ .to(
+ new UserWriteDestination(baseDir),
+ new DefaultFilenamePolicy.Params()
+ .withBaseFilename(
+ baseDir.resolve(
+ "empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))
+ .withFormatFunction(new SerializeUserWrite())
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
p.run();