You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/15 18:12:15 UTC
[1/2] beam git commit: Fixes sharding and cleanup for dynamic file writes
Repository: beam
Updated Branches:
refs/heads/master 47273b969 -> 1f1df2722
Fixes sharding and cleanup for dynamic file writes
- Sharding was applied across all the destinations at the same time,
instead of having each destination produce the requested number
of shards
- Fixes a bunch of issues in cleanup of temporary files in case
of multiple destinations
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca406636
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca406636
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca406636
Branch: refs/heads/master
Commit: ca4066366bca9ed5ec97859e269875573af7adfc
Parents: 47273b9
Author: Reuven Lax <re...@google.com>
Authored: Tue Jul 11 19:19:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Jul 15 10:56:54 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/DefaultFilenamePolicy.java | 27 +++++
.../org/apache/beam/sdk/io/FileBasedSink.java | 43 ++++---
.../java/org/apache/beam/sdk/io/WriteFiles.java | 117 ++++++++++++++-----
.../apache/beam/sdk/io/FileBasedSinkTest.java | 2 +-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 66 +++++++++--
5 files changed, 198 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 64d7edc..4021609 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.MoreObjects.firstNonNull;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -139,6 +141,31 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
public Params withSuffix(String suffix) {
return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(baseFilename.get(), shardTemplate, suffix);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Params)) {
+ return false;
+ }
+ Params other = (Params) o;
+ return baseFilename.get().equals(other.baseFilename.get())
+ && shardTemplate.equals(other.shardTemplate)
+ && suffix.equals(other.suffix);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("baseFilename", baseFilename)
+ .add("shardTemplate", shardTemplate)
+ .add("suffix", suffix)
+ .toString();
+ }
}
/** A Coder for {@link Params}. */
http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c68b794..9953975 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -218,7 +218,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
implements HasDisplayData, Serializable {
/**
* Returns an object that represents at a high level the destination being written to. May not
- * return null.
+ * return null. A destination must have deterministic hash and equality methods defined.
*/
public abstract DestinationT getDestination(UserT element);
@@ -486,8 +486,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
}
/**
- * Finalizes writing by copying temporary output files to their final location and optionally
- * removing temporary files.
+ * Finalizes writing by copying temporary output files to their final location.
*
* <p>Finalization may be overridden by subclass implementations to perform customized
* finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
@@ -497,23 +496,37 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
* idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
* is a best practice to attempt to try to make this method atomic.
*
+ * <p>Returns the set of temporary files generated. Callers must call {@link
+ * #removeTemporaryFiles(Set)} to cleanup these files.
+ *
* @param writerResults the results of writes (FileResult).
*/
- public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
- // Collect names of temporary files and rename them.
+ public Set<ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults)
+ throws Exception {
+ // Collect names of temporary files and copies them.
Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
copyToOutputFiles(outputFilenames);
+ return outputFilenames.keySet();
+ }
- // Optionally remove temporary files.
- // We remove the entire temporary directory, rather than specifically removing the files
- // from writerResults, because writerResults includes only successfully completed bundles,
- // and we'd like to clean up the failed ones too.
- // Note that due to GCS eventual consistency, matching files in the temp directory is also
- // currently non-perfect and may fail to delete some files.
- //
- // When windows or triggers are specified, files are generated incrementally so deleting
- // the entire directory in finalize is incorrect.
- removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
+ /*
+ * Remove temporary files after finalization.
+ *
+ * <p>In the case where we are doing global-window, untriggered writes, we remove the entire
+ * temporary directory, rather than specifically removing the files from writerResults, because
+ * writerResults includes only successfully completed bundles, and we'd like to clean up the
+ * failed ones too. The reason we remove files here rather than in finalize is that finalize
+ * might be called multiple times (e.g. if the bundle contained multiple destinations), and
+ * deleting the entire directory can't be done until all calls to finalize.
+ *
+ * <p>When windows or triggers are specified, files are generated incrementally so deleting the
+ * entire directory in finalize is incorrect. If windowedWrites is true, we instead delete the
+ * files individually. This means that some temporary files generated by failed bundles might
+ * not be cleaned up. Note that {@link WriteFiles} does attempt clean up files if exceptions
+ * are thrown, however there are still some scenarios where temporary files might be left.
+ */
+ public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException {
+ removeTemporaryFiles(filenames, !windowedWrites);
}
@Experimental(Kind.FILESYSTEM)
http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7013044..d8d7478 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -19,15 +19,21 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Objects;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
@@ -44,6 +50,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -594,6 +601,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
}
}
+ Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults(
+ Iterable<FileResult<DestinationT>> results) {
+ Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create();
+ for (FileResult<DestinationT> result : results) {
+ perDestination.put(result.getDestination(), result);
+ }
+ return perDestination;
+ }
+
/**
* A write is performed as sequence of three {@link ParDo}'s.
*
@@ -737,11 +753,21 @@ public class WriteFiles<UserT, DestinationT, OutputT>
new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
- LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult<DestinationT>> results =
- Lists.newArrayList(c.element().getValue());
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation");
+ Set<ResourceId> tempFiles = Sets.newHashSet();
+ Multimap<DestinationT, FileResult<DestinationT>> results =
+ perDestinationResults(c.element().getValue());
+ for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
+ results.asMap().entrySet()) {
+ LOG.info(
+ "Finalizing write operation {} for destination {} num shards: {}.",
+ writeOperation,
+ entry.getKey(),
+ entry.getValue().size());
+ tempFiles.addAll(writeOperation.finalize(entry.getValue()));
+ LOG.debug("Done finalizing write operation for {}.", entry.getKey());
+ }
+ writeOperation.removeTemporaryFiles(tempFiles);
+ LOG.debug("Removed temporary files for {}.", writeOperation);
}
}));
} else {
@@ -769,11 +795,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info("Finalizing write operation {}.", writeOperation);
- List<FileResult<DestinationT>> results =
- Lists.newArrayList(c.sideInput(resultsView));
- LOG.debug(
- "Side input initialized to finalize write operation {}.", writeOperation);
-
// We must always output at least 1 shard, and honor user-specified numShards
// if
// set.
@@ -785,31 +806,67 @@ public class WriteFiles<UserT, DestinationT, OutputT>
} else {
minShardsNeeded = 1;
}
- int extraShardsNeeded = minShardsNeeded - results.size();
- if (extraShardsNeeded > 0) {
- LOG.info(
- "Creating {} empty output shards in addition to {} written "
- + "for a total of {}.",
- extraShardsNeeded,
- results.size(),
- minShardsNeeded);
- for (int i = 0; i < extraShardsNeeded; ++i) {
- Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
- writer.openUnwindowed(
- UUID.randomUUID().toString(),
- UNKNOWN_SHARDNUM,
- sink.getDynamicDestinations().getDefaultDestination());
- FileResult<DestinationT> emptyWrite = writer.close();
- results.add(emptyWrite);
- }
- LOG.debug("Done creating extra shards.");
+ Set<ResourceId> tempFiles = Sets.newHashSet();
+ Multimap<DestinationT, FileResult<DestinationT>> perDestination =
+ perDestinationResults(c.sideInput(resultsView));
+ for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
+ perDestination.asMap().entrySet()) {
+ tempFiles.addAll(
+ finalizeForDestinationFillEmptyShards(
+ entry.getKey(), entry.getValue(), minShardsNeeded));
}
- writeOperation.finalize(results);
- LOG.debug("Done finalizing write operation {}", writeOperation);
+ if (perDestination.isEmpty()) {
+ // If there is no input at all, write empty files to the default
+ // destination.
+ tempFiles.addAll(
+ finalizeForDestinationFillEmptyShards(
+ getSink().getDynamicDestinations().getDefaultDestination(),
+ Lists.<FileResult<DestinationT>>newArrayList(),
+ minShardsNeeded));
+ }
+ writeOperation.removeTemporaryFiles(tempFiles);
}
})
.withSideInputs(sideInputs.build()));
}
return PDone.in(input.getPipeline());
}
+
+ /**
+ * Finalize a list of files for a single destination. If a minimum number of shards is needed,
+ * this function will generate empty files for this destination to ensure that all shards are
+ * generated.
+ */
+ private Set<ResourceId> finalizeForDestinationFillEmptyShards(
+ DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded)
+ throws Exception {
+ checkState(!windowedWrites);
+
+ LOG.info(
+ "Finalizing write operation {} for destination {} num shards {}.",
+ writeOperation,
+ destination,
+ results.size());
+ int extraShardsNeeded = minShardsNeeded - results.size();
+ if (extraShardsNeeded > 0) {
+ LOG.info(
+ "Creating {} empty output shards in addition to {} written "
+ + "for a total of {} for destination {}.",
+ extraShardsNeeded,
+ results.size(),
+ minShardsNeeded,
+ destination);
+ for (int i = 0; i < extraShardsNeeded; ++i) {
+ Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+ // Currently this code path is only called in the unwindowed case.
+ writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+ FileResult<DestinationT> emptyWrite = writer.close();
+ results.add(emptyWrite);
+ }
+ LOG.debug("Done creating extra shards for {}.", destination);
+ }
+ Set<ResourceId> tempFiles = writeOperation.finalize(results);
+ LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination);
+ return tempFiles;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index b756778..a6ad746 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -203,7 +203,7 @@ public class FileBasedSinkTest {
null));
}
- writeOp.finalize(fileResults);
+ writeOp.removeTemporaryFiles(writeOp.finalize(fileResults));
for (int i = 0; i < numFiles; i++) {
ResourceId outputFilename =
http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 1ca7169..60088de 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -35,11 +36,15 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Paths;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
@@ -80,6 +85,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.commons.compress.utils.Sets;
import org.joda.time.Duration;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
@@ -449,16 +455,23 @@ public class WriteFilesTest {
@Test
@Category(NeedsRunner.class)
public void testDynamicDestinationsBounded() throws Exception {
- testDynamicDestinationsHelper(true);
+ testDynamicDestinationsHelper(true, false);
}
@Test
@Category(NeedsRunner.class)
public void testDynamicDestinationsUnbounded() throws Exception {
- testDynamicDestinationsHelper(false);
+ testDynamicDestinationsHelper(false, false);
}
- private void testDynamicDestinationsHelper(boolean bounded) throws IOException {
+ @Test
+ @Category(NeedsRunner.class)
+ public void testDynamicDestinationsFillEmptyShards() throws Exception {
+ testDynamicDestinationsHelper(true, true);
+ }
+
+ private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards)
+ throws IOException {
TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory());
SimpleSink<Integer> sink =
new SimpleSink<>(
@@ -469,15 +482,21 @@ public class WriteFilesTest {
options.setTestFlag("test_value");
Pipeline p = TestPipeline.create(options);
- List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+ final int numInputs = 100;
+ List<String> inputs = Lists.newArrayList();
+ for (int i = 0; i < numInputs; ++i) {
+ inputs.add(Integer.toString(i));
+ }
// Prepare timestamps for the elements.
List<Long> timestamps = new ArrayList<>();
for (long i = 0; i < inputs.size(); i++) {
timestamps.add(i + 1);
}
-
+ // If emptyShards==true make numShards larger than the number of elements per destination.
+ // This will force every destination to generate some empty shards.
+ int numShards = emptyShards ? 2 * numInputs / 5 : 2;
WriteFiles<String, Integer, String> writeFiles =
- WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1);
+ WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards);
PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
if (!bounded) {
@@ -492,8 +511,11 @@ public class WriteFilesTest {
for (int i = 0; i < 5; ++i) {
ResourceId base =
getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE);
- List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5));
- checkFileContents(base.toString(), expected, Optional.of(1));
+ List<String> expected = Lists.newArrayList();
+ for (int j = i; j < numInputs; j += 5) {
+ expected.add("record_" + j);
+ }
+ checkFileContents(base.toString(), expected, Optional.of(numShards));
}
}
@@ -599,13 +621,14 @@ public class WriteFilesTest {
BoundedWindow window,
PaneInfo paneInfo,
OutputFileHints outputFileHints) {
+ DecimalFormat df = new DecimalFormat("0000");
IntervalWindow intervalWindow = (IntervalWindow) window;
String filename =
String.format(
"%s-%s-of-%s%s%s",
filenamePrefixForWindow(intervalWindow),
- shardNumber,
- numShards,
+ df.format(shardNumber),
+ df.format(numShards),
outputFileHints.getSuggestedFilenameSuffix(),
suffix);
return baseFilename
@@ -616,12 +639,17 @@ public class WriteFilesTest {
@Override
public ResourceId unwindowedFilename(
int shardNumber, int numShards, OutputFileHints outputFileHints) {
+ DecimalFormat df = new DecimalFormat("0000");
String prefix =
baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
String filename =
String.format(
"%s-%s-of-%s%s%s",
- prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix);
+ prefix,
+ df.format(shardNumber),
+ df.format(numShards),
+ outputFileHints.getSuggestedFilenameSuffix(),
+ suffix);
return baseFilename
.getCurrentDirectory()
.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -674,6 +702,22 @@ public class WriteFilesTest {
}
if (numExpectedShards.isPresent()) {
assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
+ Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}");
+
+ Set<String> expectedShards = Sets.newHashSet();
+ DecimalFormat df = new DecimalFormat("0000");
+ for (int i = 0; i < numExpectedShards.get(); i++) {
+ expectedShards.add(
+ String.format("%s-of-%s", df.format(i), df.format(numExpectedShards.get())));
+ }
+
+ Set<String> outputShards = Sets.newHashSet();
+ for (File file : outputFiles) {
+ Matcher matcher = shardPattern.matcher(file.getName());
+ assertTrue(matcher.find());
+ assertTrue(outputShards.add(matcher.group()));
+ }
+ assertEquals(expectedShards, outputShards);
}
List<String> actual = Lists.newArrayList();
[2/2] beam git commit: This closes #3546: [BEAM-2601] Fix broken per-destination finalization.
Posted by jk...@apache.org.
This closes #3546: [BEAM-2601] Fix broken per-destination finalization.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f1df272
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f1df272
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f1df272
Branch: refs/heads/master
Commit: 1f1df2722bd8697c51cb93e635804771a4f559ba
Parents: 47273b9 ca40663
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Jul 15 10:56:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Jul 15 10:56:59 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/DefaultFilenamePolicy.java | 27 +++++
.../org/apache/beam/sdk/io/FileBasedSink.java | 43 ++++---
.../java/org/apache/beam/sdk/io/WriteFiles.java | 117 ++++++++++++++-----
.../apache/beam/sdk/io/FileBasedSinkTest.java | 2 +-
.../org/apache/beam/sdk/io/WriteFilesTest.java | 66 +++++++++--
5 files changed, 198 insertions(+), 57 deletions(-)
----------------------------------------------------------------------