You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/15 18:12:15 UTC

[1/2] beam git commit: Fixes sharding and cleanup for dynamic file writes

Repository: beam
Updated Branches:
  refs/heads/master 47273b969 -> 1f1df2722


Fixes sharding and cleanup for dynamic file writes

- Sharding was applied across all the destinations at the same time,
  instead of having each destination produce the requested number
  of shards
- Fixes a bunch of issues in cleanup of temporary files in case
  of multiple destinations


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca406636
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca406636
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca406636

Branch: refs/heads/master
Commit: ca4066366bca9ed5ec97859e269875573af7adfc
Parents: 47273b9
Author: Reuven Lax <re...@google.com>
Authored: Tue Jul 11 19:19:42 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Jul 15 10:56:54 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/DefaultFilenamePolicy.java      |  27 +++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  43 ++++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 117 ++++++++++++++-----
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   2 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  66 +++++++++--
 5 files changed, 198 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 64d7edc..4021609 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.MoreObjects.firstNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -139,6 +141,31 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
     public Params withSuffix(String suffix) {
       return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(baseFilename.get(), shardTemplate, suffix);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Params)) {
+        return false;
+      }
+      Params other = (Params) o;
+      return baseFilename.get().equals(other.baseFilename.get())
+          && shardTemplate.equals(other.shardTemplate)
+          && suffix.equals(other.suffix);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("baseFilename", baseFilename)
+          .add("shardTemplate", shardTemplate)
+          .add("suffix", suffix)
+          .toString();
+    }
   }
 
   /** A Coder for {@link Params}. */

http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c68b794..9953975 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -218,7 +218,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       implements HasDisplayData, Serializable {
     /**
      * Returns an object that represents at a high level the destination being written to. May not
-     * return null.
+     * return null. A destination must have deterministic hash and equality methods defined.
      */
     public abstract DestinationT getDestination(UserT element);
 
@@ -486,8 +486,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     /**
-     * Finalizes writing by copying temporary output files to their final location and optionally
-     * removing temporary files.
+     * Finalizes writing by copying temporary output files to their final location.
      *
      * <p>Finalization may be overridden by subclass implementations to perform customized
      * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
@@ -497,23 +496,37 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
      * is a best practice to attempt to try to make this method atomic.
      *
+     * <p>Returns the set of temporary files generated. Callers must call {@link
+     * #removeTemporaryFiles(Set)} to clean up these files.
+     *
      * @param writerResults the results of writes (FileResult).
      */
-    public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
-      // Collect names of temporary files and rename them.
+    public Set<ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults)
+        throws Exception {
+      // Collect the names of temporary files and copy them to their final locations.
       Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
       copyToOutputFiles(outputFilenames);
+      return outputFilenames.keySet();
+    }
 
-      // Optionally remove temporary files.
-      // We remove the entire temporary directory, rather than specifically removing the files
-      // from writerResults, because writerResults includes only successfully completed bundles,
-      // and we'd like to clean up the failed ones too.
-      // Note that due to GCS eventual consistency, matching files in the temp directory is also
-      // currently non-perfect and may fail to delete some files.
-      //
-      // When windows or triggers are specified, files are generated incrementally so deleting
-      // the entire directory in finalize is incorrect.
-      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
+    /**
+     * Remove temporary files after finalization.
+     *
+     * <p>In the case where we are doing global-window, untriggered writes, we remove the entire
+     * temporary directory, rather than specifically removing the files from writerResults, because
+     * writerResults includes only successfully completed bundles, and we'd like to clean up the
+     * failed ones too. The reason we remove files here rather than in finalize is that finalize
+     * might be called multiple times (e.g. if the bundle contained multiple destinations), and
+     * deleting the entire directory can't be done until all calls to finalize have completed.
+     *
+     * <p>When windows or triggers are specified, files are generated incrementally so deleting the
+     * entire directory in finalize is incorrect. If windowedWrites is true, we instead delete the
+     * files individually. This means that some temporary files generated by failed bundles might
+     * not be cleaned up. Note that {@link WriteFiles} does attempt to clean up files if exceptions
+     * are thrown; however, there are still some scenarios where temporary files might be left.
+     */
+    public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException {
+      removeTemporaryFiles(filenames, !windowedWrites);
     }
 
     @Experimental(Kind.FILESYSTEM)

