You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/08/17 23:25:01 UTC

[1/4] incubator-beam git commit: fix unused imports

Repository: incubator-beam
Updated Branches:
  refs/heads/master d93ef2edd -> a07648bb6


fix unused imports


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9ff2e42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9ff2e42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9ff2e42

Branch: refs/heads/master
Commit: d9ff2e42339e04358c66308bd292a5a460547f77
Parents: 0e088b7
Author: Pei He <pe...@google.com>
Authored: Wed Aug 17 14:30:23 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:39 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/dataflow/DataflowRunner.java  | 1 -
 .../java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java   | 1 -
 2 files changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9ff2e42/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 1a845ea..c4dd703 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -125,7 +125,6 @@ import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
 import com.google.api.services.dataflow.model.WorkerPool;
-import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9ff2e42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 58b9878..92a6bcb 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -89,7 +89,6 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
-import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;


[2/4] incubator-beam git commit: addressed feedback

Posted by lc...@apache.org.
addressed feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e088b7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e088b7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e088b7f

Branch: refs/heads/master
Commit: 0e088b7fcb2b35d7fdc5125d4dc66e9fa6ae7ffd
Parents: da3081a
Author: Pei He <pe...@google.com>
Authored: Wed Aug 17 13:56:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:39 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  6 ++++--
 .../runners/dataflow/DataflowRunnerTest.java    | 21 +++++++++++++-------
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6f8180e..1a845ea 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -219,6 +219,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // The limit of CreateJob request size.
   private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;
 
+  @VisibleForTesting
+  static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024;
+
   private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;
 
   /**
@@ -311,8 +314,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
-      dataflowOptions.setGcsUploadBufferSizeBytes(
-          AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT);
+      dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
     }
 
     return new DataflowRunner(dataflowOptions);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e088b7f/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 6f1653b..58b9878 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -797,31 +797,38 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testGcsUploadBufferSizeDefault() throws IOException {
+  public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException {
     DataflowPipelineOptions batchOptions = buildPipelineOptions();
-    DataflowRunner.fromOptions(batchOptions);
+    batchOptions.setRunner(DataflowRunner.class);
+    Pipeline.create(batchOptions);
     assertNull(batchOptions.getGcsUploadBufferSizeBytes());
+  }
 
+  @Test
+  public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException {
     DataflowPipelineOptions streamingOptions = buildPipelineOptions();
     streamingOptions.setStreaming(true);
-    DataflowRunner.fromOptions(streamingOptions);
+    streamingOptions.setRunner(DataflowRunner.class);
+    Pipeline.create(streamingOptions);
     assertEquals(
-        AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT,
+        DataflowRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
         streamingOptions.getGcsUploadBufferSizeBytes().intValue());
   }
 
   @Test
-  public void testGcsUploadBufferSize() throws IOException {
+  public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException {
     int gcsUploadBufferSizeBytes = 12345678;
     DataflowPipelineOptions batchOptions = buildPipelineOptions();
     batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
-    DataflowRunner.fromOptions(batchOptions);
+    batchOptions.setRunner(DataflowRunner.class);
+    Pipeline.create(batchOptions);
     assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue());
 
     DataflowPipelineOptions streamingOptions = buildPipelineOptions();
     streamingOptions.setStreaming(true);
     streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
-    DataflowRunner.fromOptions(streamingOptions);
+    streamingOptions.setRunner(DataflowRunner.class);
+    Pipeline.create(streamingOptions);
     assertEquals(
         gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
   }


[4/4] incubator-beam git commit: [BEAM-554] Set Gcs upload buffer size to 1M in DataflowRunner streaming mode

Posted by lc...@apache.org.
[BEAM-554] Set Gcs upload buffer size to 1M in DataflowRunner streaming mode

This closes #828


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a07648bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a07648bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a07648bb

Branch: refs/heads/master
Commit: a07648bb6609ec7ca52721ae2d4a1b8f6ecdba71
Parents: d93ef2e d9ff2e4
Author: Luke Cwik <lc...@google.com>
Authored: Wed Aug 17 16:24:50 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:50 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  7 ++++
 .../runners/dataflow/DataflowRunnerTest.java    | 37 ++++++++++++++++++++
 2 files changed, 44 insertions(+)
----------------------------------------------------------------------



[3/4] incubator-beam git commit: Set Gcs upload buffer size to 1M in streaming mode in DataflowRunner

Posted by lc...@apache.org.
Set Gcs upload buffer size to 1M in streaming mode in DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/da3081a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/da3081a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/da3081a6

Branch: refs/heads/master
Commit: da3081a68c82c6f22ee382dfe0ffe1bd6be5d0e2
Parents: d93ef2e
Author: Pei He <pe...@google.com>
Authored: Mon Aug 15 12:22:11 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Aug 17 16:24:39 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  6 ++++
 .../runners/dataflow/DataflowRunnerTest.java    | 31 ++++++++++++++++++++
 2 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 6222289..6f8180e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -125,6 +125,7 @@ import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
 import com.google.api.services.dataflow.model.WorkerPool;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -309,6 +310,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
           + "' invalid. Please make sure the value is non-negative.");
     }
 
+    if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
+      dataflowOptions.setGcsUploadBufferSizeBytes(
+          AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT);
+    }
+
     return new DataflowRunner(dataflowOptions);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/da3081a6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index d7deffd..6f1653b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -89,6 +89,7 @@ import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.dataflow.model.DataflowPackage;
 import com.google.api.services.dataflow.model.Job;
 import com.google.api.services.dataflow.model.ListJobsResponse;
+import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -795,6 +796,36 @@ public class DataflowRunnerTest {
     }
   }
 
+  @Test
+  public void testGcsUploadBufferSizeDefault() throws IOException {
+    DataflowPipelineOptions batchOptions = buildPipelineOptions();
+    DataflowRunner.fromOptions(batchOptions);
+    assertNull(batchOptions.getGcsUploadBufferSizeBytes());
+
+    DataflowPipelineOptions streamingOptions = buildPipelineOptions();
+    streamingOptions.setStreaming(true);
+    DataflowRunner.fromOptions(streamingOptions);
+    assertEquals(
+        AbstractGoogleAsyncWriteChannel.UPLOAD_PIPE_BUFFER_SIZE_DEFAULT,
+        streamingOptions.getGcsUploadBufferSizeBytes().intValue());
+  }
+
+  @Test
+  public void testGcsUploadBufferSize() throws IOException {
+    int gcsUploadBufferSizeBytes = 12345678;
+    DataflowPipelineOptions batchOptions = buildPipelineOptions();
+    batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
+    DataflowRunner.fromOptions(batchOptions);
+    assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue());
+
+    DataflowPipelineOptions streamingOptions = buildPipelineOptions();
+    streamingOptions.setStreaming(true);
+    streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
+    DataflowRunner.fromOptions(streamingOptions);
+    assertEquals(
+        gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
+  }
+
   /**
    * A fake PTransform for testing.
    */