You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/10 19:20:26 UTC

[2/5] beam git commit: Simpler code for setting shard numbers on results in FileBasedSink

Simpler code for setting shard numbers on results in FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a09a3ec0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a09a3ec0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a09a3ec0

Branch: refs/heads/master
Commit: a09a3ec0645689f54a4fffc944cc531192e1f014
Parents: 2d5fbf6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 9 15:10:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed May 10 12:18:41 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 113 +++++++++++--------
 1 file changed, 68 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a09a3ec0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2117794..7f729a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -22,10 +22,12 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verifyNotNull;
+import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.InputStream;
@@ -530,44 +532,63 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       int numShards = Iterables.size(writerResults);
       Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
 
-      List<FileResult> unshardedFiles = new ArrayList<>();
       FilenamePolicy policy = getSink().getFilenamePolicy();
+      ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
+
+      // Either all results have a shard number set (if the sink is configured with a fixed
+      // number of shards), or they all don't (otherwise).
+      Boolean isShardNumberSetEverywhere = null;
       for (FileResult result : writerResults) {
-        if (result.getShard() != WriteFiles.UNKNOWN_SHARDNUM) {
-          outputFilenames.put(result.getTempFilename(),
-              result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
-                  numShards, getSink().getExtension()));
+        boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
+        if (isShardNumberSetEverywhere == null) {
+          isShardNumberSetEverywhere = isShardNumberSetHere;
         } else {
-          unshardedFiles.add(result);
+          checkArgument(
+              isShardNumberSetEverywhere == isShardNumberSetHere,
+              "Found a mix of files with and without shard number set: %s",
+              result);
         }
       }
 
-      // writerResults won't contain destination filenames, so we dynamically generate them here.
-      if (unshardedFiles.size() > 0) {
-        checkArgument(outputFilenames.isEmpty());
+      if (isShardNumberSetEverywhere == null) {
+        isShardNumberSetEverywhere = true;
+      }
 
+      List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
+      if (isShardNumberSetEverywhere) {
+        resultsWithShardNumbers = Lists.newArrayList(writerResults);
+      } else {
         // Sort files for idempotence. Sort by temporary filename.
         // Note that this codepath should not be used when processing triggered windows. In the
         // case of triggers, the list of FileResult objects in the Finalize iterable is not
         // deterministic, and might change over retries. This breaks the assumption below that
         // sorting the FileResult objects provides idempotency.
-        unshardedFiles = Ordering.from(new Comparator<FileResult>() {
-          @Override
-          public int compare(FileResult first, FileResult second) {
-            return first.getTempFilename().toString().compareTo(
-                second.getTempFilename().toString());
-          }
-        }).sortedCopy(unshardedFiles);
-        for (int i = 0; i < unshardedFiles.size(); i++) {
-          FileResult result = unshardedFiles.get(i);
-          result.setShard(i);
-          outputFilenames.put(result.getTempFilename(),
-              result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
-                  numShards, getSink().getExtension()));
+        List<FileResult> sortedByTempFilename =
+            Ordering.from(
+                new Comparator<FileResult>() {
+                  @Override
+                  public int compare(FileResult first, FileResult second) {
+                    String firstFilename = first.getTempFilename().toString();
+                    String secondFilename = second.getTempFilename().toString();
+                    return firstFilename.compareTo(secondFilename);
+                  }
+                })
+                .sortedCopy(writerResults);
+        for (int i = 0; i < sortedByTempFilename.size(); i++) {
+          resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
         }
       }
 
-      int numDistinctShards = new HashSet<ResourceId>(outputFilenames.values()).size();
+      for (FileResult result : resultsWithShardNumbers) {
+        checkArgument(
+            result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
+        outputFilenames.put(
+            result.getTempFilename(),
+            result.getDestinationFile(
+                policy, baseOutputDir, numShards, getSink().getExtension()));
+      }
+
+      int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
       checkState(numDistinctShards == outputFilenames.size(),
          "Only generated %s distinct file names for %s files.",
          numDistinctShards, outputFilenames.size());
@@ -920,12 +941,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    */
   public static final class FileResult {
     private final ResourceId tempFilename;
-    private int shard;
-    private BoundedWindow window;
-    private PaneInfo paneInfo;
+    private final int shard;
+    private final BoundedWindow window;
+    private final PaneInfo paneInfo;
 
-    public FileResult(ResourceId tempFilename, int shard, BoundedWindow window,
-                      PaneInfo paneInfo) {
+    public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
@@ -940,8 +960,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       return shard;
     }
 
-    public void setShard(int shard) {
-      this.shard = shard;
+    public FileResult withShard(int shard) {
+      return new FileResult(tempFilename, shard, window, paneInfo);
     }
 
     public BoundedWindow getWindow() {
@@ -954,7 +974,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
     public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
                                          int numShards, String extension) {
-      checkArgument(getShard() != WriteFiles.UNKNOWN_SHARDNUM);
+      checkArgument(getShard() != UNKNOWN_SHARDNUM);
       checkArgument(numShards > 0);
       if (getWindow() != null) {
         return policy.windowedFilename(outputDirectory, new WindowedContext(
@@ -966,8 +986,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     public String toString() {
-      return MoreObjects.toStringHelper(FileBasedSink.FileResult.class)
+      return MoreObjects.toStringHelper(FileResult.class)
           .add("tempFilename", tempFilename)
+          .add("shard", shard)
+          .add("window", window)
+          .add("paneInfo", paneInfo)
           .toString();
     }
   }
@@ -976,9 +999,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * A coder for {@link FileResult} objects.
    */
   public static final class FileResultCoder extends StructuredCoder<FileResult> {
-    private final Coder<String> stringCoder = StringUtf8Coder.of();
-    private final Coder<Integer> integerCoder = VarIntCoder.of();
-    private final Coder<PaneInfo> paneInfoCoder = NullableCoder.of(PaneInfoCoder.INSTANCE);
+    private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
+    private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
+    private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
+
     private final Coder<BoundedWindow> windowCoder;
 
     protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
@@ -1000,30 +1024,29 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getTempFilename().toString(), outStream);
+      FILENAME_CODER.encode(value.getTempFilename().toString(), outStream);
       windowCoder.encode(value.getWindow(), outStream);
-      paneInfoCoder.encode(value.getPaneInfo(), outStream);
-      integerCoder.encode(value.getShard(), outStream);
+      PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
+      SHARD_CODER.encode(value.getShard(), outStream);
     }
 
     @Override
     public FileResult decode(InputStream inStream)
         throws IOException {
-      String tempFilename = stringCoder.decode(inStream);
-      assert tempFilename != null;  // fixes a compiler warning
+      String tempFilename = FILENAME_CODER.decode(inStream);
       BoundedWindow window = windowCoder.decode(inStream);
-      PaneInfo paneInfo = paneInfoCoder.decode(inStream);
-      int shard = integerCoder.decode(inStream);
+      PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
+      int shard = SHARD_CODER.decode(inStream);
       return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
           shard, window, paneInfo);
     }
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      stringCoder.verifyDeterministic();
+      FILENAME_CODER.verifyDeterministic();
       windowCoder.verifyDeterministic();
-      paneInfoCoder.verifyDeterministic();
-      integerCoder.verifyDeterministic();
+      PANE_INFO_CODER.verifyDeterministic();
+      SHARD_CODER.verifyDeterministic();
     }
   }