You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:59 UTC

[beam] 12/13: Reintroduces dynamic sharding with windowed writes for bounded collections

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d314339ed2f8d5ce385c7b40705ef13f6ea43b45
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Thu Nov 30 13:05:00 2017 -0800

    Reintroduces dynamic sharding with windowed writes for bounded collections
---
 .../apache/beam/examples/WindowedWordCount.java    |  5 ++--
 .../examples/common/WriteOneFilePerWindow.java     | 12 ++++++----
 .../apache/beam/examples/WindowedWordCountIT.java  |  8 +++++++
 .../beam/runners/apex/examples/WordCountTest.java  |  2 +-
 .../construction/WriteFilesTranslationTest.java    |  1 +
 .../beam/runners/spark/io/AvroPipelineTest.java    |  5 ++--
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 27 +++-------------------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 27 +++++++++++-----------
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |  5 ++--
 9 files changed, 41 insertions(+), 51 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 21cfed8..b31ce4a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -162,9 +162,8 @@ public class WindowedWordCount {
     void setMaxTimestampMillis(Long value);
 
     @Description("Fixed number of shards to produce per window")
-    @Default.Integer(3)
-    int getNumShards();
-    void setNumShards(int numShards);
+    Integer getNumShards();
+    void setNumShards(Integer numShards);
   }
 
   public static void main(String[] args) throws IOException {
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index a5c84f6..abd14b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
@@ -45,9 +46,10 @@ import org.joda.time.format.ISODateTimeFormat;
 public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
   private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
   private String filenamePrefix;
-  private int numShards;
+  @Nullable
+  private Integer numShards;
 
-  public WriteOneFilePerWindow(String filenamePrefix, int numShards) {
+  public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
     this.filenamePrefix = filenamePrefix;
     this.numShards = numShards;
   }
@@ -59,8 +61,10 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
         TextIO.write()
             .to(new PerWindowFiles(resource))
             .withTempDirectory(resource.getCurrentDirectory())
-            .withWindowedWrites()
-            .withNumShards(numShards);
+            .withWindowedWrites();
+    if (numShards != null) {
+      write = write.withNumShards(numShards);
+    }
     return input.apply(write);
   }
 
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 279de53..2f4ef34 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -87,6 +87,14 @@ public class WindowedWordCountIT {
   }
 
   @Test
