You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/11 01:24:05 UTC
[2/5] beam git commit: Adds DynamicDestinations support to
FileBasedSink
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 511d697..b57b28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,27 +34,29 @@ import org.apache.beam.sdk.util.MimeTypes;
* '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
* last) is terminated.
*/
-class TextSink extends FileBasedSink<String> {
+class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> {
@Nullable private final String header;
@Nullable private final String footer;
TextSink(
ValueProvider<ResourceId> baseOutputFilename,
- FilenamePolicy filenamePolicy,
+ DynamicDestinations<UserT, DestinationT> dynamicDestinations,
@Nullable String header,
@Nullable String footer,
WritableByteChannelFactory writableByteChannelFactory) {
- super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
+ super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
this.header = header;
this.footer = footer;
}
+
@Override
- public WriteOperation<String> createWriteOperation() {
- return new TextWriteOperation(this, header, footer);
+ public WriteOperation<String, DestinationT> createWriteOperation() {
+ return new TextWriteOperation<>(this, header, footer);
}
/** A {@link WriteOperation WriteOperation} for text files. */
- private static class TextWriteOperation extends WriteOperation<String> {
+ private static class TextWriteOperation<DestinationT>
+ extends WriteOperation<String, DestinationT> {
@Nullable private final String header;
@Nullable private final String footer;
@@ -65,20 +67,20 @@ class TextSink extends FileBasedSink<String> {
}
@Override
- public Writer<String> createWriter() throws Exception {
- return new TextWriter(this, header, footer);
+ public Writer<String, DestinationT> createWriter() throws Exception {
+ return new TextWriter<>(this, header, footer);
}
}
/** A {@link Writer Writer} for text files. */
- private static class TextWriter extends Writer<String> {
+ private static class TextWriter<DestinationT> extends Writer<String, DestinationT> {
private static final String NEWLINE = "\n";
@Nullable private final String header;
@Nullable private final String footer;
private OutputStreamWriter out;
public TextWriter(
- WriteOperation<String> writeOperation,
+ WriteOperation<String, DestinationT> writeOperation,
@Nullable String header,
@Nullable String footer) {
super(writeOperation, MimeTypes.TEXT);
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index a220eab..7013044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,9 +20,12 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -30,8 +33,11 @@ import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -47,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.slf4j.Logger;
@@ -72,13 +81,12 @@ import org.slf4j.LoggerFactory;
* global initialization of a sink, followed by a parallel write, and ends with a sequential
* finalization of the write. The output of a write is {@link PDone}.
*
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a
- * {@link WriteOperation}, so the number of output
- * will vary based on runner behavior, though at least 1 output will always be produced. The
- * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
- * typically used to control how many files are produced or to globally limit the number of
- * workers connecting to an external service. However, this option can often hurt performance: it
- * adds an additional {@link GroupByKey} to the pipeline.
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
+ * WriteOperation}, so the number of outputs will vary based on runner behavior, though at least 1
+ * output will always be produced. The exact parallelism of the write stage can be controlled using
+ * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
+ * globally limit the number of workers connecting to an external service. However, this option can
+ * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
*
* <p>Example usage with runner-determined sharding:
*
@@ -89,7 +97,8 @@ import org.slf4j.LoggerFactory;
* <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
+public class WriteFiles<UserT, DestinationT, OutputT>
+ extends PTransform<PCollection<UserT>, PDone> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
// The maximum number of file writers to keep open in a single bundle at a time, since file
@@ -105,12 +114,12 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
static final int UNKNOWN_SHARDNUM = -1;
- private FileBasedSink<T> sink;
- private WriteOperation<T> writeOperation;
+ private FileBasedSink<OutputT, DestinationT> sink;
+ private SerializableFunction<UserT, OutputT> formatFunction;
+ private WriteOperation<OutputT, DestinationT> writeOperation;
// This allows the number of shards to be dynamically computed based on the input
// PCollection.
- @Nullable
- private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+ @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
// We don't use a side input for static sharding, as we want this value to be updatable
// when a pipeline is updated.
@Nullable
@@ -122,19 +131,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
* the runner control how many different shards are produced.
*/
- public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
+ public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
+ FileBasedSink<OutputT, DestinationT> sink,
+ SerializableFunction<UserT, OutputT> formatFunction) {
checkNotNull(sink, "sink");
- return new WriteFiles<>(sink, null /* runner-determined sharding */, null,
- false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+ return new WriteFiles<>(
+ sink,
+ formatFunction,
+ null /* runner-determined sharding */,
+ null,
+ false,
+ DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
}
private WriteFiles(
- FileBasedSink<T> sink,
- @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+ FileBasedSink<OutputT, DestinationT> sink,
+ SerializableFunction<UserT, OutputT> formatFunction,
+ @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
@Nullable ValueProvider<Integer> numShardsProvider,
boolean windowedWrites,
int maxNumWritersPerBundle) {
this.sink = sink;
+ this.formatFunction = checkNotNull(formatFunction);
this.computeNumShards = computeNumShards;
this.numShardsProvider = numShardsProvider;
this.windowedWrites = windowedWrites;
@@ -142,7 +160,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
@Override
- public PDone expand(PCollection<T> input) {
+ public PDone expand(PCollection<UserT> input) {
if (input.isBounded() == IsBounded.UNBOUNDED) {
checkArgument(windowedWrites,
"Must use windowed writes when applying %s to an unbounded PCollection",
@@ -181,13 +199,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
}
- /**
- * Returns the {@link FileBasedSink} associated with this PTransform.
- */
- public FileBasedSink<T> getSink() {
+ /** Returns the {@link FileBasedSink} associated with this PTransform. */
+ public FileBasedSink<OutputT, DestinationT> getSink() {
return sink;
}
+ /** Returns the format function that maps the user type to the record written to files. */
+ public SerializableFunction<UserT, OutputT> getFormatFunction() {
+ return formatFunction;
+ }
+
/**
* Returns whether or not to perform windowed writes.
*/
@@ -202,7 +223,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* #withRunnerDeterminedSharding()}.
*/
@Nullable
- public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+ public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
return computeNumShards;
}
@@ -220,7 +241,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* <p>A value less than or equal to 0 will be equivalent to the default behavior of
* runner-determined sharding.
*/
- public WriteFiles<T> withNumShards(int numShards) {
+ public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) {
if (numShards > 0) {
return withNumShards(StaticValueProvider.of(numShards));
}
@@ -234,16 +255,26 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
*/
- public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
- return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+ public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
+ ValueProvider<Integer> numShardsProvider) {
+ return new WriteFiles<>(
+ sink,
+ formatFunction,
+ computeNumShards,
+ numShardsProvider,
+ windowedWrites,
maxNumWritersPerBundle);
}
- /**
- * Set the maximum number of writers created in a bundle before spilling to shuffle.
- */
- public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
- return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+ /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
+ public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
+ int maxNumWritersPerBundle) {
+ return new WriteFiles<>(
+ sink,
+ formatFunction,
+ computeNumShards,
+ numShardsProvider,
+ windowedWrites,
maxNumWritersPerBundle);
}
@@ -254,97 +285,167 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
*/
- public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+ public WriteFiles<UserT, DestinationT, OutputT> withSharding(
+ PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
checkNotNull(
sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
- return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle);
+ return new WriteFiles<>(
+ sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
* runner-determined sharding.
*/
- public WriteFiles<T> withRunnerDeterminedSharding() {
- return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle);
+ public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
+ return new WriteFiles<>(
+ sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
}
/**
* Returns a new {@link WriteFiles} that writes preserves windowing on it's input.
*
- * <p>If this option is not specified, windowing and triggering are replaced by
- * {@link GlobalWindows} and {@link DefaultTrigger}.
+ * <p>If this option is not specified, windowing and triggering are replaced by {@link
+ * GlobalWindows} and {@link DefaultTrigger}.
*
- * <p>If there is no data for a window, no output shards will be generated for that window.
- * If a window triggers multiple times, then more than a single output shard might be
- * generated multiple times; it's up to the sink implementation to keep these output shards
- * unique.
+ * <p>If there is no data for a window, no output shards will be generated for that window. If a
+ * window triggers multiple times, then more than a single output shard might be generated
+ * multiple times; it's up to the sink implementation to keep these output shards unique.
*
- * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
- * positive value.
+ * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
*/
- public WriteFiles<T> withWindowedWrites() {
- return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true,
- maxNumWritersPerBundle);
+ public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
+ return new WriteFiles<>(
+ sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+ }
+
+ private static class WriterKey<DestinationT> {
+ private final BoundedWindow window;
+ private final PaneInfo paneInfo;
+ private final DestinationT destination;
+
+ WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
+ this.window = window;
+ this.paneInfo = paneInfo;
+ this.destination = destination;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof WriterKey)) {
+ return false;
+ }
+ WriterKey other = (WriterKey) o;
+ return Objects.equal(window, other.window)
+ && Objects.equal(paneInfo, other.paneInfo)
+ && Objects.equal(destination, other.destination);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(window, paneInfo, destination);
+ }
+ }
+
+ // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
+ // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
+ // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
+ // this can be used as a key.
+ private static <DestinationT> int hashDestination(
+ DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
+ return Hashing.murmur3_32()
+ .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
+ .asInt();
}
/**
- * Writes all the elements in a bundle using a {@link Writer} produced by the
- * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
+ * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
+ * WriteOperation} associated with the {@link FileBasedSink}.
*/
- private class WriteWindowedBundles extends DoFn<T, FileResult> {
- private final TupleTag<KV<Integer, T>> unwrittedRecordsTag;
- private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
- int spilledShardNum = UNKNOWN_SHARDNUM;
-
- WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittedRecordsTag) {
- this.unwrittedRecordsTag = unwrittedRecordsTag;
+ private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
+ private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
+ private final Coder<DestinationT> destinationCoder;
+ private final boolean windowedWrites;
+
+ private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
+ private int spilledShardNum = UNKNOWN_SHARDNUM;
+
+ WriteBundles(
+ boolean windowedWrites,
+ TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
+ Coder<DestinationT> destinationCoder) {
+ this.windowedWrites = windowedWrites;
+ this.unwrittenRecordsTag = unwrittenRecordsTag;
+ this.destinationCoder = destinationCoder;
}
@StartBundle
public void startBundle(StartBundleContext c) {
// Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
- windowedWriters = Maps.newHashMap();
+ writers = Maps.newHashMap();
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
PaneInfo paneInfo = c.pane();
- Writer<T> writer;
// If we are doing windowed writes, we need to ensure that we have separate files for
- // data in different windows/panes.
- KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
- writer = windowedWriters.get(key);
+ // data in different windows/panes. Similarly, for dynamic writes, make sure that different
+ // destinations go to different writers.
+ // In the case of unwindowed writes, the window and the pane will always be the same, so
+ // the map will only have one entry per destination.
+ DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
+ WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
+ Writer<OutputT, DestinationT> writer = writers.get(key);
if (writer == null) {
- if (windowedWriters.size() <= maxNumWritersPerBundle) {
+ if (writers.size() <= maxNumWritersPerBundle) {
String uuid = UUID.randomUUID().toString();
LOG.info(
- "Opening writer {} for write operation {}, window {} pane {}",
+ "Opening writer {} for write operation {}, window {} pane {} destination {}",
uuid,
writeOperation,
window,
- paneInfo);
+ paneInfo,
+ destination);
writer = writeOperation.createWriter();
- writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
- windowedWriters.put(key, writer);
+ if (windowedWrites) {
+ writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
+ } else {
+ writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
+ }
+ writers.put(key, writer);
LOG.debug("Done opening writer");
} else {
if (spilledShardNum == UNKNOWN_SHARDNUM) {
+ // Cache the random value so we only call ThreadLocalRandom once per DoFn instance.
spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
} else {
spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
}
- c.output(unwrittedRecordsTag, KV.of(spilledShardNum, c.element()));
+ c.output(
+ unwrittenRecordsTag,
+ KV.of(
+ ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
+ c.element()));
return;
}
}
- writeOrClose(writer, c.element());
+ writeOrClose(writer, formatFunction.apply(c.element()));
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
- for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) {
- FileResult result = entry.getValue().close();
- BoundedWindow window = entry.getKey().getKey();
+ for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
+ writers.entrySet()) {
+ Writer<OutputT, DestinationT> writer = entry.getValue();
+ FileResult<DestinationT> result;
+ try {
+ result = writer.close();
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
+ throw e;
+ }
+ BoundedWindow window = entry.getKey().window;
c.output(result, window.maxTimestamp(), window);
}
}
@@ -355,90 +456,62 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
}
- /**
- * Writes all the elements in a bundle using a {@link Writer} produced by the
- * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled.
- */
- private class WriteUnwindowedBundles extends DoFn<T, FileResult> {
- // Writer that will write the records in this bundle. Lazily
- // initialized in processElement.
- private Writer<T> writer = null;
- private BoundedWindow window = null;
-
- @StartBundle
- public void startBundle(StartBundleContext c) {
- // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
- writer = null;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // Cache a single writer for the bundle.
- if (writer == null) {
- LOG.info("Opening writer for write operation {}", writeOperation);
- writer = writeOperation.createWriter();
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
- LOG.debug("Done opening writer");
- }
- this.window = window;
- writeOrClose(this.writer, c.element());
- }
+ enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
- @FinishBundle
- public void finishBundle(FinishBundleContext c) throws Exception {
- if (writer == null) {
- return;
- }
- FileResult result = writer.close();
- c.output(result, window.maxTimestamp(), window);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(WriteFiles.this);
- }
- }
-
- enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING };
-
- /**
- * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
- * for each shard have been collected into a single iterable.
+ /**
+ * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
+ * single iterable.
*/
- private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
+ private class WriteShardedBundles
+ extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
ShardAssignment shardNumberAssignment;
WriteShardedBundles(ShardAssignment shardNumberAssignment) {
this.shardNumberAssignment = shardNumberAssignment;
}
+
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- // In a sharded write, single input element represents one shard. We can open and close
- // the writer in each call to processElement.
- LOG.info("Opening writer for write operation {}", writeOperation);
- Writer<T> writer = writeOperation.createWriter();
- if (windowedWrites) {
- int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
- ? c.element().getKey() : UNKNOWN_SHARDNUM;
- writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber);
- } else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
- }
- LOG.debug("Done opening writer");
-
- try {
- for (T t : c.element().getValue()) {
- writeOrClose(writer, t);
+ // Since we key by a 32-bit hash of the destination, there might be multiple destinations
+ // in this iterable. The number of destinations is generally very small (1000s or less), so
+ // there will rarely be hash collisions.
+ Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
+ for (UserT input : c.element().getValue()) {
+ DestinationT destination = sink.getDynamicDestinations().getDestination(input);
+ Writer<OutputT, DestinationT> writer = writers.get(destination);
+ if (writer == null) {
+ LOG.debug("Opening writer for write operation {}", writeOperation);
+ writer = writeOperation.createWriter();
+ if (windowedWrites) {
+ int shardNumber =
+ shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+ ? c.element().getKey().getShardNumber()
+ : UNKNOWN_SHARDNUM;
+ writer.openWindowed(
+ UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
+ } else {
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+ }
+ LOG.debug("Done opening writer");
+ writers.put(destination, writer);
+ }
+ writeOrClose(writer, formatFunction.apply(input));
}
- // Close the writer; if this throws let the error propagate.
- FileResult result = writer.close();
- c.output(result);
- } catch (Exception e) {
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
- throw e;
+ // Close all writers.
+ for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
+ Writer<OutputT, DestinationT> writer = entry.getValue();
+ FileResult<DestinationT> result;
+ try {
+ // Close the writer; if this throws, delete the temporary file and let the error propagate.
+ result = writer.close();
+ c.output(result);
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
+ throw e;
+ }
+ }
}
- }
@Override
public void populateDisplayData(DisplayData.Builder builder) {
@@ -446,12 +519,15 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
}
- private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception {
+ private static <OutputT, DestinationT> void writeOrClose(
+ Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
try {
writer.write(t);
} catch (Exception e) {
try {
writer.close();
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
} catch (Exception closeException) {
if (closeException instanceof InterruptedException) {
// Do not silently ignore interrupted state.
@@ -464,20 +540,25 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
}
}
- private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+ private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
private final PCollectionView<Integer> numShardsView;
private final ValueProvider<Integer> numShardsProvider;
+ private final Coder<DestinationT> destinationCoder;
+
private int shardNumber;
- ApplyShardingKey(PCollectionView<Integer> numShardsView,
- ValueProvider<Integer> numShardsProvider) {
+ ApplyShardingKey(
+ PCollectionView<Integer> numShardsView,
+ ValueProvider<Integer> numShardsProvider,
+ Coder<DestinationT> destinationCoder) {
+ this.destinationCoder = destinationCoder;
this.numShardsView = numShardsView;
this.numShardsProvider = numShardsProvider;
shardNumber = UNKNOWN_SHARDNUM;
}
@ProcessElement
- public void processElement(ProcessContext context) {
+ public void processElement(ProcessContext context) throws IOException {
final int shardCount;
if (numShardsView != null) {
shardCount = context.sideInput(numShardsView);
@@ -497,86 +578,110 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
} else {
shardNumber = (shardNumber + 1) % shardCount;
}
- context.output(KV.of(shardNumber, context.element()));
+ // We avoid using destination itself as a sharding key, because destination is often large.
+ // For example, when using {@link DefaultFilenamePolicy}, the destination contains the entire path
+ // to the file. Often most of the path is constant across all destinations, just the path
+ // suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully
+ // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
+ // the destinations. This does mean that multiple destinations might end up on the same shard,
+ // however the number of collisions should be small, so there's no need to worry about memory
+ // issues.
+ DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
+ context.output(
+ KV.of(
+ ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
+ context.element()));
}
}
/**
* A write is performed as sequence of three {@link ParDo}'s.
*
- * <p>This singleton collection containing the WriteOperation is then used as a side
- * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
- * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in
- * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
- * {@link Writer#write} method is called for every element in the bundle. The output
- * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink}
- * for a description of writer results)-one for each bundle.
+ * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+ * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
+ * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
+ * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
+ * respectively, and {@link Writer#write} method is called for every element in the bundle. The
+ * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
+ * FileBasedSink} for a description of writer results)-one for each bundle.
*
* <p>The final do-once ParDo uses a singleton collection asinput and the collection of writer
- * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
- * to finalize the write.
+ * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
+ * the write.
*
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be
- * called before the exception that caused the write to fail is propagated and the write result
- * will be discarded.
+ * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+ * before the exception that caused the write to fail is propagated and the write result will be
+ * discarded.
*
* <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
* deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter
- * phases. However, the WriteOperation is not serialized after the bundle-writing
- * phase. This is why implementations should guarantee that
- * {@link WriteOperation#createWriter} does not mutate WriteOperation).
+ * WriteOperation object that occurs during initialization is visible in the latter phases.
+ * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
+ * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+ * WriteOperation).
*/
- private PDone createWrite(PCollection<T> input) {
+ private PDone createWrite(PCollection<UserT> input) {
Pipeline p = input.getPipeline();
if (!windowedWrites) {
// Re-window the data into the global window and remove any existing triggers.
input =
input.apply(
- Window.<T>into(new GlobalWindows())
+ Window.<UserT>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
}
-
// Perform the per-bundle writes as a ParDo on the input PCollection (with the
// WriteOperation as a side input) and collect the results of the writes in a
// PCollection. There is a dependency between this ParDo and the first (the
// WriteOperation PCollection as a side input), so this will happen after the
// initial ParDo.
- PCollection<FileResult> results;
+ PCollection<FileResult<DestinationT>> results;
final PCollectionView<Integer> numShardsView;
@SuppressWarnings("unchecked")
Coder<BoundedWindow> shardedWindowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+ final Coder<DestinationT> destinationCoder;
+ try {
+ destinationCoder =
+ sink.getDynamicDestinations()
+ .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+ destinationCoder.verifyDeterministic();
+ } catch (CannotProvideCoderException | NonDeterministicException e) {
+ throw new RuntimeException(e);
+ }
+
if (computeNumShards == null && numShardsProvider == null) {
numShardsView = null;
- if (windowedWrites) {
- TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
- TupleTag<KV<Integer, T>> unwrittedRecordsTag = new TupleTag<>("unwrittenRecordsTag");
- PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of(
- new WriteWindowedBundles(unwrittedRecordsTag))
- .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
- PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag)
- .setCoder(FileResultCoder.of(shardedWindowCoder));
- // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
- // finalize to stay consistent with what WriteWindowedBundles does.
- PCollection<FileResult> writtenGroupedFiles =
- writeTuple
- .get(unwrittedRecordsTag)
- .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
- .apply("GroupUnwritten", GroupByKey.<Integer, T>create())
- .apply("WriteUnwritten", ParDo.of(
- new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
- .setCoder(FileResultCoder.of(shardedWindowCoder));
- results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles)
- .apply(Flatten.<FileResult>pCollections());
- } else {
- results =
- input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles()));
- }
+ TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
+ TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
+ new TupleTag<>("unwrittenRecordsTag");
+ String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
+ PCollectionTuple writeTuple =
+ input.apply(
+ writeName,
+ ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
+ PCollection<FileResult<DestinationT>> writtenBundleFiles =
+ writeTuple
+ .get(writtenRecordsTag)
+ .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+ // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+ // finalize to stay consistent with what WriteBundles does.
+ PCollection<FileResult<DestinationT>> writtenGroupedFiles =
+ writeTuple
+ .get(unwrittedRecordsTag)
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+ .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
+ .apply(
+ "WriteUnwritten",
+ ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+ .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+ results =
+ PCollectionList.of(writtenBundleFiles)
+ .and(writtenGroupedFiles)
+ .apply(Flatten.<FileResult<DestinationT>>pCollections());
} else {
List<PCollectionView<?>> sideInputs = Lists.newArrayList();
if (computeNumShards != null) {
@@ -585,23 +690,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
} else {
numShardsView = null;
}
-
- PCollection<KV<Integer, Iterable<T>>> sharded =
+ PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
input
- .apply("ApplyShardLabel", ParDo.of(
- new ApplyShardingKey<T>(numShardsView,
- (numShardsView != null) ? null : numShardsProvider))
- .withSideInputs(sideInputs))
- .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
+ .apply(
+ "ApplyShardLabel",
+ ParDo.of(
+ new ApplyShardingKey(
+ numShardsView,
+ (numShardsView != null) ? null : numShardsProvider,
+ destinationCoder))
+ .withSideInputs(sideInputs))
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+ .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
+ shardedWindowCoder =
+ (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
// Since this path might be used by streaming runners processing triggers, it's important
// to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
// strategy works by sorting all FileResult objects and assigning them numbers, which is not
// guaranteed to work well when processing triggers - if the finalize step retries it might
// see a different Iterable of FileResult objects, and it will assign different shard numbers.
- results = sharded.apply("WriteShardedBundles",
- ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+ results =
+ sharded.apply(
+ "WriteShardedBundles",
+ ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
}
- results.setCoder(FileResultCoder.of(shardedWindowCoder));
+ results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
if (windowedWrites) {
// When processing streaming windowed writes, results will arrive multiple times. This
@@ -609,26 +722,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// as new data arriving into a side input does not trigger the listening DoFn. Instead
// we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
// whenever new data arrives.
- PCollection<KV<Void, FileResult>> keyedResults =
- results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
- keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
- FileResultCoder.of(shardedWindowCoder)));
+ PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
+ results.apply(
+ "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
+ keyedResults.setCoder(
+ KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
// Is the continuation trigger sufficient?
keyedResults
- .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create())
- .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult> results = Lists.newArrayList(c.element().getValue());
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation");
- }
- }));
+ .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
+ .apply(
+ "Finalize",
+ ParDo.of(
+ new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<FileResult<DestinationT>> results =
+ Lists.newArrayList(c.element().getValue());
+ writeOperation.finalize(results);
+ LOG.debug("Done finalizing write operation");
+ }
+ }));
} else {
- final PCollectionView<Iterable<FileResult>> resultsView =
- results.apply(View.<FileResult>asIterable());
+ final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
+ results.apply(View.<FileResult<DestinationT>>asIterable());
ImmutableList.Builder<PCollectionView<?>> sideInputs =
ImmutableList.<PCollectionView<?>>builder().add(resultsView);
if (numShardsView != null) {
@@ -644,41 +762,53 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
// set numShards, then all shards will be written out as empty files. For this reason we
// use a side input here.
PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
- singletonCollection
- .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
- // We must always output at least 1 shard, and honor user-specified numShards if
- // set.
- int minShardsNeeded;
- if (numShardsView != null) {
- minShardsNeeded = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- minShardsNeeded = numShardsProvider.get();
- } else {
- minShardsNeeded = 1;
- }
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written for a total of {}.",
- extraShardsNeeded, results.size(), minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<T> writer = writeOperation.createWriter();
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
- FileResult emptyWrite = writer.close();
- results.add(emptyWrite);
- }
- LOG.debug("Done creating extra shards.");
- }
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation {}", writeOperation);
- }
- }).withSideInputs(sideInputs.build()));
+ singletonCollection.apply(
+ "Finalize",
+ ParDo.of(
+ new DoFn<Void, Integer>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ LOG.info("Finalizing write operation {}.", writeOperation);
+ List<FileResult<DestinationT>> results =
+ Lists.newArrayList(c.sideInput(resultsView));
+ LOG.debug(
+ "Side input initialized to finalize write operation {}.", writeOperation);
+
+ // We must always output at least 1 shard,
+ // and honor the user-specified numShards
+ // if it is set.
+ int minShardsNeeded;
+ if (numShardsView != null) {
+ minShardsNeeded = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ minShardsNeeded = numShardsProvider.get();
+ } else {
+ minShardsNeeded = 1;
+ }
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written "
+ + "for a total of {}.",
+ extraShardsNeeded,
+ results.size(),
+ minShardsNeeded);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+ writer.openUnwindowed(
+ UUID.randomUUID().toString(),
+ UNKNOWN_SHARDNUM,
+ sink.getDynamicDestinations().getDefaultDestination());
+ FileResult<DestinationT> emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards.");
+ }
+ writeOperation.finalize(results);
+ LOG.debug("Done finalizing write operation {}", writeOperation);
+ }
+ })
+ .withSideInputs(sideInputs.build()));
}
return PDone.in(input.getPipeline());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
new file mode 100644
index 0000000..d057d81
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+/** Factory methods for commonly used {@link SerializableFunction} implementations. */
+public class SerializableFunctions {
+  /** Returns its input unchanged. */
+  private static class Identity<T> implements SerializableFunction<T, T> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public T apply(T input) {
+      return input;
+    }
+  }
+
+  /** Ignores its input and returns the value supplied at construction. */
+  private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> {
+    private static final long serialVersionUID = 1L;
+    private final OutT value;
+
+    Constant(OutT value) {
+      this.value = value;
+    }
+
+    @Override
+    public OutT apply(InT input) {
+      return value;
+    }
+  }
+
+  /** Returns a function that returns its input unchanged. */
+  public static <T> SerializableFunction<T, T> identity() {
+    return new Identity<>();
+  }
+
+  /**
+   * Returns a function that ignores its input and always returns {@code value}.
+   *
+   * <p>The returned function keeps a reference to {@code value} rather than a copy, so {@code
+   * value} should not be mutated afterwards.
+   */
+  public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) {
+    return new Constant<>(value);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
new file mode 100644
index 0000000..e56af13
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A key paired with a shard number.
+ *
+ * <p>Equality and hashing consider both the key (compared with {@link Objects#equals}, so a
+ * {@code null} key is allowed) and the shard number.
+ */
+public class ShardedKey<K> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final K key;
+  private final int shardNumber;
+
+  /** Returns a {@code ShardedKey} pairing {@code key} with {@code shardNumber}. */
+  public static <K> ShardedKey<K> of(K key, int shardNumber) {
+    return new ShardedKey<>(key, shardNumber);
+  }
+
+  private ShardedKey(K key, int shardNumber) {
+    this.key = key;
+    this.shardNumber = shardNumber;
+  }
+
+  /** Returns the key, which may be {@code null}. */
+  public K getKey() {
+    return key;
+  }
+
+  /** Returns the shard number. */
+  public int getShardNumber() {
+    return shardNumber;
+  }
+
+  @Override
+  public String toString() {
+    return "key: " + key + " shard: " + shardNumber;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof ShardedKey)) {
+      return false;
+    }
+    // The type argument is erased at runtime, so cast to a wildcard type rather than making an
+    // unchecked cast to ShardedKey<K>.
+    ShardedKey<?> other = (ShardedKey<?>) o;
+    // Compare the primitive shard number directly instead of boxing it through Objects.equals.
+    return shardNumber == other.shardNumber && Objects.equals(key, other.key);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, shardNumber);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 6d01d32..260e47a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,10 +54,11 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -276,37 +277,42 @@ public class AvroIOTest {
}
private static class WindowedFilenamePolicy extends FilenamePolicy {
- final String outputFilePrefix;
+ final ResourceId outputFilePrefix;
- WindowedFilenamePolicy(String outputFilePrefix) {
+ WindowedFilenamePolicy(ResourceId outputFilePrefix) {
this.outputFilePrefix = outputFilePrefix;
}
@Override
- public ResourceId windowedFilename(
- ResourceId outputDirectory, WindowedContext input, String extension) {
- String filename = String.format(
- "%s-%s-%s-of-%s-pane-%s%s%s",
- outputFilePrefix,
- input.getWindow(),
- input.getShardNumber(),
- input.getNumShards() - 1,
- input.getPaneInfo().getIndex(),
- input.getPaneInfo().isLast() ? "-final" : "",
- extension);
- return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+ public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) {
+ String filenamePrefix =
+ outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
+
+ String filename =
+ String.format(
+ "%s-%s-%s-of-%s-pane-%s%s%s",
+ filenamePrefix,
+ input.getWindow(),
+ input.getShardNumber(),
+ input.getNumShards() - 1,
+ input.getPaneInfo().getIndex(),
+ input.getPaneInfo().isLast() ? "-final" : "",
+ outputFileHints.getSuggestedFilenameSuffix());
+ return outputFilePrefix
+ .getCurrentDirectory()
+ .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
}
@Override
- public ResourceId unwindowedFilename(
- ResourceId outputDirectory, Context input, String extension) {
+ public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) {
throw new UnsupportedOperationException("Expecting windowed outputs only");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
- .withLabel("File Name Prefix"));
+ builder.add(
+ DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
+ .withLabel("File Name Prefix"));
}
}
@@ -359,15 +365,18 @@ public class AvroIOTest {
Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
.advanceWatermarkToInfinity();
- FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
+ FilenamePolicy policy =
+ new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
windowedAvroWritePipeline
.apply(values)
.apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
- .apply(AvroIO.write(GenericClass.class)
- .to(baseFilename)
- .withFilenamePolicy(policy)
- .withWindowedWrites()
- .withNumShards(2));
+ .apply(
+ AvroIO.write(GenericClass.class)
+ .to(policy)
+ .withTempDirectory(
+ StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true)))
+ .withWindowedWrites()
+ .withNumShards(2));
windowedAvroWritePipeline.run();
// Validate that the data written matches the expected elements in the expected order
@@ -494,13 +503,14 @@ public class AvroIOTest {
expectedFiles.add(
new File(
DefaultFilenamePolicy.constructName(
- outputFilePrefix,
- shardNameTemplate,
- "" /* no suffix */,
- i,
- numShards,
- null,
- null)));
+ FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
+ shardNameTemplate,
+ "" /* no suffix */,
+ i,
+ numShards,
+ null,
+ null)
+ .toString()));
}
List<String> actualElements = new ArrayList<>();
@@ -572,15 +582,4 @@ public class AvroIOTest {
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
}
-
- @Test
- public void testWindowedWriteRequiresFilenamePolicy() {
- PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
- AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
-
- expectedException.expect(IllegalStateException.class);
- expectedException.expectMessage(
- "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
- emptyInput.apply(write);
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index 217420c..9dc6d33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -17,9 +17,9 @@
*/
package org.apache.beam.sdk.io;
-import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
import static org.junit.Assert.assertEquals;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -30,69 +30,108 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class DefaultFilenamePolicyTest {
+  /**
+   * Test shim over {@link DefaultFilenamePolicy#constructName}.
+   *
+   * <p>Resolves {@code baseFilename} as a non-directory resource and returns the constructed
+   * resource as a string, so the assertions below can compare plain strings.
+   */
+  private static String constructName(
+      String baseFilename,
+      String shardTemplate,
+      String suffix,
+      int shardNum,
+      int numShards,
+      String paneStr,
+      String windowStr) {
+    ResourceId base = FileSystems.matchNewResource(baseFilename, false /* isDirectory */);
+    return DefaultFilenamePolicy.constructName(
+            base, shardTemplate, suffix, shardNum, numShards, paneStr, windowStr)
+        .toString();
+  }
+
@Test
public void testConstructName() {
- assertEquals("output-001-of-123.txt",
- constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+ assertEquals(
+ "/path/to/output-001-of-123.txt",
+ constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
- assertEquals("out.txt/part-00042",
- constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
+ assertEquals(
+ "/path/to/out.txt/part-00042",
+ constructName("/path/to/out.txt", "/part-SSSSS", "", 42, 100, null, null));
- assertEquals("out.txt",
- constructName("ou", "t.t", "xt", 1, 1, null, null));
+ assertEquals("/path/to/out.txt", constructName("/path/to/ou", "t.t", "xt", 1, 1, null, null));
- assertEquals("out0102shard.txt",
- constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
+ assertEquals(
+ "/path/to/out0102shard.txt",
+ constructName("/path/to/out", "SSNNshard", ".txt", 1, 2, null, null));
- assertEquals("out-2/1.part-1-of-2.txt",
- constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
+ assertEquals(
+ "/path/to/out-2/1.part-1-of-2.txt",
+ constructName("/path/to/out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
}
@Test
public void testConstructNameWithLargeShardCount() {
- assertEquals("out-100-of-5000.txt",
- constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
+ assertEquals(
+ "/out-100-of-5000.txt", constructName("/out", "-SS-of-NN", ".txt", 100, 5000, null, null));
}
@Test
public void testConstructWindowedName() {
- assertEquals("output-001-of-123.txt",
- constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
-
- assertEquals("output-001-of-123-PPP-W.txt",
- constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
-
- assertEquals("out.txt/part-00042-myPaneStr-myWindowStr",
- constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr",
- "myWindowStr"));
-
- assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2",
- "anotherWindowStr"));
-
- assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
- constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr",
- "oneMoreWindowStr"));
-
- assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
- + "panemyPaneStr3.txt",
- constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3",
- "slidingWindow1"));
+ assertEquals(
+ "/path/to/output-001-of-123.txt",
+ constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+
+ assertEquals(
+ "/path/to/output-001-of-123-PPP-W.txt",
+ constructName("/path/to/output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
+
+ assertEquals(
+ "/path/to/out" + ".txt/part-00042-myPaneStr-myWindowStr",
+ constructName(
+ "/path/to/out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr"));
+
+ assertEquals(
+ "/path/to/out.txt",
+ constructName("/path/to/ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr"));
+
+ assertEquals(
+ "/path/to/out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
+ constructName(
+ "/path/to/out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr"));
+
+ assertEquals(
+ "/out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
+ + "panemyPaneStr3.txt",
+ constructName(
+ "/out",
+ "-N/S.part-S-of-N-W-P-windowW-paneP",
+ ".txt",
+ 1,
+ 2,
+ "myPaneStr3",
+ "slidingWindow1"));
// test first/last pane
- assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false",
- constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false",
- "myWindowStr"));
-
- assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane",
- "anotherWindowStr"));
-
- assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
- constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false",
- "oneMoreWindowStr"));
-
- assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
- constructName("out",
- "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
+ assertEquals(
+ "/out.txt/part-00042-myWindowStr-pane-11-true-false",
+ constructName(
+ "/out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", "myWindowStr"));
+
+ assertEquals(
+ "/path/to/out.txt",
+ constructName("/path/to/ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr"));
+
+ assertEquals(
+ "/out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
+ constructName(
+ "/out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr"));
+
+ assertEquals(
+ "/path/to/out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
+ constructName(
+ "/path/to/out", "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
index 6615a2e..a7644b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -39,7 +39,7 @@ public class DrunkWritableByteChannelFactory implements WritableByteChannelFacto
}
@Override
- public String getFilenameSuffix() {
+ public String getSuggestedFilenameSuffix() {
return ".drunk";
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index caad759..755bb59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -103,7 +103,7 @@ public class FileBasedSinkTest {
SimpleSink.SimpleWriter writer =
buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
- writer.openUnwindowed(testUid, -1);
+ writer.openUnwindowed(testUid, -1, null);
for (String value : values) {
writer.write(value);
}
@@ -198,23 +198,27 @@ public class FileBasedSinkTest {
throws Exception {
int numFiles = temporaryFiles.size();
- List<FileResult> fileResults = new ArrayList<>();
+ List<FileResult<Void>> fileResults = new ArrayList<>();
// Create temporary output bundles and output File objects.
for (int i = 0; i < numFiles; i++) {
fileResults.add(
- new FileResult(
+ new FileResult<Void>(
LocalResources.fromFile(temporaryFiles.get(i), false),
WriteFiles.UNKNOWN_SHARDNUM,
null,
+ null,
null));
}
writeOp.finalize(fileResults);
- ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
for (int i = 0; i < numFiles; i++) {
- ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
- .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+ ResourceId outputFilename =
+ writeOp
+ .getSink()
+ .getDynamicDestinations()
+ .getFilenamePolicy(null)
+ .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
assertTrue(new File(outputFilename.toString()).exists());
assertFalse(temporaryFiles.get(i).exists());
}
@@ -231,11 +235,12 @@ public class FileBasedSinkTest {
private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
throws Exception {
String prefix = "file";
- SimpleSink sink =
- new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+ SimpleSink<Void> sink =
+ SimpleSink.makeSimpleSink(
+ getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
- WriteOperation<String> writeOp =
- new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+ WriteOperation<String, Void> writeOp =
+ new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
List<File> temporaryFiles = new ArrayList<>();
List<File> outputFiles = new ArrayList<>();
@@ -272,8 +277,6 @@ public class FileBasedSinkTest {
@Test
public void testCopyToOutputFiles() throws Exception {
SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
- ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
-
List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
List<String> inputContents = Arrays.asList("1", "2", "3");
List<String> expectedOutputFilenames = Arrays.asList(
@@ -292,9 +295,14 @@ public class FileBasedSinkTest {
File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
List<String> lines = Collections.singletonList(inputContents.get(i));
writeFile(lines, inputTmpFile);
- inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
- writeOp.getSink().getFilenamePolicy()
- .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
+ inputFilePaths.put(
+ LocalResources.fromFile(inputTmpFile, false),
+ writeOp
+ .getSink()
+ .getDynamicDestinations()
+ .getFilenamePolicy(null)
+ .unwindowedFilename(
+ new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
}
// Copy input files to output files.
@@ -311,7 +319,8 @@ public class FileBasedSinkTest {
ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
List<ResourceId> filenames = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
- filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
+ filenames.add(
+ policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
}
return filenames;
}
@@ -326,8 +335,10 @@ public class FileBasedSinkTest {
List<ResourceId> actual;
ResourceId root = getBaseOutputDirectory();
- SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
- FilenamePolicy policy = sink.getFilenamePolicy();
+ SimpleSink<Void> sink =
+ SimpleSink.makeSimpleSink(
+ root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
+ FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
expected = Arrays.asList(
root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
@@ -352,8 +363,9 @@ public class FileBasedSinkTest {
@Test
public void testCollidingOutputFilenames() throws IOException {
ResourceId root = getBaseOutputDirectory();
- SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
- SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+ SimpleSink<Void> sink =
+ SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
+ SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
@@ -361,11 +373,11 @@ public class FileBasedSinkTest {
ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
// More than one shard does.
try {
- Iterable<FileResult> results =
+ Iterable<FileResult<Void>> results =
Lists.newArrayList(
- new FileResult(temp1, 1, null, null),
- new FileResult(temp2, 1, null, null),
- new FileResult(temp3, 1, null, null));
+ new FileResult<Void>(temp1, 1, null, null, null),
+ new FileResult<Void>(temp2, 1, null, null, null),
+ new FileResult<Void>(temp3, 1, null, null, null));
writeOp.buildOutputFilenames(results);
fail("Should have failed.");
} catch (IllegalStateException exn) {
@@ -379,8 +391,10 @@ public class FileBasedSinkTest {
List<ResourceId> expected;
List<ResourceId> actual;
ResourceId root = getBaseOutputDirectory();
- SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
- FilenamePolicy policy = sink.getFilenamePolicy();
+ SimpleSink<Void> sink =
+ SimpleSink.makeSimpleSink(
+ root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
+ FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
expected = Arrays.asList(
root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
@@ -486,10 +500,11 @@ public class FileBasedSinkTest {
public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
final String testUid = "testId";
ResourceId root = getBaseOutputDirectory();
- WriteOperation<String> writeOp =
- new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
+ WriteOperation<String, Void> writeOp =
+ SimpleSink.makeSimpleSink(
+ root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
.createWriteOperation();
- final Writer<String> writer = writeOp.createWriter();
+ final Writer<String, Void> writer = writeOp.createWriter();
final ResourceId expectedFile =
writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
@@ -503,7 +518,7 @@ public class FileBasedSinkTest {
expected.add("footer");
expected.add("footer");
- writer.openUnwindowed(testUid, -1);
+ writer.openUnwindowed(testUid, -1, null);
writer.write("a");
writer.write("b");
final FileResult result = writer.close();
@@ -513,20 +528,20 @@ public class FileBasedSinkTest {
}
/** Build a SimpleSink with default options. */
- private SimpleSink buildSink() {
- return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
+ private SimpleSink<Void> buildSink() {
+ return SimpleSink.makeSimpleSink(
+ getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
}
- /**
- * Build a SimpleWriteOperation with default options and the given temporary directory.
- */
- private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
- SimpleSink sink = buildSink();
- return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+ /** Build a SimpleWriteOperation with default options and the given temporary directory. */
+ private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(
+ ResourceId tempDirectory) {
+ SimpleSink<Void> sink = buildSink();
+ return new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
}
/** Build a write operation with the default options for it and its parent sink. */
- private SimpleSink.SimpleWriteOperation buildWriteOperation() {
+ private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
return buildSink().createWriteOperation();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index bdf37f6..9196178 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,37 +19,55 @@ package org.apache.beam.sdk.io;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.util.MimeTypes;
/**
- * A simple {@link FileBasedSink} that writes {@link String} values as lines with
- * header and footer.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
*/
-class SimpleSink extends FileBasedSink<String> {
- public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
- this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+ public SimpleSink(
+ ResourceId tempDirectory,
+ DynamicDestinations<String, DestinationT> dynamicDestinations,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
}
- public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
- WritableByteChannelFactory writableByteChannelFactory) {
- super(
- StaticValueProvider.of(baseOutputDirectory),
- new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
- writableByteChannelFactory);
+ public static SimpleSink<Void> makeSimpleSink(
+ ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
+ return new SimpleSink<>(
+ tempDirectory,
+ DynamicFileDestinations.<String>constant(filenamePolicy),
+ CompressionType.UNCOMPRESSED);
}
- public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) {
- super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy);
+ public static SimpleSink<Void> makeSimpleSink(
+ ResourceId baseDirectory,
+ String prefix,
+ String shardTemplate,
+ String suffix,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ DynamicDestinations<String, Void> dynamicDestinations =
+ DynamicFileDestinations.constant(
+ DefaultFilenamePolicy.fromParams(
+ new Params()
+ .withBaseFilename(
+ baseDirectory.resolve(prefix, StandardResolveOptions.RESOLVE_FILE))
+ .withShardTemplate(shardTemplate)
+ .withSuffix(suffix)));
+ return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
}
@Override
- public SimpleWriteOperation createWriteOperation() {
- return new SimpleWriteOperation(this);
+ public SimpleWriteOperation<DestinationT> createWriteOperation() {
+ return new SimpleWriteOperation<>(this);
}
- static final class SimpleWriteOperation extends WriteOperation<String> {
+ static final class SimpleWriteOperation<DestinationT>
+ extends WriteOperation<String, DestinationT> {
public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
super(sink, tempOutputDirectory);
}
@@ -59,12 +77,12 @@ class SimpleSink extends FileBasedSink<String> {
}
@Override
- public SimpleWriter createWriter() throws Exception {
- return new SimpleWriter(this);
+ public SimpleWriter<DestinationT> createWriter() throws Exception {
+ return new SimpleWriter<>(this);
}
}
- static final class SimpleWriter extends Writer<String> {
+ static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
static final String HEADER = "header";
static final String FOOTER = "footer";