Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/20 21:42:07 UTC

[1/2] beam git commit: Add spilling code to WriteFiles.

Repository: beam
Updated Branches:
  refs/heads/master 10e47646d -> 698b89e2b


Add spilling code to WriteFiles.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/69b01a61
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/69b01a61
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/69b01a61

Branch: refs/heads/master
Commit: 69b01a6118702277348d2f625af669225c9ed99e
Parents: 10e4764
Author: Reuven Lax <re...@google.com>
Authored: Sat May 13 12:53:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:28:17 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   3 +-
 .../beam/runners/direct/DirectRunner.java       |  28 ++--
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 133 +++++++++++++++----
 .../beam/sdk/testing/TestPipelineOptions.java   |  10 ++
 .../java/org/apache/beam/sdk/io/SimpleSink.java |   4 +
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  89 ++++++++++---
 6 files changed, 209 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index bec2113..6346575 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -155,7 +155,8 @@
               <systemPropertyVariables>
                 <beamTestPipelineOptions>
                   [
-                    "--runner=DirectRunner"
+                    "--runner=DirectRunner",
+                    "--unitTest"
                   ]
                 </beamTestPipelineOptions>
               </systemPropertyVariables>
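
For readers tracing the new "--unitTest" flag: surefire passes the JSON array above to the test JVM
as the beamTestPipelineOptions system property, and TestPipeline parses it into PipelineOptions. A
minimal sketch of that plumbing (the class name and property value are illustrative):

import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;

public class UnitTestFlagDemo {
  public static void main(String[] args) {
    // Simulate the surefire configuration: a JSON array of flags in a system property.
    System.setProperty(
        "beamTestPipelineOptions", "[\"--runner=DirectRunner\", \"--unitTest\"]");
    // TestPipeline parses the property; a boolean flag with no value is treated as true.
    TestPipelineOptions options =
        TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
    System.out.println("isUnitTest = " + options.isUnitTest());  // prints: isUnitTest = true
  }
}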

http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 136ccf3..a16e24d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
@@ -221,15 +222,18 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @SuppressWarnings("rawtypes")
   @VisibleForTesting
   List<PTransformOverride> defaultTransformOverrides() {
-    return ImmutableList.<PTransformOverride>builder()
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.writeWithRunnerDeterminedSharding(),
-                new WriteWithShardingFactory())) /* Uses a view internally. */
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
-                new ViewOverrideFactory())) /* Uses pardos and GBKs */
+    TestPipelineOptions testOptions = options.as(TestPipelineOptions.class);
+    ImmutableList.Builder<PTransformOverride> builder = ImmutableList.builder();
+    if (!testOptions.isUnitTest()) {
+      builder.add(
+          PTransformOverride.of(
+              PTransformMatchers.writeWithRunnerDeterminedSharding(),
+              new WriteWithShardingFactory())); /* Uses a view internally. */
+    }
+    builder = builder.add(
+        PTransformOverride.of(
+            PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
+            new ViewOverrideFactory())) /* Uses pardos and GBKs */
         .add(
             PTransformOverride.of(
                 PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
@@ -254,9 +258,9 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
                 new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
-                new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
-        .build();
+                PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
+                new DirectGroupByKeyOverrideFactory())); /* returns two chained primitives. */
+    return builder.build();
   }
 
   /**
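
The override list now goes through an explicit ImmutableList.Builder so that the
WriteWithShardingFactory override can be skipped when "--unitTest" is set; that keeps
runner-determined sharding, and therefore the new WriteFiles spilling path, exercisable under the
DirectRunner in unit tests. A minimal sketch of the conditional-builder pattern, with strings
standing in for the actual PTransformOverride objects:

import com.google.common.collect.ImmutableList;
import java.util.List;

public class ConditionalOverridesDemo {
  public static void main(String[] args) {
    boolean unitTest = true;  // stands in for options.as(TestPipelineOptions.class).isUnitTest()
    ImmutableList.Builder<String> builder = ImmutableList.builder();
    if (!unitTest) {
      builder.add("WriteWithShardingFactory");  // only registered outside unit tests
    }
    builder.add("ViewOverrideFactory").add("DirectGroupByKeyOverrideFactory");
    List<String> overrides = builder.build();
    System.out.println(overrides);  // [ViewOverrideFactory, DirectGroupByKeyOverrideFactory]
  }
}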

http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 2fd10ac..a220eab 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -56,8 +58,12 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,6 +92,18 @@ import org.slf4j.LoggerFactory;
 public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
+  // The maximum number of file writers to keep open in a single bundle at a time, since file
+  // writers default to 64 MB buffers. This comes into play when writing per-window files.
+  // The first 20 writers opened by a bundle of this WriteFiles transform write their files
+  // inline in the transform; records needing any writer beyond that may be spilled to a shuffle.
+  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
+  // their own policy.
+  private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
+
+  // When we spill records, shard the output keys to prevent hotspots.
+  // We could consider making this a parameter.
+  private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
+
   static final int UNKNOWN_SHARDNUM = -1;
   private FileBasedSink<T> sink;
   private WriteOperation<T> writeOperation;
@@ -98,6 +116,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   @Nullable
   private final ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
+  private final int maxNumWritersPerBundle;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
@@ -105,18 +124,21 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    */
   public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
     checkNotNull(sink, "sink");
-    return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false);
+    return new WriteFiles<>(sink, null /* runner-determined sharding */, null,
+        false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
   }
 
   private WriteFiles(
       FileBasedSink<T> sink,
       @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
       @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites) {
+      boolean windowedWrites,
+      int maxNumWritersPerBundle) {
     this.sink = sink;
     this.computeNumShards = computeNumShards;
     this.numShardsProvider = numShardsProvider;
     this.windowedWrites = windowedWrites;
+    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
   }
 
   @Override
@@ -213,7 +235,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * more information.
    */
   public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
+    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+        maxNumWritersPerBundle);
+  }
+
+  /**
+   * Sets the maximum number of writers created in a bundle before spilling to shuffle.
+   */
+  public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
+    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, windowedWrites,
+        maxNumWritersPerBundle);
   }
 
   /**
@@ -226,7 +257,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new WriteFiles<>(sink, sharding, null, windowedWrites);
+    return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle);
   }
 
   /**
@@ -234,7 +265,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * runner-determined sharding.
    */
   public WriteFiles<T> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(sink, null, null, windowedWrites);
+    return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle);
   }
 
   /**
@@ -252,7 +283,8 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * positive value.
    */
   public WriteFiles<T> withWindowedWrites() {
-    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
+    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true,
+        maxNumWritersPerBundle);
   }
 
   /**
@@ -260,7 +292,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
    */
   private class WriteWindowedBundles extends DoFn<T, FileResult> {
+    private final TupleTag<KV<Integer, T>> unwrittenRecordsTag;
     private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
+    private int spilledShardNum = UNKNOWN_SHARDNUM;
+
+    WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittenRecordsTag) {
+      this.unwrittenRecordsTag = unwrittenRecordsTag;
+    }
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
@@ -277,19 +315,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
       writer = windowedWriters.get(key);
       if (writer == null) {
-        String uuid = UUID.randomUUID().toString();
-        LOG.info(
-            "Opening writer {} for write operation {}, window {} pane {}",
-            uuid,
-            writeOperation,
-            window,
-            paneInfo);
-        writer = writeOperation.createWriter();
-        writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
-        windowedWriters.put(key, writer);
-        LOG.debug("Done opening writer");
+        if (windowedWriters.size() < maxNumWritersPerBundle) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info(
+              "Opening writer {} for write operation {}, window {} pane {}",
+              uuid,
+              writeOperation,
+              window,
+              paneInfo);
+          writer = writeOperation.createWriter();
+          writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
+          windowedWriters.put(key, writer);
+          LOG.debug("Done opening writer");
+        } else {
+          if (spilledShardNum == UNKNOWN_SHARDNUM) {
+            spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
+          } else {
+            spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
+          }
+          c.output(unwrittenRecordsTag, KV.of(spilledShardNum, c.element()));
+          return;
+        }
       }
-
       writeOrClose(writer, c.element());
     }
 
@@ -352,11 +399,17 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     }
   }
 
+  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING };
+
   /**
    * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
    * for each shard have been collected into a single iterable.
    */
   private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
+    private final ShardAssignment shardNumberAssignment;
+    WriteShardedBundles(ShardAssignment shardNumberAssignment) {
+      this.shardNumberAssignment = shardNumberAssignment;
+    }
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       // In a sharded write, single input element represents one shard. We can open and close
@@ -364,7 +417,9 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       LOG.info("Opening writer for write operation {}", writeOperation);
       Writer<T> writer = writeOperation.createWriter();
       if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
+        int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+            ? c.element().getKey() : UNKNOWN_SHARDNUM;
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber);
       } else {
         writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
       }
@@ -493,14 +548,35 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     // initial ParDo.
     PCollection<FileResult> results;
     final PCollectionView<Integer> numShardsView;
+    @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
     if (computeNumShards == null && numShardsProvider == null) {
       numShardsView = null;
-      results =
-          input.apply(
-              "WriteBundles",
-              ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles()));
+      if (windowedWrites) {
+        TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
+        TupleTag<KV<Integer, T>> unwrittenRecordsTag = new TupleTag<>("unwrittenRecordsTag");
+        PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of(
+            new WriteWindowedBundles(unwrittenRecordsTag))
+            .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
+        PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag)
+            .setCoder(FileResultCoder.of(shardedWindowCoder));
+        // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+        // finalize to stay consistent with what WriteWindowedBundles does.
+        PCollection<FileResult> writtenGroupedFiles =
+            writeTuple
+                .get(unwrittenRecordsTag)
+                .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
+                .apply("GroupUnwritten", GroupByKey.<Integer, T>create())
+                .apply("WriteUnwritten", ParDo.of(
+                    new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+                .setCoder(FileResultCoder.of(shardedWindowCoder));
+        results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles)
+            .apply(Flatten.<FileResult>pCollections());
+      } else {
+        results =
+            input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles()));
+      }
     } else {
       List<PCollectionView<?>> sideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
@@ -517,10 +593,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                       (numShardsView != null) ? null : numShardsProvider))
                   .withSideInputs(sideInputs))
               .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
-      shardedWindowCoder =
-          (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
-
-      results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
+      // Since this path might be used by streaming runners processing triggers, it's important
+      // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
+      // strategy works by sorting all FileResult objects and assigning them numbers, which is not
+      // guaranteed to work well when processing triggers: if the finalize step retries, it might
+      // see a different Iterable of FileResult objects and assign different shard numbers.
+      results = sharded.apply("WriteShardedBundles",
+          ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
     }
     results.setCoder(FileResultCoder.of(shardedWindowCoder));
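
To make the new control flow easier to follow outside of patch context, here is a condensed,
self-contained sketch of the spilling pattern that WriteWindowedBundles now implements. SpillingFn,
its StringBuilder stand-in writers, and the first-character destination key are illustrative only
and are not part of this commit:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

class SpillingFn extends DoFn<String, String> {
  private static final int MAX_OPEN_WRITERS = 20; // cf. DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE
  private static final int SPILL_SHARDS = 10;     // cf. SPILLED_RECORD_SHARDING_FACTOR

  private final TupleTag<KV<Integer, String>> spilledTag;
  private transient Map<String, StringBuilder> openWriters; // stand-in for 64 MB file writers
  private int spillShard = -1;

  SpillingFn(TupleTag<KV<Integer, String>> spilledTag) {
    this.spilledTag = spilledTag;
  }

  @StartBundle
  public void startBundle() {
    openWriters = new HashMap<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Stand-in destination key; the real transform keys writers by (window, pane).
    String destination = c.element().isEmpty() ? "" : c.element().substring(0, 1);
    StringBuilder writer = openWriters.get(destination);
    if (writer == null && openWriters.size() < MAX_OPEN_WRITERS) {
      writer = new StringBuilder();
      openWriters.put(destination, writer);
    }
    if (writer == null) {
      // Writer budget exhausted: emit the element on the spill output instead, rotating
      // through SPILL_SHARDS keys so the downstream GroupByKey sees no single hot key.
      spillShard = (spillShard < 0)
          ? ThreadLocalRandom.current().nextInt(SPILL_SHARDS)
          : (spillShard + 1) % SPILL_SHARDS;
      c.output(spilledTag, KV.of(spillShard, c.element()));
      return;
    }
    writer.append(c.element()).append('\n'); // "write" to the open writer
  }
}

In the commit itself, the spilled output is declared via withOutputTags(writtenRecordsTag,
TupleTagList.of(unwrittenRecordsTag)), grouped by the synthetic shard key with GroupByKey, written
out by WriteShardedBundles(ASSIGN_IN_FINALIZE), and flattened back together with the inline
results.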
 

http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 206bc1f..904f3a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.testing;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -50,6 +52,14 @@ public interface TestPipelineOptions extends PipelineOptions {
   Long getTestTimeoutSeconds();
   void setTestTimeoutSeconds(Long value);
 
+  @Default.Boolean(false)
+  @Internal
+  @Hidden
+  @org.apache.beam.sdk.options.Description(
+      "Indicates whether this is an automatically-run unit test.")
+  boolean isUnitTest();
+  void setUnitTest(boolean unitTest);
+
   /**
    * Factory for {@link PipelineResult} matchers which always pass.
    */
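
PipelineOptionsFactory derives a flag name from each getter/setter pair, so
isUnitTest()/setUnitTest(...) binds to --unitTest, and @Default.Boolean(false) applies when the
flag is absent. A small sketch of reading the new option directly (the class name is illustrative):

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipelineOptions;

public class OptionNamingDemo {
  public static void main(String[] args) {
    // A boolean flag passed with no value is set to true.
    TestPipelineOptions options =
        PipelineOptionsFactory.fromArgs("--unitTest").as(TestPipelineOptions.class);
    System.out.println(options.isUnitTest());  // true
    // With no arguments, the @Default.Boolean(false) default applies.
    System.out.println(PipelineOptionsFactory.create().as(TestPipelineOptions.class).isUnitTest());
  }
}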

http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index c97313d..bdf37f6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -40,6 +40,10 @@ class SimpleSink extends FileBasedSink<String> {
         writableByteChannelFactory);
   }
 
+  public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) {
+    super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy);
+  }
+
   @Override
   public SimpleWriteOperation createWriteOperation() {
     return new SimpleWriteOperation(this);
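
The new constructor lets a test supply an arbitrary FilenamePolicy instead of the
prefix/template/suffix triple, as makeSimpleSink() in WriteFilesTest now does. A minimal usage
sketch (the path and helper are illustrative, and assume a class in the same package as SimpleSink):

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

class SimpleSinkDemo {
  // `policy` could be the PerWindowFiles policy added to WriteFilesTest below.
  static SimpleSink makeSink(FilenamePolicy policy) {
    // Resolve a directory ResourceId, then pair it with the custom FilenamePolicy.
    ResourceId dir = FileSystems.matchNewResource("/tmp/write-test", true /* isDirectory */);
    return new SimpleSink(dir, policy);
  }
}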

http://git-wip-us.apache.org/repos/asf/beam/blob/69b01a61/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index a5dacd1..e6a0dcf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -62,12 +63,15 @@ import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -160,7 +164,7 @@ public class WriteFilesTest {
   public void testWrite() throws IOException {
     List<String> inputs = Arrays.asList("Critical canary", "Apprehensive eagle",
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
-    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /**
@@ -169,7 +173,8 @@ public class WriteFilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyWrite() throws IOException {
-    runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename());
+    runWrite(Collections.<String>emptyList(), IDENTITY_MAP, getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink()));
     checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(),
         Optional.of(1));
   }
@@ -185,7 +190,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        Optional.of(1));
+        WriteFiles.to(makeSimpleSink()).withNumShards(1));
   }
 
   private ResourceId getBaseOutputDirectory() {
@@ -194,7 +199,8 @@ public class WriteFilesTest {
 
   }
   private SimpleSink makeSimpleSink() {
-    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
+    FilenamePolicy filenamePolicy = new PerWindowFiles("file", "simple");
+    return new SimpleSink(getBaseOutputDirectory(), filenamePolicy);
   }
 
   @Test
@@ -235,7 +241,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        Optional.of(20));
+        WriteFiles.to(makeSimpleSink()).withNumShards(20));
   }
 
   /**
@@ -245,7 +251,7 @@ public class WriteFilesTest {
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
     List<String> inputs = new ArrayList<>();
-    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename());
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /**
@@ -258,7 +264,7 @@ public class WriteFilesTest {
         "Intimidating pigeon", "Pedantic gull", "Frisky finch");
     runWrite(
         inputs, new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
-        getBaseOutputFilename());
+        getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /**
@@ -274,10 +280,24 @@
         inputs,
         new WindowAndReshuffle<>(
             Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
-        getBaseOutputFilename());
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink()));
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testWriteSpilling() throws IOException {
+    List<String> inputs = Lists.newArrayList();
+    for (int i = 0; i < 100; ++i) {
+      inputs.add("mambo_number_" + i);
+    }
+    runWrite(
+        inputs, Window.<String>into(FixedWindows.of(Duration.millis(2))),
+        getBaseOutputFilename(),
+        WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
+  }
+
+  @Test
   public void testBuildWrite() {
     SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
@@ -365,8 +384,45 @@ public class WriteFilesTest {
    */
   private void runWrite(
       List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
-      String baseName) throws IOException {
-    runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
+      String baseName, WriteFiles<String> write) throws IOException {
+    runShardedWrite(inputs, transform, baseName, write);
+  }
+
+  private static class PerWindowFiles extends FilenamePolicy {
+    private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecondMillis();
+    private final String prefix;
+    private final String suffix;
+
+    public PerWindowFiles(String prefix, String suffix) {
+      this.prefix = prefix;
+      this.suffix = suffix;
+    }
+
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format("%s%s-%s",
+          prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    }
+
+    @Override
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
+      IntervalWindow window = (IntervalWindow) context.getWindow();
+      String filename = String.format(
+          "%s-%s-of-%s%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension, suffix);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    }
+
+    @Override
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
+      String filename = String.format(
+          "%s%s-of-%s%s%s",
+          prefix, context.getShardNumber(), context.getNumShards(),
+          extension, suffix);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    }
   }
 
   /**
@@ -379,7 +435,7 @@ public class WriteFilesTest {
       List<String> inputs,
       PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName,
-      Optional<Integer> numConfiguredShards) throws IOException {
+      WriteFiles<String> write) throws IOException {
     // Flag to validate that the pipeline options are passed to the Sink
     WriteOptions options = TestPipeline.testingPipelineOptions().as(WriteOptions.class);
     options.setTestFlag("test_value");
@@ -390,18 +446,15 @@ public class WriteFilesTest {
     for (long i = 0; i < inputs.size(); i++) {
       timestamps.add(i + 1);
     }
-
-    SimpleSink sink = makeSimpleSink();
-    WriteFiles<String> write = WriteFiles.to(sink);
-    if (numConfiguredShards.isPresent()) {
-      write = write.withNumShards(numConfiguredShards.get());
-    }
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(transform)
         .apply(write);
     p.run();
 
-    checkFileContents(baseName, inputs, numConfiguredShards);
+    Optional<Integer> numShards =
+        (write.getNumShards() != null)
+            ? Optional.of(write.getNumShards().get()) : Optional.<Integer>absent();
+    checkFileContents(baseName, inputs, numShards);
   }
 
   static void checkFileContents(String baseName, List<String> inputs,
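
End to end, the new knob is exercised the way testWriteSpilling does it: runner-determined
sharding, windowed writes, and a writer cap low enough that most windows overflow into the spill
path. A sketch (names are illustrative; sink is any FileBasedSink<String>, such as SimpleSink):

import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

class SpillingWriteDemo {
  static PDone writeWindowed(PCollection<String> input, FileBasedSink<String> sink) {
    return input
        // Short fixed windows create many (window, pane) writer keys per bundle.
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(2))))
        // A cap of 2 writers per bundle forces most records through the spill path.
        .apply(WriteFiles.to(sink).withWindowedWrites().withMaxNumWritersPerBundle(2));
  }
}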


[2/2] beam git commit: This closes #3161: [BEAM-2302] Add spilling code to WriteFiles.

Posted by jk...@apache.org.
This closes #3161: [BEAM-2302] Add spilling code to WriteFiles.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/698b89e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/698b89e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/698b89e2

Branch: refs/heads/master
Commit: 698b89e2b5b88403a5c762b039d3ec8c48b25b26
Parents: 10e4764 69b01a6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 14:28:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 14:28:39 2017 -0700

----------------------------------------------------------------------
 runners/direct-java/pom.xml                     |   3 +-
 .../beam/runners/direct/DirectRunner.java       |  28 ++--
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 133 +++++++++++++++----
 .../beam/sdk/testing/TestPipelineOptions.java   |  10 ++
 .../java/org/apache/beam/sdk/io/SimpleSink.java |   4 +
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  89 ++++++++++---
 6 files changed, 209 insertions(+), 58 deletions(-)
----------------------------------------------------------------------