You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 04:54:18 UTC

[2/2] incubator-beam git commit: Modified addBulkOptions for simplicity

Modified addBulkOptions for simplicity


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9231826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9231826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9231826

Branch: refs/heads/master
Commit: b9231826bb9c8084f3802206fbdd1d9f69fea3a6
Parents: 155409b
Author: Ian Zhou <ia...@google.com>
Authored: Thu Jul 7 10:27:47 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 21:53:35 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 95 +++++++++----------
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 99 ++++++++++----------
 2 files changed, 99 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index dd17abe..b4c3c75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -47,7 +47,6 @@ import com.google.bigtable.v1.Row;
 import com.google.bigtable.v1.RowFilter;
 import com.google.bigtable.v1.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.config.BulkOptions;
 import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
@@ -207,10 +206,23 @@ public class BigtableIO {
     public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      clonedBuilder.setDataChannelCount(1);
-      clonedBuilder = addRetryOptions(clonedBuilder);
+      BigtableOptions options = optionsBuilder.build();
+      RetryOptions retryOptions = options.getRetryOptions();
+
+      // Set data channel count to one because there is only 1 scanner in this session
+      // Use retryOptionsToBuilder because RetryOptions.toBuilder() is absent in Bigtable library
+      // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
+      // Set batch size because of bug (incorrect initialization) in Bigtable library
+      // TODO: remove setRetryOptions when fixed in Bigtable library
+      BigtableOptions.Builder clonedBuilder = options.toBuilder()
+          .setDataChannelCount(1)
+          .setRetryOptions(
+              retryOptionsToBuilder(retryOptions)
+                  .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+                      retryOptions.getStreamingBufferSize() / 2))
+                  .build());
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+
       return new Read(optionsWithAgent, tableId, filter, bigtableService);
     }
 
@@ -393,9 +405,24 @@ public class BigtableIO {
     public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
       checkNotNull(optionsBuilder, "optionsBuilder");
       // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
-      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
-      clonedBuilder = addBulkOptions(clonedBuilder);
-      clonedBuilder = addRetryOptions(clonedBuilder);
+      BigtableOptions options = optionsBuilder.build();
+      RetryOptions retryOptions = options.getRetryOptions();
+
+      // Set useBulkApi to true for enabling bulk writes
+      // Use retryOptionsToBuilder because RetryOptions.toBuilder() is absent in Bigtable library
+      // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
+      // Set batch size because of bug (incorrect initialization) in Bigtable library
+      // TODO: remove setRetryOptions when fixed in Bigtable library
+      BigtableOptions.Builder clonedBuilder = options.toBuilder()
+          .setBulkOptions(
+              options.getBulkOptions().toBuilder()
+                  .setUseBulkApi(true)
+                  .build())
+          .setRetryOptions(
+              retryOptionsToBuilder(retryOptions)
+                  .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+                      retryOptions.getStreamingBufferSize() / 2))
+                  .build());
       BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
       return new Write(optionsWithAgent, tableId, bigtableService);
     }
@@ -1036,52 +1063,26 @@ public class BigtableIO {
   }
 
   /**
-   * A helper function to add appropriate bulk options. See
-   * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
-   * toBuilder</a> for issue.
-   */
-  static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) {
-    BulkOptions bulkOptions = builder.build().getBulkOptions();
-
-    BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder()
-        .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount())
-        .setUseBulkApi(true)
-        .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount())
-        .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize())
-        .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs())
-        .setMaxMemory(bulkOptions.getMaxMemory());
-
-    builder.setBulkOptions(bulkOptionsBuilder.build());
-    return builder;
-  }
-
-  /**
-   * A helper function to add appropriate retry options. See
-   * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
-   * toBuilder</a> for issue.
+   * A helper function to convert a RetryOptions into a RetryOptions.Builder.
    */
