You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/11/16 07:42:01 UTC
beam git commit: Cherrypick WriteFiles fix.
Repository: beam
Updated Branches:
refs/heads/release-2.2.0 8da5c1b61 -> 55a1124ac
Cherrypick WriteFiles fix.
This closes #4124: [BEAM-3169] Fixes a data loss bug in WriteFiles when used with fire-once triggers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55a1124a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55a1124a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55a1124a
Branch: refs/heads/release-2.2.0
Commit: 55a1124ac8d39de79d2fd985cec08fcacd7775eb
Parents: 8da5c1b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Nov 14 19:34:42 2017 -0800
Committer: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Committed: Wed Nov 15 19:51:52 2017 +0800
----------------------------------------------------------------------
.../apex/translation/ParDoTranslator.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../construction/WriteFilesTranslationTest.java | 1 -
.../beam/runners/spark/io/AvroPipelineTest.java | 12 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 168 ++++-----
.../java/org/apache/beam/sdk/io/WriteFiles.java | 375 ++++++++++++-------
.../apache/beam/sdk/io/FileBasedSinkTest.java | 77 ++--
.../org/apache/beam/sdk/io/TextIOWriteTest.java | 84 +++--
.../org/apache/beam/sdk/io/WriteFilesTest.java | 47 ++-
9 files changed, 453 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index dd4bd67..6a052d1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -213,7 +213,7 @@ class ParDoTranslator<InputT, OutputT>
sideInputCollection.getWindowingStrategy());
}
if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
- String msg = "Multiple side inputs with different coders.";
+ String msg = context.getFullName() + ": Multiple side inputs with different coders.";
throw new UnsupportedOperationException(msg);
}
sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput));
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index ba75746..e050c15 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -108,7 +108,7 @@ public class WordCountTest {
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
- .apply("WriteCounts", TextIO.write().to(options.getOutput()))
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2))
;
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index e8eda76..689518a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -64,7 +64,6 @@ public class WriteFilesTranslationTest {
public static Iterable<WriteFiles<Object, Void, Object>> data() {
return ImmutableList.of(
WriteFiles.to(new DummySink()),
- WriteFiles.to(new DummySink()).withWindowedWrites(),
WriteFiles.to(new DummySink()).withNumShards(17),
WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index adde8d2..e17a6b8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -47,7 +47,7 @@ import org.junit.rules.TemporaryFolder;
public class AvroPipelineTest {
private File inputFile;
- private File outputDir;
+ private File outputFile;
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
@@ -58,8 +58,7 @@ public class AvroPipelineTest {
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.avro");
- outputDir = tmpDir.newFolder("out");
- outputDir.delete();
+ outputFile = new File(tmpDir.getRoot(), "out.avro");
}
@Test
@@ -73,7 +72,10 @@ public class AvroPipelineTest {
PCollection<GenericRecord> input = pipeline.apply(
AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
- input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
+ input.apply(
+ AvroIO.writeGenericRecords(schema)
+ .to(outputFile.getAbsolutePath())
+ .withoutSharding());
pipeline.run();
List<GenericRecord> records = readGenericFile();
@@ -98,7 +100,7 @@ public class AvroPipelineTest {
List<GenericRecord> records = Lists.newArrayList();
GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> dataFileReader =
- new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) {
+ new DataFileReader<>(outputFile, genericDatumReader)) {
for (GenericRecord record : dataFileReader) {
records.add(record);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ea5129f..f949112 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -44,6 +44,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
@@ -123,6 +125,7 @@ import org.slf4j.LoggerFactory;
public abstract class FileBasedSink<UserT, DestinationT, OutputT>
implements Serializable, HasDisplayData {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+ static final String TEMP_DIRECTORY_PREFIX = ".temp-beam";
/** @deprecated use {@link Compression}. */
@Deprecated
@@ -504,7 +507,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
implements SerializableFunction<ResourceId, ResourceId> {
private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
- DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss");
+ DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
// The intent of the code is to have a consistent value of tempDirectory across
// all workers, which wouldn't happen if now() was called inline.
private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
@@ -515,7 +518,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
@Override
public ResourceId apply(ResourceId tempDirectory) {
// Temp directory has a timestamp and a unique ID
- String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
+ String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", timestamp, tempId);
return tempDirectory
.getCurrentDirectory()
.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
@@ -551,30 +554,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
this.windowedWrites = windowedWrites;
}
- /**
- * Finalizes writing by copying temporary output files to their final location.
- *
- * <p>Finalization may be overridden by subclass implementations to perform customized
- * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
- * writerResults} contains the filenames of written bundles.
- *
- * <p>If subclasses override this method, they must guarantee that its implementation is
- * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
- * is a best practice to attempt to try to make this method atomic.
- *
- * <p>Returns the map of temporary files generated to final filenames. Callers must call {@link
- * #removeTemporaryFiles(Set)} to cleanup the temporary files.
- *
- * @param writerResults the results of writes (FileResult).
- */
- public Map<ResourceId, ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults)
- throws Exception {
- // Collect names of temporary files and copies them.
- Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
- copyToOutputFiles(outputFilenames);
- return outputFilenames;
- }
-
/*
* Remove temporary files after finalization.
*
@@ -596,34 +575,52 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
}
@Experimental(Kind.FILESYSTEM)
- protected final Map<ResourceId, ResourceId> buildOutputFilenames(
+ protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames(
+ @Nullable DestinationT dest,
+ @Nullable BoundedWindow window,
+ @Nullable Integer numShards,
Iterable<FileResult<DestinationT>> writerResults) {
- int numShards = Iterables.size(writerResults);
- Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap();
-
- // Either all results have a shard number set (if the sink is configured with a fixed
- // number of shards), or they all don't (otherwise).
- Boolean isShardNumberSetEverywhere = null;
- for (FileResult<DestinationT> result : writerResults) {
- boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
- if (isShardNumberSetEverywhere == null) {
- isShardNumberSetEverywhere = isShardNumberSetHere;
- } else {
- checkArgument(
- isShardNumberSetEverywhere == isShardNumberSetHere,
- "Found a mix of files with and without shard number set: %s",
- result);
- }
+ for (FileResult<DestinationT> res : writerResults) {
+ checkArgument(
+ Objects.equals(dest, res.getDestination()),
+ "File result has wrong destination: expected %s, got %s",
+ dest, res.getDestination());
+ checkArgument(
+ Objects.equals(window, res.getWindow()),
+ "File result has wrong window: expected %s, got %s",
+ window, res.getWindow());
}
+ List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames = Lists.newArrayList();
- if (isShardNumberSetEverywhere == null) {
- isShardNumberSetEverywhere = true;
+ final int effectiveNumShards;
+ if (numShards != null) {
+ effectiveNumShards = numShards;
+ for (FileResult<DestinationT> res : writerResults) {
+ checkArgument(
+ res.getShard() != UNKNOWN_SHARDNUM,
+ "Fixed sharding into %s shards was specified, "
+ + "but file result %s does not specify a shard",
+ numShards,
+ res);
+ }
+ } else {
+ effectiveNumShards = Iterables.size(writerResults);
+ for (FileResult<DestinationT> res : writerResults) {
+ checkArgument(
+ res.getShard() == UNKNOWN_SHARDNUM,
+ "Runner-chosen sharding was specified, "
+ + "but file result %s explicitly specifies a shard",
+ res);
+ }
}
List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
- if (isShardNumberSetEverywhere) {
+ if (numShards != null) {
resultsWithShardNumbers = Lists.newArrayList(writerResults);
} else {
+ checkState(
+ !windowedWrites,
+ "When doing windowed writes, shards should have been assigned when writing");
// Sort files for idempotence. Sort by temporary filename.
// Note that this codepath should not be used when processing triggered windows. In the
// case of triggers, the list of FileResult objects in the Finalize iterable is not
@@ -646,23 +643,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
}
}
+ Map<ResourceId, FileResult<DestinationT>> distinctFilenames = Maps.newHashMap();
for (FileResult<DestinationT> result : resultsWithShardNumbers) {
checkArgument(
result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
- outputFilenames.put(
- result.getTempFilename(),
- result.getDestinationFile(
- getSink().getDynamicDestinations(),
- numShards,
- getSink().getWritableByteChannelFactory()));
+ ResourceId finalFilename = result.getDestinationFile(
+ getSink().getDynamicDestinations(),
+ effectiveNumShards,
+ getSink().getWritableByteChannelFactory());
+ checkArgument(
+ !distinctFilenames.containsKey(finalFilename),
+ "Filename policy must generate unique filenames, but generated the same name %s "
+ + "for file results %s and %s",
+ finalFilename,
+ result,
+ distinctFilenames.get(finalFilename));
+ distinctFilenames.put(finalFilename, result);
+ outputFilenames.add(KV.of(result, finalFilename));
}
-
- int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
- checkState(
- numDistinctShards == outputFilenames.size(),
- "Only generated %s distinct file names for %s files.",
- numDistinctShards,
- outputFilenames.size());
return outputFilenames;
}
@@ -677,20 +675,23 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
* the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
* "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
* will be copied to dir/file-001-of-003.txt, etc.
- *
- * @param filenames the filenames of temporary files.
*/
@VisibleForTesting
@Experimental(Kind.FILESYSTEM)
- final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
- int numFiles = filenames.size();
+ final void copyToOutputFiles(
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
+ int numFiles = resultsToFinalFilenames.size();
if (numFiles > 0) {
LOG.debug("Copying {} files.", numFiles);
- List<ResourceId> srcFiles = new ArrayList<>(filenames.size());
- List<ResourceId> dstFiles = new ArrayList<>(filenames.size());
- for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) {
- srcFiles.add(srcDestPair.getKey());
- dstFiles.add(srcDestPair.getValue());
+ List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
+ List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+ for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+ srcFiles.add(entry.getKey().getTempFilename());
+ dstFiles.add(entry.getValue());
+ LOG.info(
+ "Will copy temporary file {} to final location {}",
+ entry.getKey().getTempFilename(),
+ entry.getValue());
}
// During a failure case, files may have been deleted in an earlier step. Thus
// we ignore missing files here.
@@ -721,7 +722,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
// This may still fail to remove temporary outputs of some failed bundles, but at least
// the common case (where all bundles succeed) is guaranteed to be fully addressed.
- Set<ResourceId> matches = new HashSet<>();
+ Set<ResourceId> allMatches = new HashSet<>(knownFiles);
+ for (ResourceId match : allMatches) {
+ LOG.info("Will remove known temporary file {}", match);
+ }
// TODO: Windows OS cannot resolve and match '*' in the path,
// ignore the exception for now to avoid failing the pipeline.
if (shouldRemoveTemporaryDirectory) {
@@ -730,28 +734,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
Iterables.getOnlyElement(
FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
for (Metadata matchResult : singleMatch.metadata()) {
- matches.add(matchResult.resourceId());
+ if (allMatches.add(matchResult.resourceId())) {
+ LOG.info("Will also remove unknown temporary file {}", matchResult.resourceId());
+ }
}
} catch (Exception e) {
LOG.warn("Failed to match temporary files under: [{}].", tempDir);
}
}
- Set<ResourceId> allMatches = new HashSet<>(matches);
- allMatches.addAll(knownFiles);
- LOG.debug(
- "Removing {} temporary files found under {} ({} matched glob, {} known files)",
- allMatches.size(),
- tempDir,
- matches.size(),
- allMatches.size() - matches.size());
FileSystems.delete(allMatches, StandardMoveOptions.IGNORE_MISSING_FILES);
- // Deletion of the temporary directory might fail, if not all temporary files are removed.
- try {
- FileSystems.delete(
- Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
- } catch (Exception e) {
- LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
+ if (shouldRemoveTemporaryDirectory) {
+ // Deletion of the temporary directory might fail, if not all temporary files are removed.
+ try {
+ FileSystems.delete(
+ Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
+ } catch (Exception e) {
+ LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 72ce5d0..f384dd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -25,6 +25,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Objects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@@ -32,6 +33,7 @@ import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -62,6 +64,8 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -181,12 +185,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
checkArgument(windowedWrites,
"Must use windowed writes when applying %s to an unbounded PCollection",
WriteFiles.class.getSimpleName());
+ }
+ if (windowedWrites) {
// The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
// and similar behavior in other runners.
checkArgument(
computeNumShards != null || numShardsProvider != null,
- "When applying %s to an unbounded PCollection, "
- + "must specify number of output shards explicitly",
+ "When using windowed writes, must specify number of output shards explicitly",
WriteFiles.class.getSimpleName());
}
this.writeOperation = sink.createWriteOperation();
@@ -502,15 +507,16 @@ public class WriteFiles<UserT, DestinationT, OutputT>
if (writer == null) {
LOG.debug("Opening writer for write operation {}", writeOperation);
writer = writeOperation.createWriter();
+ int shardNumber =
+ shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+ ? c.element().getKey().getShardNumber()
+ : UNKNOWN_SHARDNUM;
if (windowedWrites) {
- int shardNumber =
- shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
- ? c.element().getKey().getShardNumber()
- : UNKNOWN_SHARDNUM;
writer.openWindowed(
UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
} else {
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+ writer.openUnwindowed(
+ UUID.randomUUID().toString(), shardNumber, destination);
}
LOG.debug("Done opening writer");
writers.put(destination, writer);
@@ -532,7 +538,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
throw e;
}
}
- }
+ }
@Override
public void populateDisplayData(DisplayData.Builder builder) {
@@ -615,13 +621,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
- Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults(
- Iterable<FileResult<DestinationT>> results) {
- Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create();
+ private static <DestinationT>
+ Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>>
+ groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) {
+ Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
+ ArrayListMultimap.create();
for (FileResult<DestinationT> result : results) {
- perDestination.put(result.getDestination(), result);
+ res.put(KV.of(result.getDestination(), result.getWindow()), result);
}
- return perDestination;
+ return res;
}
/**
@@ -752,51 +760,28 @@ public class WriteFiles<UserT, DestinationT, OutputT>
PCollection<KV<DestinationT, String>> outputFilenames;
if (windowedWrites) {
- // When processing streaming windowed writes, results will arrive multiple times. This
- // means we can't share the below implementation that turns the results into a side input,
- // as new data arriving into a side input does not trigger the listening DoFn. Instead
- // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
- // whenever new data arrives.
- PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
- results.apply(
- "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
- keyedResults.setCoder(
- KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
-
- // Is the continuation trigger sufficient?
+ // We need to materialize the FileResult's before the renaming stage: this can be done either
+ // via a side input or via a GBK. However, when processing streaming windowed writes, results
+ // will arrive multiple times. This means we can't share the below implementation that turns
+ // the results into a side input, as new data arriving into a side input does not trigger the
+ // listening DoFn. We also can't use a GBK because we need only the materialization, but not
+ // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK
+ // would give. So, we use a reshuffle (over a single key to maximize bundling).
outputFilenames =
- keyedResults
- .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
+ results
+ .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
+ .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder()))
+ .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
+ .apply(Values.<FileResult<DestinationT>>create())
.apply(
"FinalizeWindowed",
ParDo.of(
- new DoFn<
- KV<Void, Iterable<FileResult<DestinationT>>>,
- KV<DestinationT, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Set<ResourceId> tempFiles = Sets.newHashSet();
- Multimap<DestinationT, FileResult<DestinationT>> results =
- perDestinationResults(c.element().getValue());
- for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
- results.asMap().entrySet()) {
- LOG.info(
- "Finalizing write operation {} for destination {} num shards: {}.",
- writeOperation,
- entry.getKey(),
- entry.getValue().size());
- Map<ResourceId, ResourceId> finalizeMap =
- writeOperation.finalize(entry.getValue());
- tempFiles.addAll(finalizeMap.keySet());
- for (ResourceId outputFile : finalizeMap.values()) {
- c.output(KV.of(entry.getKey(), outputFile.toString()));
- }
- LOG.debug("Done finalizing write operation for {}.", entry.getKey());
- }
- writeOperation.removeTemporaryFiles(tempFiles);
- LOG.debug("Removed temporary files for {}.", writeOperation);
- }
- }))
+ new FinalizeWindowedFn<DestinationT>(
+ numShardsView, numShardsProvider, writeOperation))
+ .withSideInputs(
+ numShardsView == null
+ ? ImmutableList.<PCollectionView<?>>of()
+ : ImmutableList.of(numShardsView)))
.setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
} else {
final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
@@ -817,58 +802,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
// set numShards, then all shards will be written out as empty files. For this reason we
// use a side input here.
PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
- outputFilenames = singletonCollection.apply(
- "FinalizeUnwindowed",
- ParDo.of(
- new DoFn<Void, KV<DestinationT, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
- // We must always output at least 1 shard, and honor user-specified numShards
- // if set.
- int minShardsNeeded;
- if (numShardsView != null) {
- minShardsNeeded = c.sideInput(numShardsView);
- } else if (numShardsProvider != null) {
- minShardsNeeded = numShardsProvider.get();
- } else {
- minShardsNeeded = 1;
- }
- Set<ResourceId> tempFiles = Sets.newHashSet();
- Multimap<DestinationT, FileResult<DestinationT>> perDestination =
- perDestinationResults(c.sideInput(resultsView));
- for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
- perDestination.asMap().entrySet()) {
- Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap();
- finalizeMap.putAll(
- finalizeForDestinationFillEmptyShards(
- entry.getKey(), entry.getValue(), minShardsNeeded));
- tempFiles.addAll(finalizeMap.keySet());
- for (ResourceId outputFile :finalizeMap.values()) {
- c.output(KV.of(entry.getKey(), outputFile.toString()));
- }
- }
- if (perDestination.isEmpty()) {
- // If there is no input at all, write empty files to the default
- // destination.
- Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap();
- DestinationT destination =
- getSink().getDynamicDestinations().getDefaultDestination();
- finalizeMap.putAll(
- finalizeForDestinationFillEmptyShards(
- destination,
- Lists.<FileResult<DestinationT>>newArrayList(),
- minShardsNeeded));
- tempFiles.addAll(finalizeMap.keySet());
- for (ResourceId outputFile :finalizeMap.values()) {
- c.output(KV.of(destination, outputFile.toString()));
- }
- }
- writeOperation.removeTemporaryFiles(tempFiles);
- }
- })
- .withSideInputs(finalizeSideInputs.build()))
- .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+ outputFilenames =
+ singletonCollection
+ .apply(
+ "FinalizeUnwindowed",
+ ParDo.of(
+ new FinalizeUnwindowedFn<>(
+ numShardsView, numShardsProvider, resultsView, writeOperation))
+ .withSideInputs(finalizeSideInputs.build()))
+ .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
}
TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
@@ -879,41 +821,196 @@ public class WriteFiles<UserT, DestinationT, OutputT>
outputFilenames);
}
- /**
- * Finalize a list of files for a single destination. If a minimum number of shards is needed,
- * this function will generate empty files for this destination to ensure that all shards are
- * generated.
- */
- private Map<ResourceId, ResourceId> finalizeForDestinationFillEmptyShards(
- DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded)
- throws Exception {
- checkState(!windowedWrites);
-
- LOG.info(
- "Finalizing write operation {} for destination {} num shards {}.",
- writeOperation,
- destination,
- results.size());
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
+ private static class FinalizeWindowedFn<DestinationT>
+ extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
+ @Nullable private final PCollectionView<Integer> numShardsView;
+ @Nullable private final ValueProvider<Integer> numShardsProvider;
+ private final WriteOperation<DestinationT, ?> writeOperation;
+
+ @Nullable private transient List<FileResult<DestinationT>> fileResults;
+ @Nullable private Integer fixedNumShards;
+
+ public FinalizeWindowedFn(
+ @Nullable PCollectionView<Integer> numShardsView,
+ @Nullable ValueProvider<Integer> numShardsProvider,
+ WriteOperation<DestinationT, ?> writeOperation) {
+ this.numShardsView = numShardsView;
+ this.numShardsProvider = numShardsProvider;
+ this.writeOperation = writeOperation;
+ }
+
+ @StartBundle
+ public void startBundle() {
+ fileResults = Lists.newArrayList();
+ fixedNumShards = null;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ fileResults.add(c.element());
+ if (fixedNumShards == null) {
+ if (numShardsView != null) {
+ fixedNumShards = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ fixedNumShards = numShardsProvider.get();
+ } else {
+ fixedNumShards = null;
+ }
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(FinishBundleContext c) throws Exception {
+ Set<ResourceId> tempFiles = Sets.newHashSet();
+ Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results =
+ groupByDestinationAndWindow(fileResults);
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+ for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+ destEntry : results.asMap().entrySet()) {
+ DestinationT destination = destEntry.getKey().getKey();
+ BoundedWindow window = destEntry.getKey().getValue();
+ resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames(
+ destination, window, fixedNumShards, destEntry.getValue()));
+ }
+ LOG.info("Will finalize {} files", resultsToFinalFilenames.size());
+ for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+ FileResult<DestinationT> res = entry.getKey();
+ tempFiles.add(res.getTempFilename());
+ c.output(
+ KV.of(res.getDestination(), entry.getValue().toString()),
+ res.getWindow().maxTimestamp(),
+ res.getWindow());
+ }
+ writeOperation.copyToOutputFiles(resultsToFinalFilenames);
+ writeOperation.removeTemporaryFiles(tempFiles);
+ }
+ }
+
+ private static class FinalizeUnwindowedFn<DestinationT>
+ extends DoFn<Void, KV<DestinationT, String>> {
+ @Nullable private final PCollectionView<Integer> numShardsView;
+ @Nullable private final ValueProvider<Integer> numShardsProvider;
+ private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
+ private final WriteOperation<DestinationT, ?> writeOperation;
+
+ public FinalizeUnwindowedFn(
+ @Nullable PCollectionView<Integer> numShardsView,
+ @Nullable ValueProvider<Integer> numShardsProvider,
+ PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
+ WriteOperation<DestinationT, ?> writeOperation) {
+ this.numShardsView = numShardsView;
+ this.numShardsProvider = numShardsProvider;
+ this.resultsView = resultsView;
+ this.writeOperation = writeOperation;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+ @Nullable Integer fixedNumShards;
+ if (numShardsView != null) {
+ fixedNumShards = c.sideInput(numShardsView);
+ } else if (numShardsProvider != null) {
+ fixedNumShards = numShardsProvider.get();
+ } else {
+ fixedNumShards = null;
+ }
+ Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap =
+ ArrayListMultimap.create();
+ for (FileResult<DestinationT> result : c.sideInput(resultsView)) {
+ resultsByDestMultimap.put(result.getDestination(), result);
+ }
+ Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest =
+ resultsByDestMultimap.asMap();
+ if (resultsByDest.isEmpty()) {
+ Collection<FileResult<DestinationT>> empty = ImmutableList.of();
+ resultsByDest =
+ Collections.singletonMap(
+ writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty);
+ }
+ List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+ for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>>
+ destEntry : resultsByDest.entrySet()) {
+ resultsToFinalFilenames.addAll(
+ finalizeForDestinationFillEmptyShards(
+ destEntry.getKey(), fixedNumShards, destEntry.getValue()));
+ }
+ Set<ResourceId> tempFiles = Sets.newHashSet();
+ for (KV<FileResult<DestinationT>, ResourceId> entry :
+ resultsToFinalFilenames) {
+ tempFiles.add(entry.getKey().getTempFilename());
+ c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
+ }
+ writeOperation.copyToOutputFiles(resultsToFinalFilenames);
+ writeOperation.removeTemporaryFiles(tempFiles);
+ }
+
+ /**
+ * Finalize a list of files for a single destination. If a minimum number of shards is needed,
+ * this function will generate empty files for this destination to ensure that all shards are
+ * generated.
+ */
+ private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards(
+ DestinationT destination,
+ @Nullable Integer fixedNumShards,
+ Collection<FileResult<DestinationT>> existingResults)
+ throws Exception {
+ checkState(!writeOperation.windowedWrites);
+
LOG.info(
- "Creating {} empty output shards in addition to {} written "
- + "for a total of {} for destination {}.",
- extraShardsNeeded,
- results.size(),
- minShardsNeeded,
- destination);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<DestinationT, OutputT> writer = writeOperation.createWriter();
- // Currently this code path is only called in the unwindowed case.
- writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
- FileResult<DestinationT> emptyWrite = writer.close();
- results.add(emptyWrite);
+ "Finalizing write operation {} for destination {} num shards {}.",
+ writeOperation,
+ destination,
+ existingResults.size());
+ if (fixedNumShards != null) {
+ checkArgument(
+ existingResults.size() <= fixedNumShards,
+ "Fixed sharding into %s shards was specified, but got %s file results",
+ fixedNumShards,
+ existingResults.size());
+ }
+ // We must always output at least 1 shard, and honor user-specified numShards
+ // if set.
+ Set<Integer> missingShardNums;
+ if (fixedNumShards == null) {
+ missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+ } else {
+ missingShardNums = Sets.newHashSet();
+ for (int i = 0; i < fixedNumShards; ++i) {
+ missingShardNums.add(i);
+ }
+ for (FileResult<DestinationT> res : existingResults) {
+ checkArgument(
+ res.getShard() != UNKNOWN_SHARDNUM,
+ "Fixed sharding into %s shards was specified, "
+ + "but file result %s does not specify a shard",
+ fixedNumShards,
+ res);
+ missingShardNums.remove(res.getShard());
+ }
+ }
+ List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults);
+ if (!missingShardNums.isEmpty()) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written for destination {}.",
+ missingShardNums.size(),
+ existingResults.size(),
+ destination);
+ for (int shard : missingShardNums) {
+ Writer<DestinationT, ?> writer = writeOperation.createWriter();
+ // Currently this code path is only called in the unwindowed case.
+ writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination);
+ FileResult<DestinationT> emptyWrite = writer.close();
+ completeResults.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards for {}.", destination);
}
- LOG.debug("Done creating extra shards for {}.", destination);
+ return
+ writeOperation.buildOutputFilenames(
+ destination,
+ null,
+ (fixedNumShards == null) ? null : completeResults.size(),
+ completeResults);
}
- Map<ResourceId, ResourceId> finalizeMap = writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination);
- return finalizeMap;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0a96b7e..29f3c1b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io;
+import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -25,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -41,9 +44,8 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
import java.util.zip.GZIPInputStream;
import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.values.KV;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
import org.junit.Rule;
@@ -97,7 +100,7 @@ public class FileBasedSinkTest {
expected.addAll(values);
expected.add(SimpleSink.SimpleWriter.FOOTER);
- SimpleSink.SimpleWriter writer =
+ SimpleSink.SimpleWriter<Void> writer =
buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
writer.openUnwindowed(testUid, -1, null);
for (String value : values) {
@@ -186,7 +189,7 @@ public class FileBasedSinkTest {
}
/** Finalize and verify that files are copied and temporary files are optionally removed. */
- private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
+ private void runFinalize(SimpleSink.SimpleWriteOperation<Void> writeOp, List<File> temporaryFiles)
throws Exception {
int numFiles = temporaryFiles.size();
@@ -196,13 +199,21 @@ public class FileBasedSinkTest {
fileResults.add(
new FileResult<Void>(
LocalResources.fromFile(temporaryFiles.get(i), false),
- WriteFiles.UNKNOWN_SHARDNUM,
+ UNKNOWN_SHARDNUM,
null,
null,
null));
}
- writeOp.removeTemporaryFiles(writeOp.finalize(fileResults).keySet());
+ // TODO: test with null first argument?
+ List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
+ writeOp.buildOutputFilenames(null, null, null, fileResults);
+ Set<ResourceId> tempFiles = Sets.newHashSet();
+ for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) {
+ tempFiles.add(res.getKey().getTempFilename());
+ }
+ writeOp.copyToOutputFiles(resultsToFinalFilenames);
+ writeOp.removeTemporaryFiles(tempFiles);
for (int i = 0; i < numFiles; i++) {
ResourceId outputFilename =
@@ -263,14 +274,14 @@ public class FileBasedSinkTest {
/** Output files are copied to the destination location with the correct names and contents. */
@Test
public void testCopyToOutputFiles() throws Exception {
- SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+ SimpleSink.SimpleWriteOperation<Void> writeOp = buildWriteOperation();
List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
List<String> inputContents = Arrays.asList("1", "2", "3");
List<String> expectedOutputFilenames =
Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
- Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
- List<ResourceId> expectedOutputPaths = new ArrayList<>();
+ List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+ List<ResourceId> expectedOutputPaths = Lists.newArrayList();
for (int i = 0; i < inputFilenames.size(); i++) {
// Generate output paths.
@@ -282,17 +293,20 @@ public class FileBasedSinkTest {
File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
List<String> lines = Collections.singletonList(inputContents.get(i));
writeFile(lines, inputTmpFile);
- inputFilePaths.put(
- LocalResources.fromFile(inputTmpFile, false),
- writeOp
- .getSink()
- .getDynamicDestinations()
- .getFilenamePolicy(null)
- .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED));
+ ResourceId finalFilename = writeOp
+ .getSink()
+ .getDynamicDestinations()
+ .getFilenamePolicy(null)
+ .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED);
+ resultsToFinalFilenames.add(
+ KV.of(
+ new FileResult<Void>(
+ LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null),
+ finalFilename));
}
// Copy input files to output files.
- writeOp.copyToOutputFiles(inputFilePaths);
+ writeOp.copyToOutputFiles(resultsToFinalFilenames);
// Assert that the contents were copied.
for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -302,7 +316,7 @@ public class FileBasedSinkTest {
}
public List<ResourceId> generateDestinationFilenames(
- ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
+ FilenamePolicy policy, int numFiles) {
List<ResourceId> filenames = new ArrayList<>();
for (int i = 0; i < numFiles; i++) {
filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED));
@@ -327,17 +341,17 @@ public class FileBasedSinkTest {
root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE));
- actual = generateDestinationFilenames(root, policy, 3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected =
Collections.singletonList(
root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE));
- actual = generateDestinationFilenames(root, policy, 1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = generateDestinationFilenames(root, policy, 0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
}
@@ -352,18 +366,19 @@ public class FileBasedSinkTest {
ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
- ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
// More than one shard does.
try {
Iterable<FileResult<Void>> results =
Lists.newArrayList(
- new FileResult<Void>(temp1, 1, null, null, null),
- new FileResult<Void>(temp2, 1, null, null, null),
- new FileResult<Void>(temp3, 1, null, null, null));
- writeOp.buildOutputFilenames(results);
+ new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
+ new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
+ new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
+ writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results);
fail("Should have failed.");
- } catch (IllegalStateException exn) {
- assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage());
+ } catch (IllegalArgumentException exn) {
+ assertThat(exn.getMessage(), containsString("generated the same name"));
+ assertThat(exn.getMessage(), containsString("temp1"));
+ assertThat(exn.getMessage(), containsString("temp2"));
}
}
@@ -383,17 +398,17 @@ public class FileBasedSinkTest {
root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE));
- actual = generateDestinationFilenames(root, policy, 3);
+ actual = generateDestinationFilenames(policy, 3);
assertEquals(expected, actual);
expected =
Collections.singletonList(
root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE));
- actual = generateDestinationFilenames(root, policy, 1);
+ actual = generateDestinationFilenames(policy, 1);
assertEquals(expected, actual);
expected = new ArrayList<>();
- actual = generateDestinationFilenames(root, policy, 0);
+ actual = generateDestinationFilenames(policy, 0);
assertEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 0f40067..1ade0d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -38,12 +38,8 @@ import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -68,50 +64,28 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
/** Tests for {@link TextIO.Write}. */
public class TextIOWriteTest {
private static final String MY_HEADER = "myHeader";
private static final String MY_FOOTER = "myFooter";
- private static Path tempFolder;
+ @Rule public transient TemporaryFolder tempFolder = new TemporaryFolder();
- @Rule public TestPipeline p = TestPipeline.create();
+ @Rule public transient TestPipeline p = TestPipeline.create();
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
- @BeforeClass
- public static void setupClass() throws IOException {
- tempFolder = Files.createTempDirectory("TextIOTest");
- }
-
- @AfterClass
- public static void teardownClass() throws IOException {
- Files.walkFileTree(
- tempFolder,
- new SimpleFileVisitor<Path>() {
- @Override
- public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
- throws IOException {
- Files.delete(file);
- return FileVisitResult.CONTINUE;
- }
-
- @Override
- public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
- Files.delete(dir);
- return FileVisitResult.CONTINUE;
- }
- });
- }
+ @Rule public transient ExpectedException expectedException = ExpectedException.none();
static class TestDynamicDestinations
extends FileBasedSink.DynamicDestinations<String, String, String> {
@@ -174,7 +148,9 @@ public class TextIOWriteTest {
public void testDynamicDestinations() throws Exception {
ResourceId baseDir =
FileSystems.matchNewResource(
- Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+ Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
+ .toString(),
+ true);
List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
@@ -262,7 +238,9 @@ public class TextIOWriteTest {
public void testDynamicDefaultFilenamePolicy() throws Exception {
ResourceId baseDir =
FileSystems.matchNewResource(
- Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+ Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
+ .toString(),
+ true);
List<UserWriteType> elements =
Lists.newArrayList(
@@ -371,7 +349,7 @@ public class TextIOWriteTest {
private void runTestWrite(String[] elems, String header, String footer, int numShards)
throws Exception {
String outputName = "file.txt";
- Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+ Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
ResourceId baseFilename =
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
@@ -544,7 +522,7 @@ public class TextIOWriteTest {
String outputName = "file.txt";
ResourceId baseDir =
FileSystems.matchNewResource(
- Files.createTempDirectory(tempFolder, "testwrite").toString(), true);
+ Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);
PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
@@ -640,4 +618,34 @@ public class TextIOWriteTest {
p.apply(Create.of("")).apply(TextIO.write().to(options.getOutput()));
}
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWindowedWritesWithOnceTrigger() throws Throwable {
+ // Tests for https://issues.apache.org/jira/browse/BEAM-3169
+ PCollection<String> data =
+ p.apply(Create.of("0", "1", "2"))
+ .apply(
+ Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+ // According to this trigger, all data should be written.
+ // However, the continuation of this trigger is elementCountAtLeast(1),
+ // so with a buggy implementation that used a GBK before renaming files,
+ // only 1 file would be renamed.
+ .triggering(AfterPane.elementCountAtLeast(3))
+ .withAllowedLateness(Duration.standardMinutes(1))
+ .discardingFiredPanes());
+ PCollection<String> filenames =
+ data.apply(
+ TextIO.write()
+ .to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath())
+ .withNumShards(2)
+ .withWindowedWrites()
+ .<Void>withOutputFilenames())
+ .getPerDestinationOutputFilenames()
+ .apply(Values.<String>create());
+
+ PAssert.that(filenames.apply(TextIO.readAll())).containsInAnyOrder("0", "1", "2");
+
+ p.run();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55a1124a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index e0f7b39..40ae0ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -21,11 +21,14 @@ import static com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
@@ -35,7 +38,6 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.nio.file.Paths;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
@@ -83,6 +85,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.commons.compress.utils.Sets;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
@@ -156,10 +159,6 @@ public class WriteFilesTest {
}
}
- private String appendToTempFolder(String filename) {
- return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
- }
-
private String getBaseOutputFilename() {
return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
}
@@ -187,7 +186,11 @@ public class WriteFilesTest {
IDENTITY_MAP,
getBaseOutputFilename(),
WriteFiles.to(makeSimpleSink()));
- checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
+ checkFileContents(
+ getBaseOutputFilename(),
+ Collections.<String>emptyList(),
+ Optional.of(1),
+ true /* expectRemovedTempDirectory */);
}
/**
@@ -241,7 +244,8 @@ public class WriteFilesTest {
p.run();
- checkFileContents(getBaseOutputFilename(), inputs, Optional.of(3));
+ checkFileContents(
+ getBaseOutputFilename(), inputs, Optional.of(3), true /* expectRemovedTempDirectory */);
}
/**
@@ -314,7 +318,10 @@ public class WriteFilesTest {
inputs,
Window.<String>into(FixedWindows.of(Duration.millis(2))),
getBaseOutputFilename(),
- WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
+ WriteFiles.to(makeSimpleSink())
+ .withMaxNumWritersPerBundle(2)
+ .withWindowedWrites()
+ .withNumShards(1));
}
public void testBuildWrite() {
@@ -379,11 +386,10 @@ public class WriteFilesTest {
@Test
@Category(NeedsRunner.class)
- public void testUnboundedNeedsSharding() {
+ public void testWindowedWritesNeedSharding() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
- "When applying WriteFiles to an unbounded PCollection, "
- + "must specify number of output shards explicitly");
+ "When using windowed writes, must specify number of output shards explicitly");
SimpleSink<Void> sink = makeSimpleSink();
p.apply(Create.of("foo"))
@@ -491,7 +497,11 @@ public class WriteFilesTest {
for (int j = i; j < numInputs; j += 5) {
expected.add("record_" + j);
}
- checkFileContents(base.toString(), expected, Optional.of(numShards));
+ checkFileContents(
+ base.toString(),
+ expected,
+ Optional.of(numShards),
+ bounded /* expectRemovedTempDirectory */);
}
}
@@ -659,14 +669,15 @@ public class WriteFilesTest {
p.run();
Optional<Integer> numShards =
- (write.getNumShards() != null)
+ (write.getNumShards() != null && !write.isWindowedWrites())
? Optional.of(write.getNumShards().get())
: Optional.<Integer>absent();
- checkFileContents(baseName, inputs, numShards);
+ checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites());
}
static void checkFileContents(
- String baseName, List<String> inputs, Optional<Integer> numExpectedShards)
+ String baseName, List<String> inputs, Optional<Integer> numExpectedShards,
+ boolean expectRemovedTempDirectory)
throws IOException {
List<File> outputFiles = Lists.newArrayList();
final String pattern = baseName + "*";
@@ -675,6 +686,7 @@ public class WriteFilesTest {
for (Metadata meta : metadata) {
outputFiles.add(new File(meta.resourceId().toString()));
}
+ assertFalse("Should have produced at least 1 output file", outputFiles.isEmpty());
if (numExpectedShards.isPresent()) {
assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}");
@@ -710,6 +722,11 @@ public class WriteFilesTest {
}
}
assertThat(actual, containsInAnyOrder(inputs.toArray()));
+ if (expectRemovedTempDirectory) {
+ assertThat(
+ Lists.newArrayList(new File(baseName).getParentFile().list()),
+ Matchers.everyItem(not(containsString(FileBasedSink.TEMP_DIRECTORY_PREFIX))));
+ }
}
/** Options for test, exposed for PipelineOptionsFactory. */