You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:52 UTC
[beam] 05/13: consolidates windowed/unwindowed finalize fns somewhat
This is an automated email from the ASF dual-hosted git repository.
jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 97df5e703d4a891ab63a40b46c4e87d7c373168b
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 19:51:10 2017 -0800
consolidates windowed/unwindowed finalize fns somewhat
---
.../java/org/apache/beam/sdk/io/FileBasedSink.java | 138 +++++++++----
.../java/org/apache/beam/sdk/io/WriteFiles.java | 221 ++++++---------------
.../org/apache/beam/sdk/io/FileBasedSinkTest.java | 19 +-
3 files changed, 171 insertions(+), 207 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c8bdbfc..5bc84be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -28,10 +28,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -39,6 +41,7 @@ import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -46,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
@@ -72,6 +76,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
@@ -103,10 +108,9 @@ import org.slf4j.LoggerFactory;
*
* <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
* event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#open} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#open} is passed a unique
+ * <i>bundle id</i> when it is called by the WriteFiles transform, so even redundant or retried
+ * bundles will have a unique way of identifying their output.
*
* <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
* guarantee is important; if a bundle is to be output to a file, for example, the name of the file
@@ -447,7 +451,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* written,
*
* <ol>
- * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+ * <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the
* output bundles.
* <li>During finalize, these temporary files are copied to final output locations and named
* according to a file naming template.
@@ -577,17 +581,22 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* not be cleaned up. Note that {@link WriteFiles} does attempt to clean up files if exceptions
* are thrown, however there are still some scenarios where temporary files might be left.
*/
- public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException {
+ public void removeTemporaryFiles(Collection<ResourceId> filenames) throws IOException {
removeTemporaryFiles(filenames, !windowedWrites);
}
@Experimental(Kind.FILESYSTEM)
- protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames(
+ protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestination(
@Nullable DestinationT dest,
@Nullable BoundedWindow window,
@Nullable Integer numShards,
- Iterable<FileResult<DestinationT>> writerResults) {
- for (FileResult<DestinationT> res : writerResults) {
+ Collection<FileResult<DestinationT>> existingResults) throws Exception {
+ Collection<FileResult<DestinationT>> completeResults =
+ windowedWrites
+ ? existingResults
+ : createMissingEmptyShards(dest, numShards, existingResults);
+
+ for (FileResult<DestinationT> res : completeResults) {
checkArgument(
Objects.equals(dest, res.getDestination()),
"File result has wrong destination: expected %s, got %s",
@@ -602,7 +611,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
final int effectiveNumShards;
if (numShards != null) {
effectiveNumShards = numShards;
- for (FileResult<DestinationT> res : writerResults) {
+ for (FileResult<DestinationT> res : completeResults) {
checkArgument(
res.getShard() != UNKNOWN_SHARDNUM,
"Fixed sharding into %s shards was specified, "
@@ -611,8 +620,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
res);
}
} else {
- effectiveNumShards = Iterables.size(writerResults);
- for (FileResult<DestinationT> res : writerResults) {
+ effectiveNumShards = Iterables.size(completeResults);
+ for (FileResult<DestinationT> res : completeResults) {
checkArgument(
res.getShard() == UNKNOWN_SHARDNUM,
"Runner-chosen sharding was specified, "
@@ -623,7 +632,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
if (numShards != null) {
- resultsWithShardNumbers = Lists.newArrayList(writerResults);
+ resultsWithShardNumbers = Lists.newArrayList(completeResults);
} else {
checkState(
!windowedWrites,
@@ -644,7 +653,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
return firstFilename.compareTo(secondFilename);
}
})
- .sortedCopy(writerResults);
+ .sortedCopy(completeResults);
for (int i = 0; i < sortedByTempFilename.size(); i++) {
resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
}
@@ -672,10 +681,71 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
return outputFilenames;
}
+ private Collection<FileResult<DestinationT>> createMissingEmptyShards(
+ @Nullable DestinationT dest,
+ @Nullable Integer numShards,
+ Collection<FileResult<DestinationT>> existingResults)
+ throws Exception {
+ Collection<FileResult<DestinationT>> completeResults;
+ LOG.info("Finalizing for destination {} num shards {}.", dest, existingResults.size());
+ if (numShards != null) {
+ checkArgument(
+ existingResults.size() <= numShards,
+ "Fixed sharding into %s shards was specified, but got %s file results",
+ numShards,
+ existingResults.size());
+ }
+ // We must always output at least 1 shard, and honor user-specified numShards
+ // if set.
+ Set<Integer> missingShardNums;
+ if (numShards == null) {
+ missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+ } else {
+ missingShardNums = Sets.newHashSet();
+ for (int i = 0; i < numShards; ++i) {
+ missingShardNums.add(i);
+ }
+ for (FileResult<DestinationT> res : existingResults) {
+ checkArgument(
+ res.getShard() != UNKNOWN_SHARDNUM,
+ "Fixed sharding into %s shards was specified, "
+ + "but file result %s does not specify a shard",
+ numShards,
+ res);
+ missingShardNums.remove(res.getShard());
+ }
+ }
+ completeResults = Lists.newArrayList(existingResults);
+ if (!missingShardNums.isEmpty()) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for destination {}.",
+ missingShardNums.size(),
+ existingResults.size(),
+ dest);
+ for (int shard : missingShardNums) {
+ String uuid = UUID.randomUUID().toString();
+ LOG.info("Opening empty writer {} for destination {}", uuid, dest);
+ Writer<DestinationT, ?> writer = createWriter();
+ // Currently this code path is only called in the unwindowed case.
+ writer.open(uuid, dest);
+ writer.close();
+ completeResults.add(
+ new FileResult<>(
+ writer.getOutputFile(),
+ shard,
+ GlobalWindow.INSTANCE,
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ dest));
+ }
+ LOG.debug("Done creating extra shards for {}.", dest);
+ }
+ return completeResults;
+ }
+
/**
* Copy temporary files to final output filenames using the file naming template.
*
- * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+ * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
*
* <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
* will be the same as the sorted order of the input filenames. In other words (when using
@@ -686,40 +756,38 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
*/
@VisibleForTesting
@Experimental(Kind.FILESYSTEM)
- final void copyToOutputFiles(
+ final void moveToOutputFiles(
List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
int numFiles = resultsToFinalFilenames.size();
- if (numFiles > 0) {
- LOG.debug("Copying {} files.", numFiles);
- List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
- List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
- for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
- srcFiles.add(entry.getKey().getTempFilename());
- dstFiles.add(entry.getValue());
- LOG.info(
- "Will copy temporary file {} to final location {}",
- entry.getKey().getTempFilename(),
- entry.getValue());
- }
- // During a failure case, files may have been deleted in an earlier step. Thus
- // we ignore missing files here.
- FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
- } else {
- LOG.info("No output files to write.");
+ LOG.debug("Copying {} files.", numFiles);
+ List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
+ List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+ for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+ srcFiles.add(entry.getKey().getTempFilename());
+ dstFiles.add(entry.getValue());
+ LOG.info(
+ "Will copy temporary file {} to final location {}",
+ entry.getKey().getTempFilename(),
+ entry.getValue());
}
+ // During a failure case, files may have been deleted in an earlier step. Thus
+ // we ignore missing files here.
+ FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+ removeTemporaryFiles(srcFiles);
}
/**
* Removes temporary output files. Uses the temporary directory to find files to remove.
*
- * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+ * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
* <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
* temporary files, this method will remove them.
*/
@VisibleForTesting
@Experimental(Kind.FILESYSTEM)
final void removeTemporaryFiles(
- Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
+ Collection<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory)
+ throws IOException {
ResourceId tempDir = tempDirectory.get();
LOG.debug("Removing temporary bundle output files in {}.", tempDir);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 28ac1a5..9cfabfe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -26,18 +26,14 @@ import com.google.common.base.Objects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
@@ -66,6 +62,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -630,7 +627,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
* FileBasedSink} for a description of writer results)-one for each bundle.
*
* <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
- * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
+ * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
* the write.
*
* <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
@@ -661,11 +658,25 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// PCollection. There is a dependency between this ParDo and the first (the
// WriteOperation PCollection as a side input), so this will happen after the
// initial ParDo.
- PCollectionView<Integer> numShardsView =
+ final PCollectionView<Integer> numShardsView =
(computeNumShards == null) ? null : input.apply(computeNumShards);
List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
? ImmutableList.<PCollectionView<Integer>>of()
: ImmutableList.of(numShardsView);
+ SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards =
+ new SerializableFunction<DoFn.ProcessContext, Integer>() {
+ @Override
+ public Integer apply(DoFn<?, ?>.ProcessContext c) {
+ if (numShardsView != null) {
+ return c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ return numShardsProvider.get();
+ } else {
+ return null;
+ }
+ }
+ };
+
@SuppressWarnings("unchecked")
Coder<BoundedWindow> shardedWindowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -755,16 +766,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
.apply(Values.<FileResult<DestinationT>>create())
.apply(
"FinalizeWindowed",
- ParDo.of(
- new FinalizeWindowedFn<>(
- numShardsView, numShardsProvider, writeOperation))
- .withSideInputs(
- numShardsView == null
- ? ImmutableList.<PCollectionView<?>>of()
- : ImmutableList.of(numShardsView)))
+ ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation))
+ .withSideInputs(shardingSideInputs))
.setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
} else {
- final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
+ PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
results.apply(View.<FileResult<DestinationT>>asIterable());
// Finalize the write in another do-once ParDo on the singleton collection containing the
@@ -775,14 +781,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// For the non-windowed case, we guarantee that if no data is written but the user has
// set numShards, then all shards will be written out as empty files. For this reason we
// use a side input here.
- PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
outputFilenames =
- singletonCollection
+ p.apply(Create.of((Void) null))
.apply(
"FinalizeUnwindowed",
ParDo.of(
new FinalizeUnwindowedFn<>(
- numShardsView, numShardsProvider, resultsView, writeOperation))
+ getFixedNumShards, resultsView, writeOperation))
.withSideInputs(
FluentIterable.concat(sideInputs, shardingSideInputs)
.append(resultsView)
@@ -798,19 +803,16 @@ public class WriteFiles<UserT, DestinationT, OutputT>
private static class FinalizeWindowedFn<DestinationT>
extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
- @Nullable private final PCollectionView<Integer> numShardsView;
- @Nullable private final ValueProvider<Integer> numShardsProvider;
+ private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
private final WriteOperation<DestinationT, ?> writeOperation;
@Nullable private transient List<FileResult<DestinationT>> fileResults;
@Nullable private Integer fixedNumShards;
public FinalizeWindowedFn(
- @Nullable PCollectionView<Integer> numShardsView,
- @Nullable ValueProvider<Integer> numShardsProvider,
+ SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
WriteOperation<DestinationT, ?> writeOperation) {
- this.numShardsView = numShardsView;
- this.numShardsProvider = numShardsProvider;
+ this.getFixedNumShards = getFixedNumShards;
this.writeOperation = writeOperation;
}
@@ -824,58 +826,37 @@ public class WriteFiles<UserT, DestinationT, OutputT>
public void processElement(ProcessContext c) {
fileResults.add(c.element());
if (fixedNumShards == null) {
- if (numShardsView != null) {
- fixedNumShards = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- fixedNumShards = numShardsProvider.get();
- } else {
- throw new IllegalStateException(
- "When finalizing a windowed write, should have set fixed sharding");
- }
+ fixedNumShards = getFixedNumShards.apply(c);
+ checkState(fixedNumShards != null, "Windowed write should have set fixed sharding");
}
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
- Set<ResourceId> tempFiles = Sets.newHashSet();
- Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results =
- groupByDestinationAndWindow(fileResults);
- List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
- for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
- destEntry : results.asMap().entrySet()) {
- DestinationT destination = destEntry.getKey().getKey();
- BoundedWindow window = destEntry.getKey().getValue();
- resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames(
- destination, window, fixedNumShards, destEntry.getValue()));
- }
- LOG.info("Will finalize {} files", resultsToFinalFilenames.size());
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+ finalizeAllDestinations(writeOperation, fileResults, fixedNumShards);
for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
FileResult<DestinationT> res = entry.getKey();
- tempFiles.add(res.getTempFilename());
c.output(
KV.of(res.getDestination(), entry.getValue().toString()),
res.getWindow().maxTimestamp(),
res.getWindow());
}
- writeOperation.copyToOutputFiles(resultsToFinalFilenames);
- writeOperation.removeTemporaryFiles(tempFiles);
+ writeOperation.moveToOutputFiles(resultsToFinalFilenames);
}
}
private static class FinalizeUnwindowedFn<DestinationT>
extends DoFn<Void, KV<DestinationT, String>> {
- @Nullable private final PCollectionView<Integer> numShardsView;
- @Nullable private final ValueProvider<Integer> numShardsProvider;
+ private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
private final WriteOperation<DestinationT, ?> writeOperation;
public FinalizeUnwindowedFn(
- @Nullable PCollectionView<Integer> numShardsView,
- @Nullable ValueProvider<Integer> numShardsProvider,
+ SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
WriteOperation<DestinationT, ?> writeOperation) {
- this.numShardsView = numShardsView;
- this.numShardsProvider = numShardsProvider;
+ this.getFixedNumShards = getFixedNumShards;
this.resultsView = resultsView;
this.writeOperation = writeOperation;
}
@@ -883,118 +864,40 @@ public class WriteFiles<UserT, DestinationT, OutputT>
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
- @Nullable Integer fixedNumShards;
- if (numShardsView != null) {
- fixedNumShards = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- fixedNumShards = numShardsProvider.get();
- } else {
- fixedNumShards = null;
- }
- Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap =
- ArrayListMultimap.create();
- for (FileResult<DestinationT> result : c.sideInput(resultsView)) {
- resultsByDestMultimap.put(result.getDestination(), result);
- }
- Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest =
- resultsByDestMultimap.asMap();
- if (resultsByDest.isEmpty()) {
- Collection<FileResult<DestinationT>> empty = ImmutableList.of();
- resultsByDest =
- Collections.singletonMap(
- writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty);
- }
- List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
- for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>>
- destEntry : resultsByDest.entrySet()) {
- resultsToFinalFilenames.addAll(
- finalizeForDestinationFillEmptyShards(
- destEntry.getKey(), fixedNumShards, destEntry.getValue()));
- }
- Set<ResourceId> tempFiles = Sets.newHashSet();
- for (KV<FileResult<DestinationT>, ResourceId> entry :
- resultsToFinalFilenames) {
- tempFiles.add(entry.getKey().getTempFilename());
+ List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView));
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+ fileResults.isEmpty()
+ ? writeOperation.finalizeDestination(
+ writeOperation.getSink().getDynamicDestinations().getDefaultDestination(),
+ GlobalWindow.INSTANCE,
+ getFixedNumShards.apply(c),
+ ImmutableList.<FileResult<DestinationT>>of())
+ : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c));
+ for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
}
- writeOperation.copyToOutputFiles(resultsToFinalFilenames);
- writeOperation.removeTemporaryFiles(tempFiles);
+ writeOperation.moveToOutputFiles(resultsToFinalFilenames);
}
+ }
- /**
- * Finalize a list of files for a single destination. If a minimum number of shards is needed,
- * this function will generate empty files for this destination to ensure that all shards are
- * generated.
- */
- private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards(
- DestinationT destination,
- @Nullable Integer fixedNumShards,
- Collection<FileResult<DestinationT>> existingResults)
- throws Exception {
- checkState(!writeOperation.windowedWrites);
-
- LOG.info(
- "Finalizing write operation {} for destination {} num shards {}.",
- writeOperation,
- destination,
- existingResults.size());
- if (fixedNumShards != null) {
- checkArgument(
- existingResults.size() <= fixedNumShards,
- "Fixed sharding into %s shards was specified, but got %s file results",
- fixedNumShards,
- existingResults.size());
- }
- // We must always output at least 1 shard, and honor user-specified numShards
- // if set.
- Set<Integer> missingShardNums;
- if (fixedNumShards == null) {
- missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
- } else {
- missingShardNums = Sets.newHashSet();
- for (int i = 0; i < fixedNumShards; ++i) {
- missingShardNums.add(i);
- }
- for (FileResult<DestinationT> res : existingResults) {
- checkArgument(
- res.getShard() != UNKNOWN_SHARDNUM,
- "Fixed sharding into %s shards was specified, "
- + "but file result %s does not specify a shard",
+ private static <DestinationT>
+ List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
+ WriteOperation<DestinationT, ?> writeOperation,
+ List<FileResult<DestinationT>> fileResults,
+ Integer fixedNumShards)
+ throws Exception {
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+ Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap =
+ groupByDestinationAndWindow(fileResults);
+ for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+ destEntry : resultsByDestMultimap.asMap().entrySet()) {
+ resultsToFinalFilenames.addAll(
+ writeOperation.finalizeDestination(
+ destEntry.getKey().getKey(),
+ destEntry.getKey().getValue(),
fixedNumShards,
- res);
- missingShardNums.remove(res.getShard());
- }
- }
- List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults);
- if (!missingShardNums.isEmpty()) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written for destination {}.",
- missingShardNums.size(),
- existingResults.size(),
- destination);
- for (int shard : missingShardNums) {
- String uuid = UUID.randomUUID().toString();
- LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination);
- Writer<DestinationT, ?> writer = writeOperation.createWriter();
- // Currently this code path is only called in the unwindowed case.
- writer.open(uuid, destination);
- writer.close();
- completeResults.add(
- new FileResult<>(
- writer.getOutputFile(),
- shard,
- GlobalWindow.INSTANCE,
- PaneInfo.ON_TIME_AND_ONLY_FIRING,
- destination));
- }
- LOG.debug("Done creating extra shards for {}.", destination);
- }
- return
- writeOperation.buildOutputFilenames(
- destination,
- GlobalWindow.INSTANCE,
- (fixedNumShards == null) ? null : completeResults.size(),
- completeResults);
+ destEntry.getValue()));
}
+ return resultsToFinalFilenames;
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index f7988bb..561d036 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -45,7 +44,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -205,13 +203,8 @@ public class FileBasedSinkTest {
// TODO: test with null first argument?
List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
- writeOp.buildOutputFilenames(null, null, null, fileResults);
- Set<ResourceId> tempFiles = Sets.newHashSet();
- for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) {
- tempFiles.add(res.getKey().getTempFilename());
- }
- writeOp.copyToOutputFiles(resultsToFinalFilenames);
- writeOp.removeTemporaryFiles(tempFiles);
+ writeOp.finalizeDestination(null, null, null, fileResults);
+ writeOp.moveToOutputFiles(resultsToFinalFilenames);
for (int i = 0; i < numFiles; i++) {
ResourceId outputFilename =
@@ -304,7 +297,7 @@ public class FileBasedSinkTest {
}
// Copy input files to output files.
- writeOp.copyToOutputFiles(resultsToFinalFilenames);
+ writeOp.moveToOutputFiles(resultsToFinalFilenames);
// Assert that the contents were copied.
for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -355,7 +348,7 @@ public class FileBasedSinkTest {
/** Reject non-distinct output filenames. */
@Test
- public void testCollidingOutputFilenames() throws IOException {
+ public void testCollidingOutputFilenames() throws Exception {
ResourceId root = getBaseOutputDirectory();
SimpleSink<Void> sink =
SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
@@ -366,12 +359,12 @@ public class FileBasedSinkTest {
ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
// More than one shard does.
try {
- Iterable<FileResult<Void>> results =
+ List<FileResult<Void>> results =
Lists.newArrayList(
new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
- writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results);
+ writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
fail("Should have failed.");
} catch (IllegalArgumentException exn) {
assertThat(exn.getMessage(), containsString("generated the same name"));
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.