You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/03/21 18:52:10 UTC
[1/2] beam git commit: Upgrade to bigtable-client-core 0.9.5.1
Repository: beam
Updated Branches:
refs/heads/master 92d1a6635 -> 7e97820c5
Upgrade to bigtable-client-core 0.9.5.1
Transitively bump netty version to 4.1.6.Final
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1caa1cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1caa1cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1caa1cd
Branch: refs/heads/master
Commit: d1caa1cde40cfb955c53e6cb67a86b1528b3935a
Parents: 92d1a66
Author: Andrew Martin <am...@spotify.com>
Authored: Fri Mar 17 13:21:12 2017 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Mar 21 11:51:48 2017 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 48 +-------------------
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 3 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 +----
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 15 ++----
6 files changed, 9 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 65c5012..a4b1090 100644
--- a/pom.xml
+++ b/pom.xml
@@ -127,7 +127,7 @@
<joda.version>2.4</joda.version>
<junit.version>4.12</junit.version>
<mockito.version>1.9.5</mockito.version>
- <netty.version>4.1.3.Final</netty.version>
+ <netty.version>4.1.6.Final</netty.version>
<os-maven-plugin.version>1.4.0.Final</os-maven-plugin.version>
<protobuf.version>3.1.0</protobuf.version>
<pubsub.version>v1-rev10-1.22.0</pubsub.version>
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 393db18..0d4f023 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
<packaging>jar</packaging>
<properties>
- <bigtable.version>0.9.2</bigtable.version>
+ <bigtable.version>0.9.5.1</bigtable.version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 2d6cbba..7091e15 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -29,14 +29,12 @@ import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
-import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
-import io.grpc.Status;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -213,20 +211,10 @@ public class BigtableIO {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
BigtableOptions options = optionsBuilder.build();
- RetryOptions retryOptions = options.getRetryOptions();
// Set data channel count to one because there is only 1 scanner in this session
- // Use retryOptionsToBuilder because absent in Bigtable library
- // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
- // Set batch size because of bug (incorrect initialization) in Bigtable library
- // TODO: remove setRetryOptions when fixed in Bigtable library
BigtableOptions.Builder clonedBuilder = options.toBuilder()
- .setDataChannelCount(1)
- .setRetryOptions(
- retryOptionsToBuilder(retryOptions)
- .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
- retryOptions.getStreamingBufferSize() / 2))
- .build());
+ .setDataChannelCount(1);
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService);
@@ -454,22 +442,12 @@ public class BigtableIO {
checkNotNull(optionsBuilder, "optionsBuilder");
// TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
BigtableOptions options = optionsBuilder.build();
- RetryOptions retryOptions = options.getRetryOptions();
// Set useBulkApi to true for enabling bulk writes
- // Use retryOptionsToBuilder because absent in Bigtable library
- // TODO: replace with RetryOptions.toBuilder() when added to Bigtable library
- // Set batch size because of bug (incorrect initialization) in Bigtable library
- // TODO: remove setRetryOptions when fixed in Bigtable library
BigtableOptions.Builder clonedBuilder = options.toBuilder()
.setBulkOptions(
options.getBulkOptions().toBuilder()
.setUseBulkApi(true)
- .build())
- .setRetryOptions(
- retryOptionsToBuilder(retryOptions)
- .setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(),
- retryOptions.getStreamingBufferSize() / 2))
.build());
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
return new Write(optionsWithAgent, tableId, bigtableService);
@@ -1068,28 +1046,4 @@ public class BigtableIO {
javaVersion,
"0.3.0" /* TODO get Bigtable client version directly from jar. */);
}
-
- /**
- * A helper function to convert a RetryOptions into a RetryOptions.Builder.
- */
- private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) {
- RetryOptions.Builder builder = new RetryOptions.Builder();
- builder.setEnableRetries(options.enableRetries());
- builder.setInitialBackoffMillis(options.getInitialBackoffMillis());
- builder.setBackoffMultiplier(options.getBackoffMultiplier());
- builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis());
- builder.setStreamingBufferSize(options.getStreamingBufferSize());
- builder.setStreamingBatchSize(options.getStreamingBatchSize());
- builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis());
- builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries());
- builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp());
-
- for (Status.Code code : Status.Code.values()) {
- if (options.isRetryable(code)) {
- builder.addStatusToRetryOn(code);
- }
- }
-
- return builder;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
index 717c6d3..0987140 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/GcpApiSurfaceTest.java
@@ -55,10 +55,9 @@ public class GcpApiSurfaceTest {
classesInPackage("com.google.auth"),
classesInPackage("com.google.bigtable.v2"),
classesInPackage("com.google.cloud.bigtable.config"),
+ Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableClusterName.class),
Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableInstanceName.class),
Matchers.<Class<?>>equalTo(com.google.cloud.bigtable.grpc.BigtableTableName.class),
- // https://github.com/GoogleCloudPlatform/cloud-bigtable-client/pull/1056
- classesInPackage("com.google.common.collect"),
// via Bigtable, PR above out to fix.
classesInPackage("com.google.datastore.v1"),
classesInPackage("com.google.protobuf"),
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 878785b..1c770a2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -699,14 +699,10 @@ public class BigtableIOTest {
BigtableIO.read().withBigtableOptions(optionsBuilder.build());
BigtableOptions options = read.getBigtableOptions();
- assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
- options.getRetryOptions().getStreamingBatchSize());
assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
assertThat(options.getRetryOptions(),
- Matchers.equalTo(retryOptionsBuilder
- .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
- .build()));
+ Matchers.equalTo(retryOptionsBuilder.build()));
}
@Test
@@ -731,8 +727,6 @@ public class BigtableIOTest {
BigtableOptions options = write.getBigtableOptions();
assertEquals(true, options.getBulkOptions().useBulkApi());
assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs());
- assertEquals(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE,
- options.getRetryOptions().getStreamingBatchSize());
assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
assertThat(options.getBulkOptions(),
@@ -740,9 +734,7 @@ public class BigtableIOTest {
.setUseBulkApi(true)
.build()));
assertThat(options.getRetryOptions(),
- Matchers.equalTo(retryOptionsBuilder
- .setStreamingBatchSize(RetryOptions.DEFAULT_STREAMING_BATCH_SIZE)
- .build()));
+ Matchers.equalTo(retryOptionsBuilder.build()));
}
////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/d1caa1cd/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index d7792f4..240fb31 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -32,7 +32,6 @@ import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.BigtableOptions.Builder;
import com.google.cloud.bigtable.config.CredentialOptions;
-import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableAdminClient;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
@@ -43,7 +42,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.GcpOptions;
@@ -81,17 +79,11 @@ public class BigtableWriteIT implements Serializable {
PipelineOptionsFactory.register(BigtableTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
- // RetryOptions streamingBatchSize must be explicitly set for getTableData()
- RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
- retryOptionsBuilder.setStreamingBatchSize(
- retryOptionsBuilder.build().getStreamingBufferSize() / 2);
-
bigtableOptions =
new Builder()
.setProjectId(options.getProjectId())
.setInstanceId(options.getInstanceId())
.setUserAgent("apache-beam-test")
- .setRetryOptions(retryOptionsBuilder.build())
.build();
session =
@@ -137,8 +129,8 @@ public class BigtableWriteIT implements Serializable {
// Test number of column families and column family name equality
Table table = getTable(tableName);
- assertThat(table.getColumnFamilies().keySet(), Matchers.hasSize(1));
- assertThat(table.getColumnFamilies(), Matchers.hasKey(COLUMN_FAMILY_NAME));
+ assertThat(table.getColumnFamiliesMap().keySet(), Matchers.hasSize(1));
+ assertThat(table.getColumnFamiliesMap(), Matchers.hasKey(COLUMN_FAMILY_NAME));
// Test table data equality
List<KV<ByteString, ByteString>> tableData = getTableData(tableName);
@@ -168,8 +160,7 @@ public class BigtableWriteIT implements Serializable {
/** Helper function to create an empty table. */
private void createEmptyTable(String instanceName, String tableId) {
Table.Builder tableBuilder = Table.newBuilder();
- Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies();
- columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
+ tableBuilder.putColumnFamilies(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder()
.setParent(instanceName)
[2/2] beam git commit: This closes #2265
Posted by dh...@apache.org.
This closes #2265
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7e97820c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7e97820c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7e97820c
Branch: refs/heads/master
Commit: 7e97820c5eeabaa919216b9f770dd7d2eabed623
Parents: 92d1a66 d1caa1c
Author: Dan Halperin <dh...@google.com>
Authored: Tue Mar 21 11:52:00 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Mar 21 11:52:00 2017 -0700
----------------------------------------------------------------------
pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 48 +-------------------
.../beam/sdk/io/gcp/GcpApiSurfaceTest.java | 3 +-
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 12 +----
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 15 ++----
6 files changed, 9 insertions(+), 73 deletions(-)
----------------------------------------------------------------------