http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7013044..d8d7478 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -19,15 +19,21 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
@@ -44,6 +50,7 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -594,6 +601,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
+  Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults(
+      Iterable<FileResult<DestinationT>> results) {
+    Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create();
+    for (FileResult<DestinationT> result : results) {
+      perDestination.put(result.getDestination(), result);
+    }
+    return perDestination;
+  }
+
   /**
    * A write is performed as sequence of three {@link ParDo}'s.
    *
@@ -737,11 +753,21 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
                     @ProcessElement
                     public void processElement(ProcessContext c) throws Exception {
-                      LOG.info("Finalizing write operation {}.", writeOperation);
-                      List<FileResult<DestinationT>> results =
-                          Lists.newArrayList(c.element().getValue());
-                      writeOperation.finalize(results);
-                      LOG.debug("Done finalizing write operation");
+                      Set<ResourceId> tempFiles = Sets.newHashSet();
+                      Multimap<DestinationT, FileResult<DestinationT>> results =
+                          perDestinationResults(c.element().getValue());
+                      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
+                          results.asMap().entrySet()) {
+                        LOG.info(
+                            "Finalizing write operation {} for destination {} num shards: {}.",
+                            writeOperation,
+                            entry.getKey(),
+                            entry.getValue().size());
+                        tempFiles.addAll(writeOperation.finalize(entry.getValue()));
+                        LOG.debug("Done finalizing write operation for {}.", entry.getKey());
+                      }
+                      writeOperation.removeTemporaryFiles(tempFiles);
+                      LOG.debug("Removed temporary files for {}.", writeOperation);
                     }
                   }));
     } else {
@@ -769,11 +795,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                     @ProcessElement
                     public void processElement(ProcessContext c) throws Exception {
                       LOG.info("Finalizing write operation {}.", writeOperation);
-                      List<FileResult<DestinationT>> results =
-                          Lists.newArrayList(c.sideInput(resultsView));
-                      LOG.debug(
-                          "Side input initialized to finalize write operation {}.", writeOperation);
-
                       // We must always output at least 1 shard, and honor user-specified numShards
                       // if
                       // set.
@@ -785,31 +806,67 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                       } else {
                         minShardsNeeded = 1;
                       }
-                      int extraShardsNeeded = minShardsNeeded - results.size();
-                      if (extraShardsNeeded > 0) {
-                        LOG.info(
-                            "Creating {} empty output shards in addition to {} written "
-                                + "for a total of {}.",
-                            extraShardsNeeded,
-                            results.size(),
-                            minShardsNeeded);
-                        for (int i = 0; i < extraShardsNeeded; ++i) {
-                          Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
-                          writer.openUnwindowed(
-                              UUID.randomUUID().toString(),
-                              UNKNOWN_SHARDNUM,
-                              sink.getDynamicDestinations().getDefaultDestination());
-                          FileResult<DestinationT> emptyWrite = writer.close();
-                          results.add(emptyWrite);
-                        }
-                        LOG.debug("Done creating extra shards.");
+                      Set<ResourceId> tempFiles = Sets.newHashSet();
+                      Multimap<DestinationT, FileResult<DestinationT>> perDestination =
+                          perDestinationResults(c.sideInput(resultsView));
+                      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
+                          perDestination.asMap().entrySet()) {
+                        tempFiles.addAll(
+                            finalizeForDestinationFillEmptyShards(
+                                entry.getKey(), entry.getValue(), minShardsNeeded));
                       }
-                      writeOperation.finalize(results);
-                      LOG.debug("Done finalizing write operation {}", writeOperation);
+                      if (perDestination.isEmpty()) {
+                        // If there is no input at all, write empty files to the default
+                        // destination.
+                        tempFiles.addAll(
+                            finalizeForDestinationFillEmptyShards(
+                                getSink().getDynamicDestinations().getDefaultDestination(),
+                                Lists.<FileResult<DestinationT>>newArrayList(),
+                                minShardsNeeded));
+                      }
+                      writeOperation.removeTemporaryFiles(tempFiles);
                     }
                   })
               .withSideInputs(sideInputs.build()));
     }
     return PDone.in(input.getPipeline());
   }
+
+  /**
+   * Finalize a list of files for a single destination. If a minimum number of shards is needed,
+   * this function will generate empty files for this destination to ensure that all shards are
+   * generated.
+   */
+  private Set<ResourceId> finalizeForDestinationFillEmptyShards(
+      DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded)
+      throws Exception {
+    checkState(!windowedWrites);
+
+    LOG.info(
+        "Finalizing write operation {} for destination {} num shards {}.",
+        writeOperation,
+        destination,
+        results.size());
+    int extraShardsNeeded = minShardsNeeded - results.size();
+    if (extraShardsNeeded > 0) {
+      LOG.info(
+          "Creating {} empty output shards in addition to {} written "
+              + "for a total of {} for destination {}.",
+          extraShardsNeeded,
+          results.size(),
+          minShardsNeeded,
+          destination);
+      for (int i = 0; i < extraShardsNeeded; ++i) {
+        Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+        // Currently this code path is only called in the unwindowed case.
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+        FileResult<DestinationT> emptyWrite = writer.close();
+        results.add(emptyWrite);
+      }
+      LOG.debug("Done creating extra shards for {}.", destination);
+    }
+    Set<ResourceId> tempFiles = writeOperation.finalize(results);
+    LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination);
+    return tempFiles;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index b756778..a6ad746 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -203,7 +203,7 @@ public class FileBasedSinkTest {
               null));
     }
 