+  public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
+    WindowedWordCountITOptions options = batchOptions();
+    // This is the default value, but make it explicit.
+    options.setNumShards(null);
+    testWindowedWordCountPipeline(options);
+  }
+
+  @Test
   public void testWindowedWordCountInBatchStaticSharding() throws Exception {
     WindowedWordCountITOptions options = batchOptions();
     options.setNumShards(3);
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e050c15..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -108,7 +108,7 @@ public class WordCountTest {
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))
-      .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()))
       ;
     p.run().waitUntilFinish();
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 2d45681..038653d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -64,6 +64,7 @@ public class WriteFilesTranslationTest {
     public static Iterable<WriteFiles<Object, Void, Object>> data() {
       return ImmutableList.of(
           WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
           WriteFiles.to(new DummySink()).withNumShards(17),
           WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
     }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e17a6b8..fc65aac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,8 +74,7 @@ public class AvroPipelineTest {
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(
         AvroIO.writeGenericRecords(schema)
-            .to(outputFile.getAbsolutePath())
-            .withoutSharding());
+            .to(outputFile.getAbsolutePath()));
     pipeline.run();
 
     List<GenericRecord> records = readGenericFile();
@@ -100,7 +99,7 @@ public class AvroPipelineTest {
     List<GenericRecord> records = Lists.newArrayList();
     GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
     try (DataFileReader<GenericRecord> dataFileReader =
-        new DataFileReader<>(outputFile, genericDatumReader)) {
+        new DataFileReader<>(new File(outputFile + "-00000-of-00001"), genericDatumReader)) {
       for (GenericRecord record : dataFileReader) {
         records.add(record);
       }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 48d7521..2e5d387 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -634,28 +632,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       if (numShards != null) {
         resultsWithShardNumbers = Lists.newArrayList(completeResults);
       } else {
-        checkState(
-            !windowedWrites,
-            "When doing windowed writes, shards should have been assigned when writing");
-        // Sort files for idempotence. Sort by temporary filename.
-        // Note that this codepath should not be used when processing triggered windows. In the
-        // case of triggers, the list of FileResult objects in the Finalize iterable is not
-        // deterministic, and might change over retries. This breaks the assumption below that
-        // sorting the FileResult objects provides idempotency.
-        List<FileResult<DestinationT>> sortedByTempFilename =
-            Ordering.from(
-                    new Comparator<FileResult<DestinationT>>() {
-                      @Override
-                      public int compare(
-                          FileResult<DestinationT> first, FileResult<DestinationT> second) {
-                        String firstFilename = first.getTempFilename().toString();
-                        String secondFilename = second.getTempFilename().toString();
-                        return firstFilename.compareTo(secondFilename);
-                      }
-                    })
-                .sortedCopy(completeResults);
-        for (int i = 0; i < sortedByTempFilename.size(); i++) {
-          resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
+        int i = 0;
+        for (FileResult<DestinationT> res : completeResults) {
+          resultsWithShardNumbers.add(res.withShard(i++));
         }
       }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 54f055d..499a194 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
@@ -42,8 +41,8 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -286,13 +285,12 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
           getWindowedWrites(),
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
-    }
-    if (getWindowedWrites()) {
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
       // and similar behavior in other runners.
       checkArgument(
           getComputeNumShards() != null || getNumShardsProvider() != null,
-          "When using windowed writes, must specify number of output shards explicitly",
+          "When applying %s to an unbounded PCollection, "
+              + "must specify number of output shards explicitly",
           WriteFiles.class.getSimpleName());
     }
     this.writeOperation = getSink().createWriteOperation();
@@ -364,7 +362,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
   }
 
   private class GatherResults<ResultT>
-      extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+      extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
     private final Coder<ResultT> resultCoder;
 
     private GatherResults(Coder<ResultT> resultCoder) {
@@ -372,7 +370,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     }
 
     @Override
-    public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+    public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
         // Reshuffle the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
@@ -381,7 +379,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
             .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
             .apply("Drop key", Values.<ResultT>create())
             .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
-            .setCoder(IterableCoder.of(resultCoder));
+            .setCoder(ListCoder.of(resultCoder))
+            // Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
+            .apply(Reshuffle.<List<ResultT>>viaRandomKey());
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
@@ -389,7 +389,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
             .getPipeline()
             .apply(
                 Reify.viewInGlobalWindow(
-                    input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+                    input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder)));
       }
     }
   }
@@ -742,7 +742,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
 
   private class FinalizeTempFileBundles
       extends PTransform<
-          PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+          PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
     @Nullable private final PCollectionView<Integer> numShardsView;
     private final Coder<DestinationT> destinationCoder;
 
@@ -754,7 +754,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
 
     @Override
     public WriteFilesResult<DestinationT> expand(
-        PCollection<Iterable<FileResult<DestinationT>>> input) {
+        PCollection<List<FileResult<DestinationT>>> input) {
 
       List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
       if (numShardsView != null) {
@@ -772,7 +772,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     }
 
     private class FinalizeFn
-        extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+        extends DoFn<List<FileResult<DestinationT>>, KV<DestinationT, String>> {
       @ProcessElement
       public void process(ProcessContext c) throws Exception {
         getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -782,7 +782,6 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
         } else if (getNumShardsProvider() != null) {
           fixedNumShards = getNumShardsProvider().get();
         } else {
-          checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding");
           fixedNumShards = null;
         }
         List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
@@ -821,7 +820,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     return resultsToFinalFilenames;
   }
 
-  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> {
+  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
     @Nullable private transient Multimap<BoundedWindow, T> bundles = null;
 
     @StartBundle
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index b68cbf9..da4e6da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -386,10 +386,11 @@ public class WriteFilesTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testWindowedWritesNeedSharding() {
+  public void testUnboundedWritesNeedSharding() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "When using windowed writes, must specify number of output shards explicitly");
+        "When applying WriteFiles to an unbounded PCollection, "
+            + "must specify number of output shards explicitly");
 
     SimpleSink<Void> sink = makeSimpleSink();
     p.apply(Create.of("foo"))

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.