Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/21 00:56:22 UTC

[1/2] beam git commit: BEAM-1438 Auto shard streaming sinks

Repository: beam
Updated Branches:
  refs/heads/master 223dbb449 -> 7b4f5eeaa


BEAM-1438 Auto shard streaming sinks

If a Write operation in streaming requests runner-determined sharding,
make the Dataflow runner default to maxNumWorkers * 2 shards.
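
For context, a minimal sketch (not part of this commit) of the kind of streaming
pipeline the override targets: a windowed file write that never calls
withNumShards(), leaving sharding runner-determined. It assumes an SDK build that
accepts runner-determined sharding on unbounded input, which is what this change
makes workable on Dataflow; the topic and output path are illustrative.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class AutoShardedStreamingWrite {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        p.apply(PubsubIO.readStrings()
                .fromTopic("projects/my-project/topics/my-topic")) // illustrative
         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
         // No withNumShards(): sharding stays runner-determined, so the Dataflow
         // runner now rewrites this write to use maxNumWorkers * 2 shards.
         .apply(TextIO.write()
             .to("gs://my-bucket/output") // illustrative path
             .withWindowedWrites());

        p.run();
      }
    }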


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbb922c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbb922c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbb922c8

Branch: refs/heads/master
Commit: cbb922c8a72680c5b8b4299197b515abf650bfdf
Parents: 223dbb4
Author: Reuven Lax <re...@google.com>
Authored: Wed Feb 8 12:53:27 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 17:42:16 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 57 +++++++++++++++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 60 +++++++++++++++++++-
 2 files changed, 114 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cbb922c8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c584b31..1741287 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,10 +67,12 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
+import org.apache.beam.runners.core.construction.WriteFilesTranslation;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
 import org.apache.beam.runners.dataflow.util.DataflowTransport;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -91,6 +93,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -340,6 +343,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   PTransformMatchers.splittableParDoMulti(),
                   new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
           .add(
+              PTransformOverride.of(
+                  PTransformMatchers.writeWithRunnerDeterminedSharding(),
+                  new StreamingShardedWriteFactory(options)))
+          .add(
               // Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
               // must precede it
               PTransformOverride.of(
@@ -1443,6 +1450,56 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   @VisibleForTesting
+  static class StreamingShardedWriteFactory<T>
+      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
+    // We pick 10 as a default, as it works well with the default number of workers started
+    // by Dataflow.
+    static final int DEFAULT_NUM_SHARDS = 10;
+    DataflowPipelineWorkerPoolOptions options;
+
+    StreamingShardedWriteFactory(PipelineOptions options) {
+      this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
+        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
+      // By default, if numShards is not set, WriteFiles will produce one file per bundle. In
+      // streaming, there are large numbers of small bundles, resulting in many tiny files.
+      // Instead we pick max workers * 2 to ensure full parallelism while preventing too many files.
+      // (current_num_workers * 2 might be a better choice, but that value is not easily available
+      // today).
+      // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
+      int numShards;
+      if (options.getMaxNumWorkers() > 0) {
+        numShards = options.getMaxNumWorkers() * 2;
+      } else if (options.getNumWorkers() > 0) {
+        numShards = options.getNumWorkers() * 2;
+      } else {
+        numShards = DEFAULT_NUM_SHARDS;
+      }
+
+      try {
+        WriteFiles<T> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+        if (WriteFilesTranslation.isWindowedWrites(transform)) {
+          replacement = replacement.withWindowedWrites();
+        }
+        return PTransformReplacement.of(
+            PTransformReplacements.getSingletonMainInput(transform),
+            replacement.withNumShards(numShards));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
+                                                     PDone newOutput) {
+      return Collections.emptyMap();
+    }
+  }
+
+  @VisibleForTesting
   static String getContainerImageForJob(DataflowPipelineOptions options) {
     String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
     if (!workerHarnessContainerImage.contains("IMAGE")) {

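As a standalone illustration (not part of the commit), the shard-count precedence
implemented above can be exercised on its own; the helper below simply mirrors
StreamingShardedWriteFactory's logic for experimentation:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ShardCountSketch {
      // Mirrors the precedence in StreamingShardedWriteFactory:
      // maxNumWorkers wins, then numWorkers, then a fixed default of 10.
      static int resolveNumShards(DataflowPipelineWorkerPoolOptions opts) {
        if (opts.getMaxNumWorkers() > 0) {
          return opts.getMaxNumWorkers() * 2;
        } else if (opts.getNumWorkers() > 0) {
          return opts.getNumWorkers() * 2;
        }
        return 10; // StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS
      }

      public static void main(String[] args) {
        DataflowPipelineWorkerPoolOptions opts =
            PipelineOptionsFactory.create().as(DataflowPipelineWorkerPoolOptions.class);
        opts.setMaxNumWorkers(10);
        System.out.println(resolveNumShards(opts)); // prints 20
      }
    }
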
http://git-wip-us.apache.org/repos/asf/beam/blob/cbb922c8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 8f10b18..aae21cf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -62,21 +63,28 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
+import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -87,7 +95,10 @@ import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Description;
 import org.hamcrest.Matchers;
@@ -823,7 +834,6 @@ public class DataflowRunnerTest {
     DataflowRunner.fromOptions(options);
   }
 
-
   @Test
   public void testValidProfileLocation() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
@@ -1047,8 +1057,8 @@ public class DataflowRunnerTest {
   }
 
   /**
-   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
-   * when the runner issuccessfully run.
+   * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the
+   * runner is successfully run.
    */
   @Test
   public void testTemplateRunnerFullCompletion() throws Exception {
@@ -1127,4 +1137,48 @@ public class DataflowRunnerTest {
     assertThat(
         getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
   }
+
+  @Test
+  public void testStreamingWriteWithNoShardingReturnsNewTransform() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
+    testStreamingWriteOverride(options, 20);
+  }
+
+  @Test
+  public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
+  }
+
+  private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
+    TestPipeline p = TestPipeline.fromOptions(options);
+
+    StreamingShardedWriteFactory<Object> factory =
+        new StreamingShardedWriteFactory<>(p.getOptions());
+    WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
+    PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
+        AppliedPTransform.of(
+            "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+
+    WriteFiles<Object> replacement = (WriteFiles<Object>)
+        factory.getReplacementTransform(originalApplication).getTransform();
+    assertThat(replacement, not(equalTo((Object) original)));
+    assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
+  }
+
+  private static class TestSink extends FileBasedSink<Object> {
+    @Override
+    public void validate(PipelineOptions options) {}
+
+    TestSink(String tmpFolder) {
+      super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
+          null);
+    }
+    @Override
+    public WriteOperation<Object> createWriteOperation() {
+      throw new IllegalArgumentException("Should not be used");
+    }
+  }
 }


[2/2] beam git commit: This closes #1952: BEAM-1438 Auto shard streaming sinks

Posted by jk...@apache.org.
This closes #1952: BEAM-1438 Auto shard streaming sinks


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b4f5eea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b4f5eea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b4f5eea

Branch: refs/heads/master
Commit: 7b4f5eeaa273bc22beba18a62006ead8bc096d50
Parents: 223dbb4 cbb922c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 17:42:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 17:42:52 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   | 57 +++++++++++++++++++
 .../runners/dataflow/DataflowRunnerTest.java    | 60 +++++++++++++++++++-
 2 files changed, 114 insertions(+), 3 deletions(-)
----------------------------------------------------------------------