-    writeOp.finalize(fileResults);
+    writeOp.removeTemporaryFiles(writeOp.finalize(fileResults));
 
     for (int i = 0; i < numFiles; i++) {
       ResourceId outputFilename =

http://git-wip-us.apache.org/repos/asf/beam/blob/ca406636/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 1ca7169..60088de 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -35,11 +36,15 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
@@ -80,6 +85,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.commons.compress.utils.Sets;
 import org.joda.time.Duration;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -449,16 +455,23 @@ public class WriteFilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsBounded() throws Exception {
-    testDynamicDestinationsHelper(true);
+    testDynamicDestinationsHelper(true, false);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsUnbounded() throws Exception {
-    testDynamicDestinationsHelper(false);
+    testDynamicDestinationsHelper(false, false);
   }
 
-  private void testDynamicDestinationsHelper(boolean bounded) throws IOException {
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsFillEmptyShards() throws Exception {
+    testDynamicDestinationsHelper(true, true);
+  }
+
+  private void testDynamicDestinationsHelper(boolean bounded, boolean emptyShards)
+      throws IOException {
     TestDestinations dynamicDestinations = new TestDestinations(getBaseOutputDirectory());
     SimpleSink<Integer> sink =
         new SimpleSink<>(
@@ -469,15 +482,21 @@ public class WriteFilesTest {
     options.setTestFlag("test_value");
     Pipeline p = TestPipeline.create(options);
 
-    List<String> inputs = Lists.newArrayList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+    final int numInputs = 100;
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < numInputs; ++i) {
+      inputs.add(Integer.toString(i));
+    }
     // Prepare timestamps for the elements.
     List<Long> timestamps = new ArrayList<>();
     for (long i = 0; i < inputs.size(); i++) {
       timestamps.add(i + 1);
     }
-
+    // If emptyShards==true make numShards larger than the number of elements per destination.
+    // This will force every destination to generate some empty shards.
+    int numShards = emptyShards ? 2 * numInputs / 5 : 2;
     WriteFiles<String, Integer, String> writeFiles =
-        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(1);
+        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards);
 
     PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
     if (!bounded) {
@@ -492,8 +511,11 @@ public class WriteFilesTest {
     for (int i = 0; i < 5; ++i) {
       ResourceId base =
           getBaseOutputDirectory().resolve("file_" + i, StandardResolveOptions.RESOLVE_FILE);
-      List<String> expected = Lists.newArrayList("record_" + i, "record_" + (i + 5));
-      checkFileContents(base.toString(), expected, Optional.of(1));
+      List<String> expected = Lists.newArrayList();
+      for (int j = i; j < numInputs; j += 5) {
+        expected.add("record_" + j);
+      }
+      checkFileContents(base.toString(), expected, Optional.of(numShards));
     }
   }
 
@@ -599,13 +621,14 @@ public class WriteFilesTest {
         BoundedWindow window,
         PaneInfo paneInfo,
         OutputFileHints outputFileHints) {
+      DecimalFormat df = new DecimalFormat("0000");
       IntervalWindow intervalWindow = (IntervalWindow) window;
       String filename =
           String.format(
               "%s-%s-of-%s%s%s",
               filenamePrefixForWindow(intervalWindow),
-              shardNumber,
-              numShards,
+              df.format(shardNumber),
+              df.format(numShards),
               outputFileHints.getSuggestedFilenameSuffix(),
               suffix);
       return baseFilename
@@ -616,12 +639,17 @@ public class WriteFilesTest {
     @Override
     public ResourceId unwindowedFilename(
         int shardNumber, int numShards, OutputFileHints outputFileHints) {
+      DecimalFormat df = new DecimalFormat("0000");
       String prefix =
           baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
       String filename =
           String.format(
               "%s-%s-of-%s%s%s",
-              prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix(), suffix);
+              prefix,
+              df.format(shardNumber),
+              df.format(numShards),
+              outputFileHints.getSuggestedFilenameSuffix(),
+              suffix);
       return baseFilename
           .getCurrentDirectory()
           .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
@@ -674,6 +702,22 @@ public class WriteFilesTest {
     }
     if (numExpectedShards.isPresent()) {
       assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
+      Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}");
+
+      Set<String> expectedShards = Sets.newHashSet();
+      DecimalFormat df = new DecimalFormat("0000");
+      for (int i = 0; i < numExpectedShards.get(); i++) {
+        expectedShards.add(
+            String.format("%s-of-%s", df.format(i), df.format(numExpectedShards.get())));
+      }
+
+      Set<String> outputShards = Sets.newHashSet();
+      for (File file : outputFiles) {
+        Matcher matcher = shardPattern.matcher(file.getName());
+        assertTrue(matcher.find());
+        assertTrue(outputShards.add(matcher.group()));
+      }
+      assertEquals(expectedShards, outputShards);
     }
 
     List<String> actual = Lists.newArrayList();


[2/2] beam git commit: This closes #3546: [BEAM-2601] Fix broken per-destination finalization.

Posted by jk...@apache.org.
This closes #3546: [BEAM-2601] Fix broken per-destination finalization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f1df272
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f1df272
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f1df272

Branch: refs/heads/master
Commit: 1f1df2722bd8697c51cb93e635804771a4f559ba
Parents: 47273b9 ca40663
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Jul 15 10:56:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sat Jul 15 10:56:59 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/DefaultFilenamePolicy.java      |  27 +++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  43 ++++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 117 ++++++++++++++-----
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   2 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  66 +++++++++--
 5 files changed, 198 insertions(+), 57 deletions(-)
----------------------------------------------------------------------