Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:54 UTC
[beam] 07/13: Refactors WriteFiles into sub-transforms
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2f73a9534a709dd1fe775ab31e0598d2d89f123c
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Fri Nov 17 12:25:45 2017 -0800
Refactors WriteFiles into sub-transforms
---
.../java/org/apache/beam/sdk/io/WriteFiles.java | 630 +++++++++++----------
1 file changed, 322 insertions(+), 308 deletions(-)
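The commit splits the old monolithic createWrite() into named sub-transforms (WriteUnshardedBundlesToTempFiles, WriteShardedBundlesToTempFiles, GatherResults, FinalizeTempFileBundles) without changing the user-facing contract: windowed writes still require an explicit shard count. For illustration only (not part of this commit), a minimal standalone sketch of that contract through TextIO, which delegates to WriteFiles internally; the class name, output path, and data are invented:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WindowedWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.timestamped(
                TimestampedValue.of("a", new Instant(0)),
                TimestampedValue.of("b", new Instant(1)),
                TimestampedValue.of("c", new Instant(2))))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
            // WriteFiles (used by TextIO under the hood) rejects windowed writes
            // without an explicit shard count, per the checkArgument in expand().
            .apply(TextIO.write().to("/tmp/demo/out").withWindowedWrites().withNumShards(2));
        p.run().waitUntilFinish();
      }
    }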
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 87459e9..0a538b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Objects;
import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -37,7 +36,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -176,53 +175,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
return PCollectionViews.toAdditionalInputs(sideInputs);
}
- @Override
- public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
- if (input.isBounded() == IsBounded.UNBOUNDED) {
- checkArgument(windowedWrites,
- "Must use windowed writes when applying %s to an unbounded PCollection",
- WriteFiles.class.getSimpleName());
- }
- if (windowedWrites) {
- // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
- // and similar behavior in other runners.
- checkArgument(
- computeNumShards != null || numShardsProvider != null,
- "When using windowed writes, must specify number of output shards explicitly",
- WriteFiles.class.getSimpleName());
- }
- this.writeOperation = sink.createWriteOperation();
- this.writeOperation.setWindowedWrites(windowedWrites);
- return createWrite(input);
- }
-
- @Override
- public void validate(PipelineOptions options) {
- sink.validate(options);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
- .include("sink", sink);
- if (getSharding() != null) {
- builder.include("sharding", getSharding());
- } else {
- builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
- .withLabel("Fixed Number of Shards"));
- }
- }
-
/** Returns the {@link FileBasedSink} associated with this PTransform. */
public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
return sink;
}
- /**
- * Returns whether or not to perform windowed writes.
- */
+ /** Returns whether or not to perform windowed writes. */
public boolean isWindowedWrites() {
return windowedWrites;
}
@@ -339,50 +297,189 @@ public class WriteFiles<UserT, DestinationT, OutputT>
sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
}
- private static class WriterKey<DestinationT> {
- private final BoundedWindow window;
- private final PaneInfo paneInfo;
- private final DestinationT destination;
+ @Override
+ public void validate(PipelineOptions options) {
+ sink.validate(options);
+ }
- WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
- this.window = window;
- this.paneInfo = paneInfo;
- this.destination = destination;
+ @Override
+ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
+ if (input.isBounded() == IsBounded.UNBOUNDED) {
+ checkArgument(
+ windowedWrites,
+ "Must use windowed writes when applying %s to an unbounded PCollection",
+ WriteFiles.class.getSimpleName());
+ }
+ if (windowedWrites) {
+ // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+ // and similar behavior in other runners.
+ checkArgument(
+ computeNumShards != null || numShardsProvider != null,
+ "When using windowed writes, must specify number of output shards explicitly",
+ WriteFiles.class.getSimpleName());
}
+ this.writeOperation = sink.createWriteOperation();
+ this.writeOperation.setWindowedWrites(windowedWrites);
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof WriterKey)) {
- return false;
- }
- WriterKey other = (WriterKey) o;
- return Objects.equal(window, other.window)
- && Objects.equal(paneInfo, other.paneInfo)
- && Objects.equal(destination, other.destination);
+ if (!windowedWrites) {
+ // Re-window the data into the global window and remove any existing triggers.
+ input =
+ input.apply(
+ "RewindowIntoGlobal",
+ Window.<UserT>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+ }
+
+ Coder<DestinationT> destinationCoder;
+ try {
+ destinationCoder =
+ getDynamicDestinations()
+ .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+ destinationCoder.verifyDeterministic();
+ } catch (CannotProvideCoderException | NonDeterministicException e) {
+ throw new RuntimeException(e);
+ }
+ @SuppressWarnings("unchecked")
+ Coder<BoundedWindow> windowCoder =
+ (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+ FileResultCoder<DestinationT> fileResultCoder =
+ FileResultCoder.of(windowCoder, destinationCoder);
+
+ PCollectionView<Integer> numShardsView =
+ (computeNumShards == null) ? null : input.apply(computeNumShards);
+
+ PCollection<FileResult<DestinationT>> tempFileResults =
+ (computeNumShards == null && numShardsProvider == null)
+ ? input.apply(
+ "WriteUnshardedBundlesToTempFiles",
+ new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+ : input.apply(
+ "WriteShardedBundlesToTempFiles",
+ new WriteShardedBundlesToTempFiles(
+ destinationCoder, fileResultCoder, numShardsView));
+
+ return tempFileResults
+ .apply("GatherTempFileResults", new GatherResults<>(fileResultCoder))
+ .apply(
+ "FinalizeTempFileBundles",
+ new FinalizeTempFileBundles(numShardsView, destinationCoder));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
+ .include("sink", sink);
+ if (getSharding() != null) {
+ builder.include("sharding", getSharding());
+ } else {
+ builder.addIfNotNull(
+ DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"));
+ }
+ }
+
+ private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+ return (DynamicDestinations<UserT, DestinationT, OutputT>)
+ writeOperation.getSink().getDynamicDestinations();
+ }
+
+ private class GatherResults<ResultT>
+ extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+ private final Coder<ResultT> resultCoder;
+
+ private GatherResults(Coder<ResultT> resultCoder) {
+ this.resultCoder = resultCoder;
}
@Override
- public int hashCode() {
- return Objects.hashCode(window, paneInfo, destination);
+ public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+ if (windowedWrites) {
+ // Reshuffle the results to make them stable against retries.
+ // Use a single void key to maximize size of bundles for finalization.
+ return input
+ .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null))
+ .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
+ .apply("Drop key", Values.<ResultT>create())
+ .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
+ .setCoder(IterableCoder.of(resultCoder));
+ } else {
+ // Pass results via a side input rather than reshuffle, because we need to get an empty
+ // iterable to finalize if there are no results.
+ return input
+ .getPipeline()
+ .apply(
+ Reify.viewInGlobalWindow(
+ input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+ }
}
}
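When writes are windowed, GatherResults makes the temp-file results stable against retries by keying everything to a single void key, reshuffling, and then dropping the key again. For illustration only (not part of this commit), the same pattern in isolation; the class name and element values are invented:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.WithKeys;
    import org.apache.beam.sdk.values.PCollection;

    public class StableResultsSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<String> results = p.apply(Create.of("file-1", "file-2", "file-3"));
        // A single void key maximizes bundle size downstream; the Reshuffle acts as a
        // checkpoint so retried upstream work cannot change what gets finalized.
        PCollection<String> stable =
            results
                .apply("Add void key", WithKeys.<Void, String>of((Void) null))
                .apply("Reshuffle", Reshuffle.<Void, String>of())
                .apply("Drop key", Values.<String>create());
        p.run().waitUntilFinish();
      }
    }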
- // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
- // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
- // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
- // this can be used as a key.
- private static <DestinationT> int hashDestination(
- DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
- return Hashing.murmur3_32()
- .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
- .asInt();
+ private class WriteUnshardedBundlesToTempFiles
+ extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+ private final Coder<DestinationT> destinationCoder;
+ private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+ private WriteUnshardedBundlesToTempFiles(
+ Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+ this.destinationCoder = destinationCoder;
+ this.fileResultCoder = fileResultCoder;
+ }
+
+ @Override
+ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+ TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
+ TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+ PCollectionTuple writeTuple =
+ input.apply(
+ "WriteUnshardedBundles",
+ ParDo.of(
+ new WriteUnshardedTempFilesWithSpillingFn(
+ spilledRecordsTag, destinationCoder))
+ .withSideInputs(sideInputs)
+ .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+ PCollection<FileResult<DestinationT>> writtenBundleFiles =
+ writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
+ // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+ // finalize to stay consistent with what WriteWindowedBundles does.
+ PCollection<FileResult<DestinationT>> writtenSpilledFiles =
+ writeTuple
+ .get(spilledRecordsTag)
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+ // Here we group by a synthetic shard number in the range [0, spill factor),
+ // just for the sake of getting some parallelism within each destination when
+ // writing the spilled records, whereas the non-spilled records don't have a shard
+ // number assigned at all. Drop the shard number on the spilled records so that
+ // shard numbers are assigned together to both the spilled and non-spilled files in
+ // finalize.
+ .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+ .apply(
+ "WriteSpilled",
+ ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+ .setCoder(fileResultCoder)
+ .apply(
+ "DropShardNum",
+ ParDo.of(
+ new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output(c.element().withShard(UNKNOWN_SHARDNUM));
+ }
+ }));
+ return PCollectionList.of(writtenBundleFiles)
+ .and(writtenSpilledFiles)
+ .apply(Flatten.<FileResult<DestinationT>>pCollections())
+ .setCoder(fileResultCoder);
+ }
}
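The unsharded path above writes most records straight to temp files and "spills" the overflow to a secondary output, which is then grouped and written like the sharded path. For illustration only (not part of this commit), a standalone sketch of the same multi-output ParDo pattern; the class name, element type, and spill condition are invented:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class SpillPatternSketch {
      public static void main(String[] args) {
        final TupleTag<String> writtenTag = new TupleTag<String>() {};
        final TupleTag<String> spilledTag = new TupleTag<String>() {};

        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollectionTuple out =
            p.apply(Create.of("short", "a much longer element"))
                .apply(
                    ParDo.of(
                            new DoFn<String, String>() {
                              @ProcessElement
                              public void process(ProcessContext c) {
                                // "Cheap" elements go to the main output; the rest are
                                // spilled to a secondary output for a grouped write later.
                                if (c.element().length() <= 10) {
                                  c.output(c.element());
                                } else {
                                  c.output(spilledTag, c.element());
                                }
                              }
                            })
                        .withOutputTags(writtenTag, TupleTagList.of(spilledTag)));
        PCollection<String> written = out.get(writtenTag);
        PCollection<String> spilled = out.get(spilledTag);
        p.run().waitUntilFinish();
      }
    }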
/**
* Writes all the elements in a bundle using a {@link Writer} produced by the {@link
* WriteOperation} associated with the {@link FileBasedSink}.
*/
- private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
+ private class WriteUnshardedTempFilesWithSpillingFn
+ extends DoFn<UserT, FileResult<DestinationT>> {
private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
private final Coder<DestinationT> destinationCoder;
@@ -391,7 +488,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private int spilledShardNum = UNKNOWN_SHARDNUM;
- WriteBundles(
+ WriteUnshardedTempFilesWithSpillingFn(
TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
Coder<DestinationT> destinationCoder) {
this.unwrittenRecordsTag = unwrittenRecordsTag;
@@ -406,14 +503,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+ getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
PaneInfo paneInfo = c.pane();
// If we are doing windowed writes, we need to ensure that we have separate files for
// data in different windows/panes. Similar for dynamic writes, make sure that different
// destinations go to different writers.
// In the case of unwindowed writes, the window and the pane will always be the same, and
// the map will only have a single element.
- DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
+ DestinationT destination = getDynamicDestinations().getDestination(c.element());
WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
Writer<DestinationT, OutputT> writer = writers.get(key);
if (writer == null) {
@@ -444,7 +541,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
return;
}
}
- writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element()));
+ writeOrClose(writer, getDynamicDestinations().formatRecord(c.element()));
}
@FinishBundle
@@ -468,64 +565,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
window);
}
}
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(WriteFiles.this);
- }
- }
-
- /*
- * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
- * single iterable.
- */
- private class WriteShardedBundles
- extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
- sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
- // Since we key by a 32-bit hash of the destination, there might be multiple destinations
- // in this iterable. The number of destinations is generally very small (1000s or less), so
- // there will rarely be hash collisions.
- Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
- for (UserT input : c.element().getValue()) {
- DestinationT destination = sink.getDynamicDestinations().getDestination(input);
- Writer<DestinationT, OutputT> writer = writers.get(destination);
- if (writer == null) {
- String uuid = UUID.randomUUID().toString();
- LOG.info(
- "Opening writer {} for window {} pane {} destination {}",
- uuid,
- window,
- c.pane(),
- destination);
- writer = writeOperation.createWriter();
- writer.open(uuid, destination);
- writers.put(destination, writer);
- }
- writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
- }
-
- // Close all writers.
- for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
- Writer<DestinationT, OutputT> writer = entry.getValue();
- try {
- // Close the writer; if this throws let the error propagate.
- writer.close();
- } catch (Exception e) {
- // If anything goes wrong, make sure to delete the temporary file.
- writer.cleanup();
- throw e;
- }
- int shard = c.element().getKey().getShardNumber();
- c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- builder.delegate(WriteFiles.this);
- }
}
private static <DestinationT, OutputT> void writeOrClose(
@@ -549,21 +588,90 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
+ private static class WriterKey<DestinationT> {
+ private final BoundedWindow window;
+ private final PaneInfo paneInfo;
+ private final DestinationT destination;
+
+ WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
+ this.window = window;
+ this.paneInfo = paneInfo;
+ this.destination = destination;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof WriterKey)) {
+ return false;
+ }
+ WriterKey other = (WriterKey) o;
+ return Objects.equal(window, other.window)
+ && Objects.equal(paneInfo, other.paneInfo)
+ && Objects.equal(destination, other.destination);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(window, paneInfo, destination);
+ }
+ }
+
+ // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
+ // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
+ // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
+ // this can be used as a key.
+ private static <DestinationT> int hashDestination(
+ DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
+ return Hashing.murmur3_32()
+ .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
+ .asInt();
+ }
+
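hashDestination keys the GroupByKey by a murmur3_32 hash of the coder-encoded destination because Java's hashCode() is not guaranteed to be stable across machines. For illustration only (not part of this commit), the same scheme applied to a string destination; the class name and value are invented:

    import com.google.common.hash.Hashing;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class DestinationHashSketch {
      public static void main(String[] args) throws Exception {
        String destination = "events/2017/11/17";
        // Same scheme as WriteFiles.hashDestination: encode with a deterministic coder,
        // then hash the bytes so the resulting key is stable across JVMs and machines.
        int key =
            Hashing.murmur3_32()
                .hashBytes(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), destination))
                .asInt();
        System.out.println("Sharding key for " + destination + ": " + key);
      }
    }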
+ private class WriteShardedBundlesToTempFiles
+ extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+ private final Coder<DestinationT> destinationCoder;
+ private final Coder<FileResult<DestinationT>> fileResultCoder;
+ private final PCollectionView<Integer> numShardsView;
+
+ private WriteShardedBundlesToTempFiles(
+ Coder<DestinationT> destinationCoder,
+ Coder<FileResult<DestinationT>> fileResultCoder,
+ PCollectionView<Integer> numShardsView) {
+ this.destinationCoder = destinationCoder;
+ this.fileResultCoder = fileResultCoder;
+ this.numShardsView = numShardsView;
+ }
+
+ @Override
+ public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+ return input
+ .apply(
+ "ApplyShardingKey",
+ ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
+ .withSideInputs(
+ numShardsView == null
+ ? ImmutableList.<PCollectionView<Integer>>of()
+ : ImmutableList.of(numShardsView)))
+ .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+ .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
+ .apply(
+ "WriteShardsIntoTempFiles",
+ ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+ .setCoder(fileResultCoder);
+ }
+ }
+
+ private class ApplyShardingKeyFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
private final @Nullable PCollectionView<Integer> numShardsView;
- private final ValueProvider<Integer> numShardsProvider;
private final Coder<DestinationT> destinationCoder;
private int shardNumber;
- ApplyShardingKey(
- PCollectionView<Integer> numShardsView,
- ValueProvider<Integer> numShardsProvider,
- Coder<DestinationT> destinationCoder) {
- this.destinationCoder = destinationCoder;
+ ApplyShardingKeyFn(
+ @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
this.numShardsView = numShardsView;
- this.numShardsProvider = numShardsProvider;
- shardNumber = UNKNOWN_SHARDNUM;
+ this.destinationCoder = destinationCoder;
+ this.shardNumber = UNKNOWN_SHARDNUM;
}
@ProcessElement
@@ -595,7 +703,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// the destinations. This does mean that multiple destinations might end up on the same shard,
// however the number of collisions should be small, so there's no need to worry about memory
// issues.
- DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
+ DestinationT destination = getDynamicDestinations().getDestination(context.element());
context.output(
KV.of(
ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
@@ -603,168 +711,86 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- private static <DestinationT>
- Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>>
- groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) {
- Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
- ArrayListMultimap.create();
- for (FileResult<DestinationT> result : results) {
- res.put(KV.of(result.getDestination(), result.getWindow()), result);
+ private class WriteShardsIntoTempFilesFn
+ extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+ // Since we key by a 32-bit hash of the destination, there might be multiple destinations
+ // in this iterable. The number of destinations is generally very small (1000s or less), so
+ // there will rarely be hash collisions.
+ Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
+ for (UserT input : c.element().getValue()) {
+ DestinationT destination = getDynamicDestinations().getDestination(input);
+ Writer<DestinationT, OutputT> writer = writers.get(destination);
+ if (writer == null) {
+ String uuid = UUID.randomUUID().toString();
+ LOG.info(
+ "Opening writer {} for window {} pane {} destination {}",
+ uuid,
+ window,
+ c.pane(),
+ destination);
+ writer = writeOperation.createWriter();
+ writer.open(uuid, destination);
+ writers.put(destination, writer);
+ }
+ writeOrClose(writer, getDynamicDestinations().formatRecord(input));
+ }
+
+ // Close all writers.
+ for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
+ Writer<DestinationT, OutputT> writer = entry.getValue();
+ try {
+ // Close the writer; if this throws let the error propagate.
+ writer.close();
+ } catch (Exception e) {
+ // If anything goes wrong, make sure to delete the temporary file.
+ writer.cleanup();
+ throw e;
+ }
+ int shard = c.element().getKey().getShardNumber();
+ c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
+ }
}
- return res;
}
- /**
- * A write is performed as sequence of three {@link ParDo}'s.
- *
- * <p>This singleton collection containing the WriteOperation is then used as a side input to a
- * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
- * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
- * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
- * respectively, and {@link Writer#write} method is called for every element in the bundle. The
- * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
- * FileBasedSink} for a description of writer results)-one for each bundle.
- *
- * <p>The final do-once ParDo uses a singleton collection asinput and the collection of writer
- * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
- * the write.
- *
- * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
- * before the exception that caused the write to fail is propagated and the write result will be
- * discarded.
- *
- * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
- * deserialized in the bundle-writing and finalization phases, any state change to the
- * WriteOperation object that occurs during initialization is visible in the latter phases.
- * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
- * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
- * WriteOperation).
- */
- private WriteFilesResult<DestinationT> createWrite(PCollection<UserT> input) {
- Pipeline p = input.getPipeline();
+ private class FinalizeTempFileBundles
+ extends PTransform<
+ PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+ @Nullable private final PCollectionView<Integer> numShardsView;
+ private final Coder<DestinationT> destinationCoder;
- if (!windowedWrites) {
- // Re-window the data into the global window and remove any existing triggers.
- input =
- input.apply(
- Window.<UserT>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
+ private FinalizeTempFileBundles(
+ @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
+ this.numShardsView = numShardsView;
+ this.destinationCoder = destinationCoder;
}
- final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations =
- writeOperation.getSink().getDynamicDestinations();
-
- // Perform the per-bundle writes as a ParDo on the input PCollection (with the
- // WriteOperation as a side input) and collect the results of the writes in a
- // PCollection. There is a dependency between this ParDo and the first (the
- // WriteOperation PCollection as a side input), so this will happen after the
- // initial ParDo.
- final PCollectionView<Integer> numShardsView =
- (computeNumShards == null) ? null : input.apply(computeNumShards);
- List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
- ? ImmutableList.<PCollectionView<Integer>>of()
- : ImmutableList.of(numShardsView);
+ @Override
+ public WriteFilesResult<DestinationT> expand(
+ PCollection<Iterable<FileResult<DestinationT>>> input) {
- @SuppressWarnings("unchecked")
- Coder<BoundedWindow> shardedWindowCoder =
- (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
- final Coder<DestinationT> destinationCoder;
- try {
- destinationCoder =
- destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
- destinationCoder.verifyDeterministic();
- } catch (CannotProvideCoderException | NonDeterministicException e) {
- throw new RuntimeException(e);
- }
- final FileResultCoder<DestinationT> fileResultCoder =
- FileResultCoder.of(shardedWindowCoder, destinationCoder);
-
- PCollection<FileResult<DestinationT>> results;
- if (computeNumShards == null && numShardsProvider == null) {
- TupleTag<FileResult<DestinationT>> writtenRecordsTag =
- new TupleTag<>("writtenRecordsTag");
- TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
- new TupleTag<>("spilledRecordsTag");
- String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
- PCollectionTuple writeTuple =
- input.apply(
- writeName,
- ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
- .withSideInputs(sideInputs)
- .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
- PCollection<FileResult<DestinationT>> writtenBundleFiles =
- writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
- // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
- // finalize to stay consistent with what WriteWindowedBundles does.
- PCollection<FileResult<DestinationT>> writtenSpilledFiles =
- writeTuple
- .get(spilledRecordsTag)
- .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
- // Here we group by a synthetic shard number in the range [0, spill factor),
- // just for the sake of getting some parallelism within each destination when
- // writing the spilled records, whereas the non-spilled records don't have a shard
- // number assigned at all. Drop the shard number on the spilled records so that
- // shard numbers are assigned together to both the spilled and non-spilled files in
- // finalize.
- .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
- .apply(
- "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
- .setCoder(fileResultCoder)
- .apply("DropShardNum", ParDo.of(
- new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
- @ProcessElement
- public void process(ProcessContext c) {
- c.output(c.element().withShard(UNKNOWN_SHARDNUM));
- }
- }));
- results =
- PCollectionList.of(writtenBundleFiles)
- .and(writtenSpilledFiles)
- .apply(Flatten.<FileResult<DestinationT>>pCollections());
- } else {
- results =
+ List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs);
+ if (numShardsView != null) {
+ finalizeSideInputs.add(numShardsView);
+ }
+ PCollection<KV<DestinationT, String>> outputFilenames =
input
- .apply(
- "ApplyShardLabel",
- ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder))
- .withSideInputs(shardingSideInputs))
- .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
- .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
- .apply(
- "WriteShardedBundles",
- ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs));
- }
- results.setCoder(fileResultCoder);
+ .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(finalizeSideInputs))
+ .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
- PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles;
- if (windowedWrites) {
- // Reshuffle the results to make them stable against retries.
- // Use a single void key to maximize size of bundles for finalization.
- PCollection<FileResult<DestinationT>> stableResults = results
- .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
- .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
- .apply("Drop key", Values.<FileResult<DestinationT>>create());
- fileResultBundles =
- stableResults
- .apply(
- "Gather bundles",
- ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>()))
- .setCoder(IterableCoder.of(fileResultCoder));
- } else {
- // Pass results via a side input rather than reshuffle, because we need to get an empty
- // iterable to finalize if there are no results.
- fileResultBundles =
- p.apply(
- Reify.viewInGlobalWindow(
- results.apply(View.<FileResult<DestinationT>>asIterable()),
- IterableCoder.of(fileResultCoder)));
+ TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
+ new TupleTag<>("perDestinationOutputFilenames");
+ return WriteFilesResult.in(
+ input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
}
- class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+ private class FinalizeFn
+ extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
@ProcessElement
public void process(ProcessContext c) throws Exception {
- writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+ getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@Nullable Integer fixedNumShards;
if (numShardsView != null) {
fixedNumShards = c.sideInput(numShardsView);
@@ -776,11 +802,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
LOG.info("Finalizing {} file results", fileResults.size());
- DestinationT defaultDest = destinations.getDefaultDestination();
+ DestinationT defaultDest = getDynamicDestinations().getDefaultDestination();
List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
fileResults.isEmpty()
? writeOperation.finalizeDestination(
- defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
+ defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
: finalizeAllDestinations(fileResults, fixedNumShards);
for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
FileResult<DestinationT> res = entry.getKey();
@@ -789,18 +815,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
writeOperation.moveToOutputFiles(resultsToFinalFilenames);
}
}
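FinalizeFn reads the fixed shard count lazily from the numShardsView side input when sharding is runner-computed. For illustration only (not part of this commit), a standalone sketch of reading a singleton side input inside a DoFn; all names and values are invented:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollectionView;

    public class SideInputShardsSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        final PCollectionView<Integer> numShardsView =
            p.apply("NumShards", Create.of(3)).apply(View.<Integer>asSingleton());
        p.apply("Names", Create.of("a", "b"))
            .apply(
                ParDo.of(
                        new DoFn<String, String>() {
                          @ProcessElement
                          public void process(ProcessContext c) {
                            // Like FinalizeFn, read the shard count from the view at runtime.
                            int numShards = c.sideInput(numShardsView);
                            c.output(c.element() + " -> " + numShards + " shards");
                          }
                        })
                    .withSideInputs(numShardsView));
        p.run().waitUntilFinish();
      }
    }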
-
- List<PCollectionView<?>> sideInputs =
- FluentIterable.concat(this.sideInputs, shardingSideInputs).toList();
- PCollection<KV<DestinationT, String>> outputFilenames =
- fileResultBundles
- .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs))
- .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
-
- TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
- new TupleTag<>("perDestinationOutputFilenames");
- return WriteFilesResult.in(
- input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
}
private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
--