You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/09 00:31:23 UTC
[1/2] incubator-beam git commit: Closes #590
Repository: incubator-beam
Updated Branches:
refs/heads/master d9632b7f9 -> 90abca193
Closes #590
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90abca19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90abca19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90abca19
Branch: refs/heads/master
Commit: 90abca193cb7e8f1f6dc200f6444424daed11716
Parents: d9632b7 9bf5376
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 17:31:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:31:15 2016 -0700
----------------------------------------------------------------------
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 46 ++++++++++----------
.../sdk/io/gcp/bigtable/BigtableService.java | 10 ++---
.../io/gcp/bigtable/BigtableServiceImpl.java | 39 +++++++++--------
.../io/gcp/bigtable/BigtableTestOptions.java | 11 ++---
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 38 ++++++++--------
.../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +--
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 45 ++++++++++---------
8 files changed, 96 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: [BEAM-426] Update
bigtable-client-core to 0.9.0
Posted by dh...@apache.org.
[BEAM-426] Update bigtable-client-core to 0.9.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bf5376c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bf5376c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bf5376c
Branch: refs/heads/master
Commit: 9bf5376c54ede4dae874a4dccf098670e51b8d2f
Parents: d9632b7
Author: Neville Li <ne...@spotify.com>
Authored: Tue Jul 5 15:31:50 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:31:15 2016 -0700
----------------------------------------------------------------------
sdks/java/io/google-cloud-platform/pom.xml | 2 +-
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 46 ++++++++++----------
.../sdk/io/gcp/bigtable/BigtableService.java | 10 ++---
.../io/gcp/bigtable/BigtableServiceImpl.java | 39 +++++++++--------
.../io/gcp/bigtable/BigtableTestOptions.java | 11 ++---
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 38 ++++++++--------
.../sdk/io/gcp/bigtable/BigtableReadIT.java | 5 +--
.../sdk/io/gcp/bigtable/BigtableWriteIT.java | 45 ++++++++++---------
8 files changed, 96 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index c7e77f1..0a814c1 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
<packaging>jar</packaging>
<properties>
- <bigtable.version>0.3.0</bigtable.version>
+ <bigtable.version>0.9.0</bigtable.version>
</properties>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 4bab45e..84d52f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -41,10 +41,11 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.common.base.MoreObjects;
@@ -52,7 +53,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
import io.grpc.Status;
@@ -81,15 +81,14 @@ import javax.annotation.Nullable;
*
* <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
+ * Bigtable instance. A {@link RowFilter} may also optionally be specified using
* {@link BigtableIO.Read#withRowFilter}. For example:
*
* <pre>{@code
* BigtableOptions.Builder optionsBuilder =
* new BigtableOptions.Builder()
* .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
+ * .setInstanceId("instance");
*
* Pipeline p = ...;
*
@@ -116,14 +115,13 @@ import javax.annotation.Nullable;
*
* <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
+ * Bigtable instance, for example:
*
* <pre>{@code
* BigtableOptions.Builder optionsBuilder =
* new BigtableOptions.Builder()
* .setProjectId("project")
- * .setClusterId("cluster")
- * .setZoneId("zone");
+ * .setInstanceId("instance");
*
* PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
*
@@ -153,7 +151,7 @@ public class BigtableIO {
* Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
* initialized with a
* {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
- * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
+ * the source Cloud Bigtable instance, and a {@link BigtableIO.Read#withTableId tableId} that
* specifies which table to read. A {@link RowFilter} may also optionally be specified using
* {@link BigtableIO.Read#withRowFilter}.
*/
@@ -166,8 +164,8 @@ public class BigtableIO {
* Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
* initialized with a
* {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
- * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
- * specifies which table to write.
+ * the destination Cloud Bigtable instance, and a {@link BigtableIO.Write#withTableId tableId}
+ * that specifies which table to write.
*/
@Experimental
public static Write write() {
@@ -183,7 +181,7 @@ public class BigtableIO {
@Experimental
public static class Read extends PTransform<PBegin, PCollection<Row>> {
/**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Does not modify this object.
@@ -194,7 +192,7 @@ public class BigtableIO {
}
/**
- * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+ * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Clones the given {@link BigtableOptions} builder so that any further changes
@@ -247,7 +245,7 @@ public class BigtableIO {
}
/**
- * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
+ * Returns the Google Cloud Bigtable instance being read from, and other parameters.
*/
public BigtableOptions getBigtableOptions() {
return options;
@@ -308,7 +306,7 @@ public class BigtableIO {
/////////////////////////////////////////////////////////////////////////////////////////
/**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+ * Used to define the Cloud Bigtable instance and any options for the networking layer.
* Cannot actually be {@code null} at validation time, but may start out {@code null} while
* source is being built.
*/
@@ -364,7 +362,7 @@ public class BigtableIO {
public static class Write
extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
/**
- * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+ * Used to define the Cloud Bigtable instance and any options for the networking layer.
* Cannot actually be {@code null} at validation time, but may start out {@code null} while
* source is being built.
*/
@@ -382,7 +380,7 @@ public class BigtableIO {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+ * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Does not modify this object.
@@ -393,7 +391,7 @@ public class BigtableIO {
}
/**
- * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+ * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
* indicated by the given options, and using any other specified customizations.
*
* <p>Clones the given {@link BigtableOptions} builder so that any further changes
@@ -437,7 +435,7 @@ public class BigtableIO {
}
/**
- * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
+ * Returns the Google Cloud Bigtable instance being written to, and other parameters.
*/
public BigtableOptions getBigtableOptions() {
return options;
@@ -585,7 +583,7 @@ public class BigtableIO {
throw new IOException(message);
}
- private class WriteExceptionCallback implements FutureCallback<Empty> {
+ private class WriteExceptionCallback implements FutureCallback<MutateRowResponse> {
private final KV<ByteString, Iterable<Mutation>> value;
public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
@@ -598,7 +596,7 @@ public class BigtableIO {
}
@Override
- public void onSuccess(Empty produced) {}
+ public void onSuccess(MutateRowResponse produced) {}
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 93e558b..2a7e3a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -20,12 +20,12 @@ package org.apache.beam.sdk.io.gcp.bigtable;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
import java.io.IOException;
import java.io.Serializable;
@@ -48,7 +48,7 @@ interface BigtableService extends Serializable {
*
* @throws IOException if there is an error submitting the write.
*/
- ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ ListenableFuture<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index a0e6b29..b9288df 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -20,14 +20,16 @@ package org.apache.beam.sdk.io.gcp.bigtable;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.admin.v2.GetTableRequest;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
@@ -38,7 +40,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
@@ -66,7 +67,7 @@ class BigtableServiceImpl implements BigtableService {
@Override
public BigtableWriterImpl openForWriting(String tableId) throws IOException {
BigtableSession session = new BigtableSession(options);
- BigtableTableName tableName = options.getClusterName().toTableName(tableId);
+ BigtableTableName tableName = options.getInstanceName().toTableName(tableId);
return new BigtableWriterImpl(session, tableName);
}
@@ -84,7 +85,7 @@ class BigtableServiceImpl implements BigtableService {
try (BigtableSession session = new BigtableSession(options)) {
GetTableRequest getTable =
GetTableRequest.newBuilder()
- .setName(options.getClusterName().toTableNameStr(tableId))
+ .setName(options.getInstanceName().toTableNameStr(tableId))
.build();
session.getTableAdminClient().getTable(getTable);
return true;
@@ -115,13 +116,16 @@ class BigtableServiceImpl implements BigtableService {
public boolean start() throws IOException {
RowRange range =
RowRange.newBuilder()
- .setStartKey(source.getRange().getStartKey().getValue())
- .setEndKey(source.getRange().getEndKey().getValue())
+ .setStartKeyClosed(source.getRange().getStartKey().getValue())
+ .setEndKeyOpen(source.getRange().getEndKey().getValue())
.build();
+ RowSet rowSet = RowSet.newBuilder()
+ .addRowRanges(range)
+ .build();
ReadRowsRequest.Builder requestB =
ReadRowsRequest.newBuilder()
- .setRowRange(range)
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
+ .setRows(rowSet)
+ .setTableName(options.getInstanceName().toTableNameStr(source.getTableId()));
if (source.getRowFilter() != null) {
requestB.setFilter(source.getRowFilter());
}
@@ -200,7 +204,8 @@ class BigtableServiceImpl implements BigtableService {
}
@Override
- public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+ public ListenableFuture<MutateRowResponse> writeRecord(
+ KV<ByteString, Iterable<Mutation>> record)
throws IOException {
MutateRowRequest r =
partialBuilder
@@ -231,7 +236,7 @@ class BigtableServiceImpl implements BigtableService {
try (BigtableSession session = new BigtableSession(options)) {
SampleRowKeysRequest request =
SampleRowKeysRequest.newBuilder()
- .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
+ .setTableName(options.getInstanceName().toTableNameStr(source.getTableId()))
.build();
return session.getDataClient().sampleRowKeys(request);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
index 0cd4f57..0ab7576 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -30,13 +30,8 @@ public interface BigtableTestOptions extends TestPipelineOptions {
String getProjectId();
void setProjectId(String value);
- @Description("Cluster ID for Bigtable")
+ @Description("Instance ID for Bigtable")
@Default.String("beam-test")
- String getClusterId();
- void setClusterId(String value);
-
- @Description("Zone ID for Bigtable")
- @Default.String("us-central1-c")
- String getZoneId();
- void setZoneId(String value);
+ String getInstanceId();
+ void setInstanceId(String value);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index a6a7f9d..f5f0682 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -49,14 +49,15 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
-import com.google.bigtable.v1.Cell;
-import com.google.bigtable.v1.Column;
-import com.google.bigtable.v1.Family;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Mutation.SetCell;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.Cell;
+import com.google.bigtable.v2.Column;
+import com.google.bigtable.v2.Family;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Mutation.SetCell;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.BulkOptions;
import com.google.cloud.bigtable.config.RetryOptions;
@@ -67,7 +68,6 @@ import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
import org.hamcrest.Matchers;
import org.junit.Before;
@@ -108,8 +108,7 @@ public class BigtableIOTest {
private static final BigtableOptions BIGTABLE_OPTIONS =
new BigtableOptions.Builder()
.setProjectId("project")
- .setClusterId("cluster")
- .setZoneId("zone")
+ .setInstanceId("instance")
.build();
private static BigtableIO.Read defaultRead =
BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
@@ -132,8 +131,7 @@ public class BigtableIOTest {
BigtableIO.Read read =
BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("cluster", read.getBigtableOptions().getClusterId());
- assertEquals("zone", read.getBigtableOptions().getZoneId());
+ assertEquals("instance", read.getBigtableOptions().getInstanceId());
assertEquals("table", read.getTableId());
}
@@ -142,8 +140,7 @@ public class BigtableIOTest {
BigtableIO.Read read =
BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
assertEquals("project", read.getBigtableOptions().getProjectId());
- assertEquals("cluster", read.getBigtableOptions().getClusterId());
- assertEquals("zone", read.getBigtableOptions().getZoneId());
+ assertEquals("instance", read.getBigtableOptions().getInstanceId());
assertEquals("table", read.getTableId());
}
@@ -153,17 +150,15 @@ public class BigtableIOTest {
BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
assertEquals("table", write.getTableId());
assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("zone", write.getBigtableOptions().getZoneId());
- assertEquals("cluster", write.getBigtableOptions().getClusterId());
+ assertEquals("instance", write.getBigtableOptions().getInstanceId());
}
@Test
public void testWriteBuildsCorrectlyInDifferentOrder() {
BigtableIO.Write write =
BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
- assertEquals("cluster", write.getBigtableOptions().getClusterId());
assertEquals("project", write.getBigtableOptions().getProjectId());
- assertEquals("zone", write.getBigtableOptions().getZoneId());
+ assertEquals("instance", write.getBigtableOptions().getInstanceId());
assertEquals("table", write.getTableId());
}
@@ -791,7 +786,8 @@ public class BigtableIOTest {
}
@Override
- public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
+ public ListenableFuture<MutateRowResponse> writeRecord(
+ KV<ByteString, Iterable<Mutation>> record) {
service.verifyTableExists(tableId);
Map<ByteString, ByteString> table = service.getTable(tableId);
ByteString key = record.getKey();
@@ -802,7 +798,7 @@ public class BigtableIOTest {
}
table.put(key, cell.getValue());
}
- return Futures.immediateFuture(Empty.getDefaultInstance());
+ return Futures.immediateFuture(MutateRowResponse.getDefaultInstance());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 22d5b5b..02d403f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
-import com.google.bigtable.v1.Row;
+import com.google.bigtable.v2.Row;
import com.google.cloud.bigtable.config.BigtableOptions;
import org.junit.Test;
@@ -45,8 +45,7 @@ public class BigtableReadIT {
BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
.setProjectId(options.getProjectId())
- .setClusterId(options.getClusterId())
- .setZoneId(options.getZoneId());
+ .setInstanceId(options.getInstanceId());
final String tableId = "BigtableReadTest";
final long numRows = 1000L;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 8e17761..a39d7d5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -27,15 +27,16 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
-import com.google.bigtable.admin.table.v1.ColumnFamily;
-import com.google.bigtable.admin.table.v1.CreateTableRequest;
-import com.google.bigtable.admin.table.v1.DeleteTableRequest;
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.admin.table.v1.Table;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.admin.v2.ColumnFamily;
+import com.google.bigtable.admin.v2.CreateTableRequest;
+import com.google.bigtable.admin.v2.DeleteTableRequest;
+import com.google.bigtable.admin.v2.GetTableRequest;
+import com.google.bigtable.admin.v2.Table;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.BigtableSession;
@@ -87,8 +88,7 @@ public class BigtableWriteIT implements Serializable {
BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
.setProjectId(options.getProjectId())
- .setClusterId(options.getClusterId())
- .setZoneId(options.getZoneId())
+ .setInstanceId(options.getInstanceId())
.setUserAgent("apache-beam-test")
.setRetryOptions(retryOptionsBuilder.build());
bigtableOptions = bigtableOptionsBuilder.build();
@@ -99,12 +99,12 @@ public class BigtableWriteIT implements Serializable {
@Test
public void testE2EBigtableWrite() throws Exception {
- final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
- final String clusterName = bigtableOptions.getClusterName().toString();
+ final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
+ final String instanceName = bigtableOptions.getInstanceName().toString();
final int numRows = 1000;
final List<KV<ByteString, ByteString>> testData = generateTableData(numRows);
- createEmptyTable(clusterName, tableId);
+ createEmptyTable(instanceName, tableId);
Pipeline p = Pipeline.create(options);
p.apply(CountingInput.upTo(numRows))
@@ -140,7 +140,7 @@ public class BigtableWriteIT implements Serializable {
@After
public void tearDown() throws Exception {
- final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+ final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
deleteTable(tableName);
session.close();
}
@@ -159,13 +159,13 @@ public class BigtableWriteIT implements Serializable {
}
/** Helper function to create an empty table. */
- private void createEmptyTable(String clusterName, String tableId) {
+ private void createEmptyTable(String instanceName, String tableId) {
Table.Builder tableBuilder = Table.newBuilder();
Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies();
columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder()
- .setName(clusterName)
+ .setParent(instanceName)
.setTableId(tableId)
.setTable(tableBuilder.build());
tableAdminClient.createTable(createTableRequestBuilder.build());
@@ -182,16 +182,19 @@ public class BigtableWriteIT implements Serializable {
private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
// Add empty range to avoid TARGET_NOT_SET error
RowRange range = RowRange.newBuilder()
- .setStartKey(ByteString.EMPTY)
- .setEndKey(ByteString.EMPTY)
+ .setStartKeyClosed(ByteString.EMPTY)
+ .setEndKeyOpen(ByteString.EMPTY)
+ .build();
+ RowSet rowSet = RowSet.newBuilder()
+ .addRowRanges(range)
.build();
- List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
.setTableName(tableName)
- .setRowRange(range);
+ .setRows(rowSet);
ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
Row currentRow;
+ List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
while ((currentRow = scanner.next()) != null) {
ByteString key = currentRow.getKey();
ByteString value = currentRow.getFamilies(0).getColumns(0).getCells(0).getValue();