You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/17 23:25:02 UTC
[2/4] incubator-beam git commit: addressed feedback
addressed feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e088b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e088b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e088b7f
Branch: refs/heads/master
Commit: 0e088b7fcb2b35d7fdc5125d4dc66e9fa6ae7ffd
Parents: da3081a
Author: Pei He <pe...@google.com>
Authored: Wed Aug 17 13:56:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:39 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 6 ++++--
.../runners/dataflow/DataflowRunnerTest.java | 21 +++++++++++++-------
2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6f8180e..1a845ea 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -219,6 +219,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
+ @VisibleForTesting
+ static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024;
+
private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
/**
@@ -311,8 +314,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
- dataflowOptions.setGcsUploadBufferSizeBytes(
- AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT);
+ dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
return new DataflowRunner(dataflowOptions);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 6f1653b..58b9878 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -797,31 +797,38 @@ public class DataflowRunnerTest {
}
@Test
- public void testGcsUploadBufferSizeDefault() throws IOException {
+ public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException {
DataflowPipelineOptions batchOptions = buildPipelineOptions();
- DataflowRunner.fromOptions(batchOptions);
+ batchOptions.setRunner(DataflowRunner.class);
+ Pipeline.create(batchOptions);
assertNull(batchOptions.getGcsUploadBufferSizeBytes());
+ }
+ @Test
+ public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException {
DataflowPipelineOptions streamingOptions = buildPipelineOptions();
streamingOptions.setStreaming(true);
- DataflowRunner.fromOptions(streamingOptions);
+ streamingOptions.setRunner(DataflowRunner.class);
+ Pipeline.create(streamingOptions);
assertEquals(
- AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT,
+ DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
streamingOptions.getGcsUploadBufferSizeBytes().intValue());
}
@Test
- public void testGcsUploadBufferSize() throws IOException {
+ public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException {
int gcsUploadBufferSizeBytes = 12345678;
DataflowPipelineOptions batchOptions = buildPipelineOptions();
batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
- DataflowRunner.fromOptions(batchOptions);
+ batchOptions.setRunner(DataflowRunner.class);
+ Pipeline.create(batchOptions);
assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue());
DataflowPipelineOptions streamingOptions = buildPipelineOptions();
streamingOptions.setStreaming(true);
streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
- DataflowRunner.fromOptions(streamingOptions);
+ streamingOptions.setRunner(DataflowRunner.class);
+ Pipeline.create(streamingOptions);
assertEquals(
gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
}