You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/21 00:56:22 UTC
[1/2] beam git commit: BEAM-1438 Auto shard streaming sinks
Repository: beam
Updated Branches:
refs/heads/master 223dbb449 -> 7b4f5eeaa
BEAM-1438 Auto shard streaming sinks
If a Write operation in streaming requests runner-determined sharding,
make the Dataflow runner default to maxNumWorkers * 2 shards.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbb922c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbb922c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbb922c8
Branch: refs/heads/master
Commit: cbb922c8a72680c5b8b4299197b515abf650bfdf
Parents: 223dbb4
Author: Reuven Lax <re...@google.com>
Authored: Wed Feb 8 12:53:27 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 17:42:16 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 57 +++++++++++++++++++
.../runners/dataflow/DataflowRunnerTest.java | 60 +++++++++++++++++++-
2 files changed, 114 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/cbb922c8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index c584b31..1741287 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -67,10 +67,12 @@ import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.UnconsumedReads;
+import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -91,6 +93,7 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
@@ -340,6 +343,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransformMatchers.splittableParDoMulti(),
new SplittableParDoOverrides.SplittableParDoOverrideFactory()))
.add(
+ PTransformOverride.of(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new StreamingShardedWriteFactory(options)))
+ .add(
// Streaming Bounded Read is implemented in terms of Streaming Unbounded Read, and
// must precede it
PTransformOverride.of(
@@ -1443,6 +1450,56 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
@VisibleForTesting
+ static class StreamingShardedWriteFactory<T>
+ implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
+ // We pick 10 as a default, as it works well with the default number of workers started
+ // by Dataflow.
+ static final int DEFAULT_NUM_SHARDS = 10;
+ DataflowPipelineWorkerPoolOptions options;
+
+ StreamingShardedWriteFactory(PipelineOptions options) {
+ this.options = options.as(DataflowPipelineWorkerPoolOptions.class);
+ }
+
+ @Override
+ public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
+ AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
+ // By default, if numShards is not set WriteFiles will produce one file per bundle. In
+ // streaming, there are large numbers of small bundles, resulting in many tiny files.
+ // Instead we pick max workers * 2 to ensure full parallelism, but prevent too-many files.
+ // (current_num_workers * 2 might be a better choice, but that value is not easily available
+ // today).
+ // If the user does not set either numWorkers or maxNumWorkers, default to 10 shards.
+ int numShards;
+ if (options.getMaxNumWorkers() > 0) {
+ numShards = options.getMaxNumWorkers() * 2;
+ } else if (options.getNumWorkers() > 0) {
+ numShards = options.getNumWorkers() * 2;
+ } else {
+ numShards = DEFAULT_NUM_SHARDS;
+ }
+
+ try {
+ WriteFiles<T> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+ if (WriteFilesTranslation.isWindowedWrites(transform)) {
+ replacement = replacement.withWindowedWrites();
+ }
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ replacement.withNumShards(numShards));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs,
+ PDone newOutput) {
+ return Collections.emptyMap();
+ }
+ }
+
+ @VisibleForTesting
static String getContainerImageForJob(DataflowPipelineOptions options) {
String workerHarnessContainerImage = options.getWorkerHarnessContainerImage();
if (!workerHarnessContainerImage.contains("IMAGE")) {
http://git-wip-us.apache.org/repos/asf/beam/blob/cbb922c8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 8f10b18..aae21cf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -62,21 +63,28 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.ExpectedLogs;
@@ -87,7 +95,10 @@ import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
@@ -823,7 +834,6 @@ public class DataflowRunnerTest {
DataflowRunner.fromOptions(options);
}
-
@Test
public void testValidProfileLocation() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
@@ -1047,8 +1057,8 @@ public class DataflowRunnerTest {
}
/**
- * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
- * when the runner issuccessfully run.
+ * Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally when the
+ * runner is successfully run.
*/
@Test
public void testTemplateRunnerFullCompletion() throws Exception {
@@ -1127,4 +1137,48 @@ public class DataflowRunnerTest {
assertThat(
getContainerImageForJob(options), equalTo("gcr.io/java/foo"));
}
+
+ @Test
+ public void testStreamingWriteWithNoShardingReturnsNewTransform() {
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ options.as(DataflowPipelineWorkerPoolOptions.class).setMaxNumWorkers(10);
+ testStreamingWriteOverride(options, 20);
+ }
+
+ @Test
+ public void testStreamingWriteWithNoShardingReturnsNewTransformMaxWorkersUnset() {
+ PipelineOptions options = TestPipeline.testingPipelineOptions();
+ testStreamingWriteOverride(options, StreamingShardedWriteFactory.DEFAULT_NUM_SHARDS);
+ }
+
+ private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
+ TestPipeline p = TestPipeline.fromOptions(options);
+
+ StreamingShardedWriteFactory<Object> factory =
+ new StreamingShardedWriteFactory<>(p.getOptions());
+ WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
+ PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
+ AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
+ AppliedPTransform.of(
+ "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
+
+ WriteFiles<Object> replacement = (WriteFiles<Object>)
+ factory.getReplacementTransform(originalApplication).getTransform();
+ assertThat(replacement, not(equalTo((Object) original)));
+ assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
+ }
+
+ private static class TestSink extends FileBasedSink<Object> {
+ @Override
+ public void validate(PipelineOptions options) {}
+
+ TestSink(String tmpFolder) {
+ super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
+ null);
+ }
+ @Override
+ public WriteOperation<Object> createWriteOperation() {
+ throw new IllegalArgumentException("Should not be used");
+ }
+ }
}
[2/2] beam git commit: This closes #1952: BEAM-1438 Auto shard
streaming sinks
Posted by jk...@apache.org.
This closes #1952: BEAM-1438 Auto shard streaming sinks
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b4f5eea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b4f5eea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b4f5eea
Branch: refs/heads/master
Commit: 7b4f5eeaa273bc22beba18a62006ead8bc096d50
Parents: 223dbb4 cbb922c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Jun 20 17:42:52 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Jun 20 17:42:52 2017 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 57 +++++++++++++++++++
.../runners/dataflow/DataflowRunnerTest.java | 60 +++++++++++++++++++-
2 files changed, 114 insertions(+), 3 deletions(-)
----------------------------------------------------------------------