-  static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) {
-    RetryOptions retryOptions = builder.build().getRetryOptions();
-
-    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder()
-        .setEnableRetries(retryOptions.enableRetries())
-        .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis())
-        .setBackoffMultiplier(retryOptions.getBackoffMultiplier())
-        .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis())
-        .setStreamingBufferSize(retryOptions.getStreamingBufferSize())
-        .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
-            retryOptions.getStreamingBufferSize() / 2))
-        .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis())
-        .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries())
-        .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp());
+  private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) {
+    RetryOptions.Builder builder = new RetryOptions.Builder();
+    builder.setEnableRetries(options.enableRetries());
+    builder.setInitialBackoffMillis(options.getInitialBackoffMillis());
+    builder.setBackoffMultiplier(options.getBackoffMultiplier());
+    builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis());
+    builder.setStreamingBufferSize(options.getStreamingBufferSize());
+    builder.setStreamingBatchSize(options.getStreamingBatchSize());
+    builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis());
+    builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries());
+    builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp());
 
     for (Status.Code code : Status.Code.values()) {
-      if (retryOptions.isRetryable(code)) {
-        retryOptionsBuilder.addStatusToRetryOn(code);
+      if (options.isRetryable(code)) {
+        builder.addStatusToRetryOn(code);
       }
     }
 
-    builder.setRetryOptions(retryOptionsBuilder.build());
     return builder;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cd16f54..a6a7f9d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -28,11 +28,9 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValu
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verifyNotNull;
-
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
@@ -71,8 +69,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Empty;
 
-import io.grpc.Status;
-
 import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
@@ -86,12 +82,10 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -536,55 +530,64 @@ public class BigtableIOTest {
   }
 
   @Test
-  public void testAddBulkOptions() {
+  public void testReadWithBigTableOptionsSetsRetryOptions() {
+    final int initialBackoffMillis = -1;
+
     BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
-    optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder);
 
-    BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions();
-    assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT,
-        bulkOptions.getAsyncMutatorCount());
-    assertEquals(true, bulkOptions.useBulkApi());
-    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT,
-        bulkOptions.getBulkMaxRowKeyCount());
-    assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT,
-        bulkOptions.getBulkMaxRequestSize());
-    assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT
-        * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs());
-    assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory());
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+    retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
+
+    optionsBuilder.setRetryOptions(retryOptionsBuilder.build());
+
+    BigtableIO.Read read =
+        BigtableIO.read().withBigtableOptions(optionsBuilder.build());
+
+    BigtableOptions options = read.getBigtableOptions();
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
+        options.getRetryOptions().getStreamingBatchSize());
+    assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
+
+    assertThat(options.getRetryOptions(),
+        Matchers.equalTo(retryOptionsBuilder
+            .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
+            .build()));
   }
 
   @Test
-  public void testAddRetryOptions() {
-    final double delta = 0.0000001;
+  public void testWriteWithBigTableOptionsSetsBulkOptionsAndRetryOptions() {
+    final int maxInflightRpcs = 1;
+    final int initialBackoffMillis = -1;
+
     BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
-    optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder);
-
-    RetryOptions retryOptions = optionsBuilder.build().getRetryOptions();
-    assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries());
-    assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS,
-        retryOptions.getInitialBackoffMillis());
-    assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(),
-        delta);
-    assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS,
-        retryOptions.getMaxElaspedBackoffMillis());
-    assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize());
-    assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize());
-    assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS,
-        retryOptions.getReadPartialRowTimeoutMillis());
-    assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES,
-        retryOptions.getMaxScanTimeoutRetries());
-    assertFalse(retryOptions.allowRetriesWithoutTimestamp());
-
-    Set<Status.Code> statusToRetryOn = new HashSet<>();
-    for (Status.Code code : Status.Code.values()) {
-      if (retryOptions.isRetryable(code)) {
-        statusToRetryOn.add(code);
-      }
-    }
 
-    Set<Status.Code> defaultStatusToRetryOn =
-        new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET);
-    assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray()));
+    BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder();
+    bulkOptionsBuilder.setMaxInflightRpcs(maxInflightRpcs);
+
+    RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+    retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
+
+    optionsBuilder.setBulkOptions(bulkOptionsBuilder.build())
+        .setRetryOptions(retryOptionsBuilder.build());
+
+    BigtableIO.Write write =
+        BigtableIO.write().withBigtableOptions(optionsBuilder.build());
+
+    BigtableOptions options = write.getBigtableOptions();
+    assertEquals(true, options.getBulkOptions().useBulkApi());
+    assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs());
+    assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
+        options.getRetryOptions().getStreamingBatchSize());
+    assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
+
+    assertThat(options.getBulkOptions(),
+        Matchers.equalTo(bulkOptionsBuilder
+            .setUseBulkApi(true)
+            .build()));
+    assertThat(options.getRetryOptions(),
+        Matchers.equalTo(retryOptionsBuilder
+            .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
+            .build()));
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////