You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 04:54:18 UTC
[2/2] incubator-beam git commit: Modified addBulkOptions for simplicity
Modified addBulkOptions for simplicity
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b9231826
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b9231826
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b9231826
Branch: refs/heads/master
Commit: b9231826bb9c8084f3802206fbdd1d9f69fea3a6
Parents: 155409b
Author: Ian Zhou <ia...@google.com>
Authored: Thu Jul 7 10:27:47 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jul 7 21:53:35 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 95 +++++++++----------
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 99 ++++++++++----------
2 files changed, 99 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index dd17abe..b4c3c75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -47,7 +47,6 @@ import com.google.bigtable.v1.Row;
import com.google.bigtable.v1.RowFilter;
import com.google.bigtable.v1.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
-import com.google.cloud.bigtable.config.BulkOptions;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
@@ -207,10 +206,23 @@ public class BigtableIO {
public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- clonedBuilder.setDataChannelCount(1);
- clonedBuilder = addRetryOptions(clonedBuilder);
+ BigtableOptions options = optionsBuilder.build();
+ RetryOptions retryOptions = options.getRetryOptions();
+
+ // Set data channel count to one because there is only 1 scanner in this session
+ // Use retryOptionsToBuilder because the Bigtable library lacks RetryOptions.toBuilder()
+ // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
+ // Set batch size because of bug (incorrect initialization) in Bigtable library
+ // TODO: remove setRetryOptions when fixed in Bigtable library
+ BigtableOptions.Builder clonedBuilder = options.toBuilder()
+ .setDataChannelCount(1)
+ .setRetryOptions(
+ retryOptionsToBuilder(retryOptions)
+ .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+ retryOptions.getStreamingBufferSize() / 2))
+ .build());
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+
return new Read(optionsWithAgent, tableId, filter, bigtableService);
}
@@ -393,9 +405,24 @@ public class BigtableIO {
public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
- BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
- clonedBuilder = addBulkOptions(clonedBuilder);
- clonedBuilder = addRetryOptions(clonedBuilder);
+ BigtableOptions options = optionsBuilder.build();
+ RetryOptions retryOptions = options.getRetryOptions();
+
+ // Set useBulkApi to true for enabling bulk writes
+ // Use retryOptionsToBuilder because the Bigtable library lacks RetryOptions.toBuilder()
+ // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
+ // Set batch size because of bug (incorrect initialization) in Bigtable library
+ // TODO: remove setRetryOptions when fixed in Bigtable library
+ BigtableOptions.Builder clonedBuilder = options.toBuilder()
+ .setBulkOptions(
+ options.getBulkOptions().toBuilder()
+ .setUseBulkApi(true)
+ .build())
+ .setRetryOptions(
+ retryOptionsToBuilder(retryOptions)
+ .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+ retryOptions.getStreamingBufferSize() / 2))
+ .build());
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
return new Write(optionsWithAgent, tableId, bigtableService);
}
@@ -1036,52 +1063,26 @@ public class BigtableIO {
}
/**
- * A helper function to add appropriate bulk options. See
- * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
- * toBuilder</a> for issue.
- */
- static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) {
- BulkOptions bulkOptions = builder.build().getBulkOptions();
-
- BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder()
- .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount())
- .setUseBulkApi(true)
- .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount())
- .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize())
- .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs())
- .setMaxMemory(bulkOptions.getMaxMemory());
-
- builder.setBulkOptions(bulkOptionsBuilder.build());
- return builder;
- }
-
- /**
- * A helper function to add appropriate retry options. See
- * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
- * toBuilder</a> for issue.
+ * A helper function to convert a RetryOptions into a RetryOptions.Builder.
*/
- static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) {
- RetryOptions retryOptions = builder.build().getRetryOptions();
-
- RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder()
- .setEnableRetries(retryOptions.enableRetries())
- .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis())
- .setBackoffMultiplier(retryOptions.getBackoffMultiplier())
- .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis())
- .setStreamingBufferSize(retryOptions.getStreamingBufferSize())
- .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
- retryOptions.getStreamingBufferSize() / 2))
- .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis())
- .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries())
- .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp());
+ private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) {
+ RetryOptions.Builder builder = new RetryOptions.Builder();
+ builder.setEnableRetries(options.enableRetries());
+ builder.setInitialBackoffMillis(options.getInitialBackoffMillis());
+ builder.setBackoffMultiplier(options.getBackoffMultiplier());
+ builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis());
+ builder.setStreamingBufferSize(options.getStreamingBufferSize());
+ builder.setStreamingBatchSize(options.getStreamingBatchSize());
+ builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis());
+ builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries());
+ builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp());
for (Status.Code code : Status.Code.values()) {
- if (retryOptions.isRetryable(code)) {
- retryOptionsBuilder.addStatusToRetryOn(code);
+ if (options.isRetryable(code)) {
+ builder.addStatusToRetryOn(code);
}
}
- builder.setRetryOptions(retryOptionsBuilder.build());
return builder;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b9231826/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cd16f54..a6a7f9d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -28,11 +28,9 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValu
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verifyNotNull;
-
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -71,8 +69,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
-import io.grpc.Status;
-
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
@@ -86,12 +82,10 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -536,55 +530,64 @@ public class BigtableIOTest {
}
@Test
- public void testAddBulkOptions() {
+ public void testReadWithBigTableOptionsSetsRetryOptions() {
+ final int initialBackoffMillis = -1;
+
BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
- optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder);
- BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions();
- assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT,
- bulkOptions.getAsyncMutatorCount());
- assertEquals(true, bulkOptions.useBulkApi());
- assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT,
- bulkOptions.getBulkMaxRowKeyCount());
- assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT,
- bulkOptions.getBulkMaxRequestSize());
- assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT
- * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs());
- assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory());
+ RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+ retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
+
+ optionsBuilder.setRetryOptions(retryOptionsBuilder.build());
+
+ BigtableIO.Read read =
+ BigtableIO.read().withBigtableOptions(optionsBuilder.build());
+
+ BigtableOptions options = read.getBigtableOptions();
+ assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
+ options.getRetryOptions().getStreamingBatchSize());
+ assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
+
+ assertThat(options.getRetryOptions(),
+ Matchers.equalTo(retryOptionsBuilder
+ .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
+ .build()));
}
@Test
- public void testAddRetryOptions() {
- final double delta = 0.0000001;
+ public void testWriteWithBigTableOptionsSetsBulkOptionsAndRetryOptions() {
+ final int maxInflightRpcs = 1;
+ final int initialBackoffMillis = -1;
+
BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
- optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder);
-
- RetryOptions retryOptions = optionsBuilder.build().getRetryOptions();
- assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries());
- assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS,
- retryOptions.getInitialBackoffMillis());
- assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(),
- delta);
- assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS,
- retryOptions.getMaxElaspedBackoffMillis());
- assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize());
- assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize());
- assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS,
- retryOptions.getReadPartialRowTimeoutMillis());
- assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES,
- retryOptions.getMaxScanTimeoutRetries());
- assertFalse(retryOptions.allowRetriesWithoutTimestamp());
-
- Set<Status.Code> statusToRetryOn = new HashSet<>();
- for (Status.Code code : Status.Code.values()) {
- if (retryOptions.isRetryable(code)) {
- statusToRetryOn.add(code);
- }
- }
- Set<Status.Code> defaultStatusToRetryOn =
- new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET);
- assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray()));
+ BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder();
+ bulkOptionsBuilder.setMaxInflightRpcs(maxInflightRpcs);
+
+ RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+ retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
+
+ optionsBuilder.setBulkOptions(bulkOptionsBuilder.build())
+ .setRetryOptions(retryOptionsBuilder.build());
+
+ BigtableIO.Write write =
+ BigtableIO.write().withBigtableOptions(optionsBuilder.build());
+
+ BigtableOptions options = write.getBigtableOptions();
+ assertEquals(true, options.getBulkOptions().useBulkApi());
+ assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs());
+ assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
+ options.getRetryOptions().getStreamingBatchSize());
+ assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
+
+ assertThat(options.getBulkOptions(),
+ Matchers.equalTo(bulkOptionsBuilder
+ .setUseBulkApi(true)
+ .build()));
+ assertThat(options.getRetryOptions(),
+ Matchers.equalTo(retryOptionsBuilder
+ .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
+ .build()));
}
////////////////////////////////////////////////////////////////////////////////////////////