You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/30 20:06:17 UTC
[1/2] incubator-beam git commit: Update to bigtable-client-core-0.3.0 and use bulk writes
Repository: incubator-beam
Updated Branches:
refs/heads/master 4a0e426a8 -> 38866cd55
Update to bigtable-client-core-0.3.0 and use bulk writes
The new client is generally more stable, and bulk writes bring 5x write
throughput in batch jobs by using the network more efficiently.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/653c504f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/653c504f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/653c504f
Branch: refs/heads/master
Commit: 653c504f2f9460bc8861d149694ed2595701ce16
Parents: 4a0e426
Author: Ian Zhou <ia...@google.com>
Authored: Thu Jun 16 13:52:11 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 13:05:07 2016 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
sdks/java/core/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 60 +++++++++++++++++++-
.../io/gcp/bigtable/BigtableServiceImpl.java | 31 +++++-----
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 59 +++++++++++++++++++
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 18 +++++-
7 files changed, 150 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f1eaac..14a9c67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
<google-cloud-bigdataoss.version>1.4.5</google-cloud-bigdataoss.version>
<google-cloud-dataflow-java-proto-library-all.version>0.5.160304</google-cloud-dataflow-java-proto-library-all.version>
<guava.version>19.0</guava.version>
- <grpc.version>0.12.0</grpc.version>
+ <grpc.version>0.13.1</grpc.version>
<hamcrest.version>1.3</hamcrest.version>
<jackson.version>2.7.2</jackson.version>
<findbugs.version>3.0.1</findbugs.version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 67c7fe9..9ec8f3d 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -311,7 +311,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
- <version>4.1.0.Beta8</version>
+ <version>4.1.0.CR1</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index c95ea71..c7e77f1 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
<packaging>jar</packaging>
<properties>
- <bigtable.version>0.2.3</bigtable.version>
+ <bigtable.version>0.3.0</bigtable.version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index cddb333..47c68dd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -47,6 +47,8 @@ import com.google.bigtable.v1.Row;
import com.google.bigtable.v1.RowFilter;
import com.google.bigtable.v1.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
@@ -54,6 +56,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
+import io.grpc.Status;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -204,6 +208,8 @@ public class BigtableIO {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+ clonedBuilder.setDataChannelCount(1);
+ clonedBuilder = addRetryOptions(clonedBuilder);
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
return new Read(optionsWithAgent, tableId, filter, bigtableService);
}
@@ -388,6 +394,8 @@ public class BigtableIO {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+ clonedBuilder = addBulkOptions(clonedBuilder);
+ clonedBuilder = addRetryOptions(clonedBuilder);
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
return new Write(optionsWithAgent, tableId, bigtableService);
}
@@ -1024,6 +1032,56 @@ public class BigtableIO {
info.getName(),
info.getVersion(),
javaVersion,
- "0.2.3" /* TODO get Bigtable client version directly from jar. */);
+ "0.3.0" /* TODO get Bigtable client version directly from jar. */);
+ }
+
+ /**
+ * A helper function to add appropriate bulk options. See
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+ * toBuilder</a> for issue.
+ */
+ static BigtableOptions.Builder addBulkOptions(BigtableOptions.Builder builder) {
+ BulkOptions bulkOptions = builder.build().getBulkOptions();
+
+ BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder()
+ .setAsyncMutatorWorkerCount(bulkOptions.getAsyncMutatorCount())
+ .setUseBulkApi(true)
+ .setBulkMaxRowKeyCount(bulkOptions.getBulkMaxRowKeyCount())
+ .setBulkMaxRequestSize(bulkOptions.getBulkMaxRequestSize())
+ .setMaxInflightRpcs(bulkOptions.getMaxInflightRpcs())
+ .setMaxMemory(bulkOptions.getMaxMemory());
+
+ builder.setBulkOptions(bulkOptionsBuilder.build());
+ return builder;
+ }
+
+ /**
+ * A helper function to add appropriate retry options. See
+ * <a href="https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/899">RetryOptions
+ * toBuilder</a> for issue.
+ */
+ static BigtableOptions.Builder addRetryOptions(BigtableOptions.Builder builder) {
+ RetryOptions retryOptions = builder.build().getRetryOptions();
+
+ RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder()
+ .setEnableRetries(retryOptions.enableRetries())
+ .setInitialBackoffMillis(retryOptions.getInitialBackoffMillis())
+ .setBackoffMultiplier(retryOptions.getBackoffMultiplier())
+ .setMaxElapsedBackoffMillis(retryOptions.getMaxElaspedBackoffMillis())
+ .setStreamingBufferSize(retryOptions.getStreamingBufferSize())
+ .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
+ retryOptions.getStreamingBufferSize() / 2))
+ .setReadPartialRowTimeoutMillis(retryOptions.getReadPartialRowTimeoutMillis())
+ .setMaxScanTimeoutRetries(retryOptions.getMaxScanTimeoutRetries())
+ .setAllowRetriesWithoutTimestamp(retryOptions.allowRetriesWithoutTimestamp());
+
+ for (Status.Code code : Status.Code.values()) {
+ if (retryOptions.isRetryable(code)) {
+ retryOptionsBuilder.addStatusToRetryOn(code);
+ }
+ }
+
+ builder.setRetryOptions(retryOptionsBuilder.build());
+ return builder;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 5933e13..a0e6b29 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -30,8 +30,9 @@ import com.google.bigtable.v1.SampleRowKeysRequest;
import com.google.bigtable.v1.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
-import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.async.BulkMutation;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
import com.google.common.base.MoreObjects;
import com.google.common.io.Closer;
@@ -65,7 +66,7 @@ class BigtableServiceImpl implements BigtableService {
@Override
public BigtableWriterImpl openForWriting(String tableId) throws IOException {
BigtableSession session = new BigtableSession(options);
- String tableName = options.getClusterName().toTableNameStr(tableId);
+ BigtableTableName tableName = options.getClusterName().toTableName(tableId);
return new BigtableWriterImpl(session, tableName);
}
@@ -170,24 +171,23 @@ class BigtableServiceImpl implements BigtableService {
private static class BigtableWriterImpl implements Writer {
private BigtableSession session;
private AsyncExecutor executor;
+ private BulkMutation bulkMutation;
private final MutateRowRequest.Builder partialBuilder;
- public BigtableWriterImpl(BigtableSession session, String tableName) {
+ public BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
this.session = session;
- this.executor =
- new AsyncExecutor(
- session.getDataClient(),
- new HeapSizeManager(
- AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
- AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
-
- partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+ executor = session.createAsyncExecutor();
+ bulkMutation = session.createBulkMutation(tableName, executor);
+
+ partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName.toString());
}
@Override
public void close() throws IOException {
try {
- if (executor != null) {
+ if (bulkMutation != null) {
+ bulkMutation.flush();
+ bulkMutation = null;
executor.flush();
executor = null;
}
@@ -208,12 +208,7 @@ class BigtableServiceImpl implements BigtableService {
.setRowKey(record.getKey())
.addAllMutations(record.getValue())
.build();
- try {
- return executor.mutateRowAsync(r);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Write interrupted", e);
- }
+ return bulkMutation.add(r);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index cdbaaac..6a6197e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -27,6 +27,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -54,6 +55,8 @@ import com.google.bigtable.v1.Row;
import com.google.bigtable.v1.RowFilter;
import com.google.bigtable.v1.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.BulkOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -63,6 +66,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
+import io.grpc.Status;
+
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
@@ -76,10 +81,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -520,6 +527,58 @@ public class BigtableIOTest {
reader.close();
}
+ @Test
+ public void testAddBulkOptions() {
+ BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+ optionsBuilder = BigtableIO.addBulkOptions(optionsBuilder);
+
+ BulkOptions bulkOptions = optionsBuilder.build().getBulkOptions();
+ assertEquals(BulkOptions.BIGTABLE_ASYNC_MUTATOR_COUNT_DEFAULT,
+ bulkOptions.getAsyncMutatorCount());
+ assertEquals(true, bulkOptions.useBulkApi());
+ assertEquals(BulkOptions.BIGTABLE_BULK_MAX_ROW_KEY_COUNT_DEFAULT,
+ bulkOptions.getBulkMaxRowKeyCount());
+ assertEquals(BulkOptions.BIGTABLE_BULK_MAX_REQUEST_SIZE_BYTES_DEFAULT,
+ bulkOptions.getBulkMaxRequestSize());
+ assertEquals(BulkOptions.BIGTABLE_MAX_INFLIGHT_RPCS_PER_CHANNEL_DEFAULT
+ * optionsBuilder.getDataChannelCount(), bulkOptions.getMaxInflightRpcs());
+ assertEquals(BulkOptions.BIGTABLE_MAX_MEMORY_DEFAULT, bulkOptions.getMaxMemory());
+ }
+
+ @Test
+ public void testAddRetryOptions() {
+ final double delta = 0.0000001;
+ BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
+ optionsBuilder = BigtableIO.addRetryOptions(optionsBuilder);
+
+ RetryOptions retryOptions = optionsBuilder.build().getRetryOptions();
+ assertEquals(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES, retryOptions.enableRetries());
+ assertEquals(RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS,
+ retryOptions.getInitialBackoffMillis());
+ assertEquals(RetryOptions.DEFAULT_BACKOFF_MULTIPLIER, retryOptions.getBackoffMultiplier(),
+ delta);
+ assertEquals(RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS,
+ retryOptions.getMaxElaspedBackoffMillis());
+ assertEquals(RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE, retryOptions.getStreamingBufferSize());
+ assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE, retryOptions.getStreamingBatchSize());
+ assertEquals(RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS,
+ retryOptions.getReadPartialRowTimeoutMillis());
+ assertEquals(RetryOptions.DEFAULT_MAX_SCAN_TIMEOUT_RETRIES,
+ retryOptions.getMaxScanTimeoutRetries());
+ assertFalse(retryOptions.allowRetriesWithoutTimestamp());
+
+ Set<Status.Code> statusToRetryOn = new HashSet<>();
+ for (Status.Code code : Status.Code.values()) {
+ if (retryOptions.isRetryable(code)) {
+ statusToRetryOn.add(code);
+ }
+ }
+
+ Set<Status.Code> defaultStatusToRetryOn =
+ new HashSet<>(RetryOptions.DEFAULT_ENABLE_GRPC_RETRIES_SET);
+ assertThat(statusToRetryOn, Matchers.containsInAnyOrder(defaultStatusToRetryOn.toArray()));
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////
private static final String COLUMN_FAMILY_NAME = "family";
private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/653c504f/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index af7afc5..8e17761 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -35,7 +35,9 @@ import com.google.bigtable.admin.table.v1.Table;
import com.google.bigtable.v1.Mutation;
import com.google.bigtable.v1.ReadRowsRequest;
import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
@@ -78,11 +80,17 @@ public class BigtableWriteIT implements Serializable {
PipelineOptionsFactory.register(BigtableTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
+ // RetryOptions streamingBatchSize must be explicitly set for getTableData()
+ RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
+ retryOptionsBuilder.setStreamingBatchSize(
+ retryOptionsBuilder.build().getStreamingBufferSize() / 2);
+
BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
.setProjectId(options.getProjectId())
.setClusterId(options.getClusterId())
.setZoneId(options.getZoneId())
- .setUserAgent("apache-beam-test");
+ .setUserAgent("apache-beam-test")
+ .setRetryOptions(retryOptionsBuilder.build());
bigtableOptions = bigtableOptionsBuilder.build();
session = new BigtableSession(bigtableOptions);
@@ -172,9 +180,15 @@ public class BigtableWriteIT implements Serializable {
/** Helper function to get a table's data. */
private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
+ // Add empty range to avoid TARGET_NOT_SET error
+ RowRange range = RowRange.newBuilder()
+ .setStartKey(ByteString.EMPTY)
+ .setEndKey(ByteString.EMPTY)
+ .build();
List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
- .setTableName(tableName);
+ .setTableName(tableName)
+ .setRowRange(range);
ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
Row currentRow;
[2/2] incubator-beam git commit: Closes #481
Posted by dh...@apache.org.
Closes #481
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38866cd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38866cd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38866cd5
Branch: refs/heads/master
Commit: 38866cd5555a78a3b930961e2bd882b33731cdb2
Parents: 4a0e426 653c504
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 30 13:05:55 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 13:05:55 2016 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
sdks/java/core/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 60 +++++++++++++++++++-
.../io/gcp/bigtable/BigtableServiceImpl.java | 31 +++++-----
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 59 +++++++++++++++++++
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 18 +++++-
7 files changed, 150 insertions(+), 24 deletions(-)
----------------------------------------------------------------------