You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/17 23:25:03 UTC
[3/4] incubator-beam git commit: Set GCS upload buffer size to 1 MB in
streaming mode in DataflowRunner
Set GCS upload buffer size to 1 MB in streaming mode in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da3081a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da3081a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da3081a6
Branch: refs/heads/master
Commit: da3081a68c82c6f22ee382dfe0ffe1bd6be5d0e2
Parents: d93ef2e
Author: Pei He <pe...@google.com>
Authored: Mon Aug 15 12:22:11 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:39 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 6 ++++
.../runners/dataflow/DataflowRunnerTest.java | 31 ++++++++++++++++++++
2 files changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6222289..6f8180e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -125,6 +125,7 @@ import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -309,6 +310,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ "' invalid. Please make sure the value is non-negative.");
}
+ if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
+ dataflowOptions.setGcsUploadBufferSizeBytes(
+ AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT);
+ }
+
return new DataflowRunner(dataflowOptions);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d7deffd..6f1653b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -89,6 +89,7 @@ import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -795,6 +796,36 @@ public class DataflowRunnerTest {
}
}
+ @Test
+ public void testGcsUploadBufferSizeDefault() throws IOException { // Checks what fromOptions does when no buffer size was set.
+ DataflowPipelineOptions batchOptions = buildPipelineOptions(); // setStreaming is not called on these options.
+ DataflowRunner.fromOptions(batchOptions);
+ assertNull(batchOptions.getGcsUploadBufferSizeBytes()); // Without streaming the buffer size must stay unset.
+
+ DataflowPipelineOptions streamingOptions = buildPipelineOptions();
+ streamingOptions.setStreaming(true); // Streaming on, buffer size still not set explicitly.
+ DataflowRunner.fromOptions(streamingOptions);
+ assertEquals( // fromOptions is expected to have filled in the library default.
+ AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT,
+ streamingOptions.getGcsUploadBufferSizeBytes().intValue());
+ }
+
+ @Test
+ public void testGcsUploadBufferSize() throws IOException { // Checks that an explicitly set buffer size survives fromOptions.
+ int gcsUploadBufferSizeBytes = 12345678; // Arbitrary explicit value used for both option sets below.
+ DataflowPipelineOptions batchOptions = buildPipelineOptions(); // setStreaming is not called on these options.
+ batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
+ DataflowRunner.fromOptions(batchOptions);
+ assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue()); // Explicit value is kept.
+
+ DataflowPipelineOptions streamingOptions = buildPipelineOptions();
+ streamingOptions.setStreaming(true); // Streaming on, but the size is set explicitly on the next line.
+ streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
+ DataflowRunner.fromOptions(streamingOptions);
+ assertEquals( // The streaming default must not replace a value that was already set.
+ gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
+ }
+
/**
* A fake PTransform for testing.
*/