Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/09 00:31:23 UTC

[1/2] incubator-beam git commit: Closes #590

Repository: incubator-beam
Updated Branches:
  refs/heads/master d9632b7f9 -> 90abca193


Closes #590


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90abca19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90abca19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90abca19

Branch: refs/heads/master
Commit: 90abca193cb7e8f1f6dc200f6444424daed11716
Parents: d9632b7 9bf5376
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 17:31:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:31:15 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/google-cloud-platform/pom.xml      |  2 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 46 ++++++++++----------
 .../sdk/io/gcp/bigtable/BigtableService.java    | 10 ++---
 .../io/gcp/bigtable/BigtableServiceImpl.java    | 39 +++++++++--------
 .../io/gcp/bigtable/BigtableTestOptions.java    | 11 ++---
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 38 ++++++++--------
 .../sdk/io/gcp/bigtable/BigtableReadIT.java     |  5 +--
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    | 45 ++++++++++---------
 8 files changed, 96 insertions(+), 100 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: [BEAM-426] Update bigtable-client-core to 0.9.0

[BEAM-426] Update bigtable-client-core to 0.9.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bf5376c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bf5376c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bf5376c

Branch: refs/heads/master
Commit: 9bf5376c54ede4dae874a4dccf098670e51b8d2f
Parents: d9632b7
Author: Neville Li <ne...@spotify.com>
Authored: Tue Jul 5 15:31:50 2016 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 17:31:15 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/google-cloud-platform/pom.xml      |  2 +-
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 46 ++++++++++----------
 .../sdk/io/gcp/bigtable/BigtableService.java    | 10 ++---
 .../io/gcp/bigtable/BigtableServiceImpl.java    | 39 +++++++++--------
 .../io/gcp/bigtable/BigtableTestOptions.java    | 11 ++---
 .../sdk/io/gcp/bigtable/BigtableIOTest.java     | 38 ++++++++--------
 .../sdk/io/gcp/bigtable/BigtableReadIT.java     |  5 +--
 .../sdk/io/gcp/bigtable/BigtableWriteIT.java    | 45 ++++++++++---------
 8 files changed, 96 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
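
For context, the updated Javadoc in BigtableIO.java (below) shows the configuration style this upgrade moves to: the v1 cluster/zone pair is replaced by a single v2 instance id, and write callbacks observe MutateRowResponse instead of protobuf Empty. A minimal sketch in the Javadoc's own style ("project", "instance", and "table" are placeholder ids, not values taken from this commit):

  // Bigtable v2: one instance id instead of the v1 cluster id + zone id.
  BigtableOptions.Builder optionsBuilder =
      new BigtableOptions.Builder()
          .setProjectId("project")
          .setInstanceId("instance");

  Pipeline p = ...;

  // Read every row of "table" as com.google.bigtable.v2.Row elements.
  PCollection<Row> rows = p.apply("read",
      BigtableIO.read()
          .withBigtableOptions(optionsBuilder)
          .withTableId("table"));

  // Write mutations keyed by row key back to the same table.
  PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
  data.apply("write",
      BigtableIO.write()
          .withBigtableOptions(optionsBuilder)
          .withTableId("table"));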


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index c7e77f1..0a814c1 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -32,7 +32,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <bigtable.version>0.3.0</bigtable.version>
+    <bigtable.version>0.9.0</bigtable.version>
   </properties>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 4bab45e..84d52f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -41,10 +41,11 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.common.base.MoreObjects;
@@ -52,7 +53,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
 
 import io.grpc.Status;
 
@@ -81,15 +81,14 @@ import javax.annotation.Nullable;
  *
  * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
  * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
+ * Bigtable instance. A {@link RowFilter} may also optionally be specified using
  * {@link BigtableIO.Read#withRowFilter}. For example:
  *
  * <pre>{@code
  * BigtableOptions.Builder optionsBuilder =
  *     new BigtableOptions.Builder()
  *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
+ *         .setInstanceId("instance");
  *
  * Pipeline p = ...;
  *
@@ -116,14 +115,13 @@ import javax.annotation.Nullable;
  *
  * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
  * or builder configured with the project and other information necessary to identify the
- * Bigtable cluster, for example:
+ * Bigtable instance, for example:
  *
  * <pre>{@code
  * BigtableOptions.Builder optionsBuilder =
  *     new BigtableOptions.Builder()
  *         .setProjectId("project")
- *         .setClusterId("cluster")
- *         .setZoneId("zone");
+ *         .setInstanceId("instance");
  *
  * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
  *
@@ -153,7 +151,7 @@ public class BigtableIO {
    * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
    * initialized with a
    * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
+   * the source Cloud Bigtable instance, and a {@link BigtableIO.Read#withTableId tableId} that
    * specifies which table to read. A {@link RowFilter} may also optionally be specified using
    * {@link BigtableIO.Read#withRowFilter}.
    */
@@ -166,8 +164,8 @@ public class BigtableIO {
    * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
    * initialized with a
    * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
-   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
-   * specifies which table to write.
+   * the destination Cloud Bigtable instance, and a {@link BigtableIO.Write#withTableId tableId}
+   * that specifies which table to write.
    */
   @Experimental
   public static Write write() {
@@ -183,7 +181,7 @@ public class BigtableIO {
   @Experimental
   public static class Read extends PTransform<PBegin, PCollection<Row>> {
     /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
      * indicated by the given options, and using any other specified customizations.
      *
      * <p>Does not modify this object.
@@ -194,7 +192,7 @@ public class BigtableIO {
     }
 
     /**
-     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
      * indicated by the given options, and using any other specified customizations.
      *
      * <p>Clones the given {@link BigtableOptions} builder so that any further changes
@@ -247,7 +245,7 @@ public class BigtableIO {
     }
 
     /**
-     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
+     * Returns the Google Cloud Bigtable instance being read from, and other parameters.
      */
     public BigtableOptions getBigtableOptions() {
       return options;
@@ -308,7 +306,7 @@ public class BigtableIO {
 
     /////////////////////////////////////////////////////////////////////////////////////////
     /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+     * Used to define the Cloud Bigtable instance and any options for the networking layer.
      * Cannot actually be {@code null} at validation time, but may start out {@code null} while
      * source is being built.
      */
@@ -364,7 +362,7 @@ public class BigtableIO {
   public static class Write
       extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
     /**
-     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+     * Used to define the Cloud Bigtable instance and any options for the networking layer.
      * Cannot actually be {@code null} at validation time, but may start out {@code null} while
      * source is being built.
      */
@@ -382,7 +380,7 @@ public class BigtableIO {
     }
 
     /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
      * indicated by the given options, and using any other specified customizations.
      *
      * <p>Does not modify this object.
@@ -393,7 +391,7 @@ public class BigtableIO {
     }
 
     /**
-     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance
      * indicated by the given options, and using any other specified customizations.
      *
      * <p>Clones the given {@link BigtableOptions} builder so that any further changes
@@ -437,7 +435,7 @@ public class BigtableIO {
     }
 
     /**
-     * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
+     * Returns the Google Cloud Bigtable instance being written to, and other parameters.
      */
     public BigtableOptions getBigtableOptions() {
       return options;
@@ -585,7 +583,7 @@ public class BigtableIO {
         throw new IOException(message);
       }
 
-      private class WriteExceptionCallback implements FutureCallback<Empty> {
+      private class WriteExceptionCallback implements FutureCallback<MutateRowResponse> {
         private final KV<ByteString, Iterable<Mutation>> value;
 
         public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
@@ -598,7 +596,7 @@ public class BigtableIO {
         }
 
         @Override
-        public void onSuccess(Empty produced) {}
+        public void onSuccess(MutateRowResponse produced) {}
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index 93e558b..2a7e3a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -20,12 +20,12 @@ package org.apache.beam.sdk.io.gcp.bigtable;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.values.KV;
 
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -48,7 +48,7 @@ interface BigtableService extends Serializable {
      *
      * @throws IOException if there is an error submitting the write.
      */
-    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+    ListenableFuture<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
         throws IOException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index a0e6b29..b9288df 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -20,14 +20,16 @@ package org.apache.beam.sdk.io.gcp.bigtable;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
 import org.apache.beam.sdk.values.KV;
 
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.v1.MutateRowRequest;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
-import com.google.bigtable.v1.SampleRowKeysRequest;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.admin.v2.GetTableRequest;
+import com.google.bigtable.v2.MutateRowRequest;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
+import com.google.bigtable.v2.SampleRowKeysRequest;
+import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableName;
@@ -38,7 +40,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
 
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
@@ -66,7 +67,7 @@ class BigtableServiceImpl implements BigtableService {
   @Override
   public BigtableWriterImpl openForWriting(String tableId) throws IOException {
     BigtableSession session = new BigtableSession(options);
-    BigtableTableName tableName = options.getClusterName().toTableName(tableId);
+    BigtableTableName tableName = options.getInstanceName().toTableName(tableId);
     return new BigtableWriterImpl(session, tableName);
   }
 
@@ -84,7 +85,7 @@ class BigtableServiceImpl implements BigtableService {
     try (BigtableSession session = new BigtableSession(options)) {
       GetTableRequest getTable =
           GetTableRequest.newBuilder()
-              .setName(options.getClusterName().toTableNameStr(tableId))
+              .setName(options.getInstanceName().toTableNameStr(tableId))
               .build();
       session.getTableAdminClient().getTable(getTable);
       return true;
@@ -115,13 +116,16 @@ class BigtableServiceImpl implements BigtableService {
     public boolean start() throws IOException {
       RowRange range =
           RowRange.newBuilder()
-              .setStartKey(source.getRange().getStartKey().getValue())
-              .setEndKey(source.getRange().getEndKey().getValue())
+              .setStartKeyClosed(source.getRange().getStartKey().getValue())
+              .setEndKeyOpen(source.getRange().getEndKey().getValue())
               .build();
+      RowSet rowSet = RowSet.newBuilder()
+          .addRowRanges(range)
+          .build();
       ReadRowsRequest.Builder requestB =
           ReadRowsRequest.newBuilder()
-              .setRowRange(range)
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
+              .setRows(rowSet)
+              .setTableName(options.getInstanceName().toTableNameStr(source.getTableId()));
       if (source.getRowFilter() != null) {
         requestB.setFilter(source.getRowFilter());
       }
@@ -200,7 +204,8 @@ class BigtableServiceImpl implements BigtableService {
     }
 
     @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+    public ListenableFuture<MutateRowResponse> writeRecord(
+        KV<ByteString, Iterable<Mutation>> record)
         throws IOException {
       MutateRowRequest r =
           partialBuilder
@@ -231,7 +236,7 @@ class BigtableServiceImpl implements BigtableService {
     try (BigtableSession session = new BigtableSession(options)) {
       SampleRowKeysRequest request =
           SampleRowKeysRequest.newBuilder()
-              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
+              .setTableName(options.getInstanceName().toTableNameStr(source.getTableId()))
               .build();
       return session.getDataClient().sampleRowKeys(request);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
index 0cd4f57..0ab7576 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableTestOptions.java
@@ -30,13 +30,8 @@ public interface BigtableTestOptions extends TestPipelineOptions {
   String getProjectId();
   void setProjectId(String value);
 
-  @Description("Cluster ID for Bigtable")
+  @Description("Instance ID for Bigtable")
   @Default.String("beam-test")
-  String getClusterId();
-  void setClusterId(String value);
-
-  @Description("Zone ID for Bigtable")
-  @Default.String("us-central1-c")
-  String getZoneId();
-  void setZoneId(String value);
+  String getInstanceId();
+  void setInstanceId(String value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index a6a7f9d..f5f0682 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -49,14 +49,15 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
-import com.google.bigtable.v1.Cell;
-import com.google.bigtable.v1.Column;
-import com.google.bigtable.v1.Family;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.Mutation.SetCell;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowFilter;
-import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.bigtable.v2.Cell;
+import com.google.bigtable.v2.Column;
+import com.google.bigtable.v2.Family;
+import com.google.bigtable.v2.MutateRowResponse;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.Mutation.SetCell;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowFilter;
+import com.google.bigtable.v2.SampleRowKeysResponse;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.config.BulkOptions;
 import com.google.cloud.bigtable.config.RetryOptions;
@@ -67,7 +68,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Empty;
 
 import org.hamcrest.Matchers;
 import org.junit.Before;
@@ -108,8 +108,7 @@ public class BigtableIOTest {
   private static final BigtableOptions BIGTABLE_OPTIONS =
       new BigtableOptions.Builder()
           .setProjectId("project")
-          .setClusterId("cluster")
-          .setZoneId("zone")
+          .setInstanceId("instance")
           .build();
   private static BigtableIO.Read defaultRead =
       BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
@@ -132,8 +131,7 @@ public class BigtableIOTest {
     BigtableIO.Read read =
         BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
     assertEquals("project", read.getBigtableOptions().getProjectId());
-    assertEquals("cluster", read.getBigtableOptions().getClusterId());
-    assertEquals("zone", read.getBigtableOptions().getZoneId());
+    assertEquals("instance", read.getBigtableOptions().getInstanceId());
     assertEquals("table", read.getTableId());
   }
 
@@ -142,8 +140,7 @@ public class BigtableIOTest {
     BigtableIO.Read read =
         BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
     assertEquals("project", read.getBigtableOptions().getProjectId());
-    assertEquals("cluster", read.getBigtableOptions().getClusterId());
-    assertEquals("zone", read.getBigtableOptions().getZoneId());
+    assertEquals("instance", read.getBigtableOptions().getInstanceId());
     assertEquals("table", read.getTableId());
   }
 
@@ -153,17 +150,15 @@ public class BigtableIOTest {
         BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
     assertEquals("table", write.getTableId());
     assertEquals("project", write.getBigtableOptions().getProjectId());
-    assertEquals("zone", write.getBigtableOptions().getZoneId());
-    assertEquals("cluster", write.getBigtableOptions().getClusterId());
+    assertEquals("instance", write.getBigtableOptions().getInstanceId());
   }
 
   @Test
   public void testWriteBuildsCorrectlyInDifferentOrder() {
     BigtableIO.Write write =
         BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
-    assertEquals("cluster", write.getBigtableOptions().getClusterId());
     assertEquals("project", write.getBigtableOptions().getProjectId());
-    assertEquals("zone", write.getBigtableOptions().getZoneId());
+    assertEquals("instance", write.getBigtableOptions().getInstanceId());
     assertEquals("table", write.getTableId());
   }
 
@@ -791,7 +786,8 @@ public class BigtableIOTest {
     }
 
     @Override
-    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
+    public ListenableFuture<MutateRowResponse> writeRecord(
+        KV<ByteString, Iterable<Mutation>> record) {
       service.verifyTableExists(tableId);
       Map<ByteString, ByteString> table = service.getTable(tableId);
       ByteString key = record.getKey();
@@ -802,7 +798,7 @@ public class BigtableIOTest {
         }
         table.put(key, cell.getValue());
       }
-      return Futures.immediateFuture(Empty.getDefaultInstance());
+      return Futures.immediateFuture(MutateRowResponse.getDefaultInstance());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
index 22d5b5b..02d403f 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableReadIT.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
 
-import com.google.bigtable.v1.Row;
+import com.google.bigtable.v2.Row;
 import com.google.cloud.bigtable.config.BigtableOptions;
 
 import org.junit.Test;
@@ -45,8 +45,7 @@ public class BigtableReadIT {
 
     BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
         .setProjectId(options.getProjectId())
-        .setClusterId(options.getClusterId())
-        .setZoneId(options.getZoneId());
+        .setInstanceId(options.getInstanceId());
 
     final String tableId = "BigtableReadTest";
     final long numRows = 1000L;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9bf5376c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index 8e17761..a39d7d5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -27,15 +27,16 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
 
-import com.google.bigtable.admin.table.v1.ColumnFamily;
-import com.google.bigtable.admin.table.v1.CreateTableRequest;
-import com.google.bigtable.admin.table.v1.DeleteTableRequest;
-import com.google.bigtable.admin.table.v1.GetTableRequest;
-import com.google.bigtable.admin.table.v1.Table;
-import com.google.bigtable.v1.Mutation;
-import com.google.bigtable.v1.ReadRowsRequest;
-import com.google.bigtable.v1.Row;
-import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.admin.v2.ColumnFamily;
+import com.google.bigtable.admin.v2.CreateTableRequest;
+import com.google.bigtable.admin.v2.DeleteTableRequest;
+import com.google.bigtable.admin.v2.GetTableRequest;
+import com.google.bigtable.admin.v2.Table;
+import com.google.bigtable.v2.Mutation;
+import com.google.bigtable.v2.ReadRowsRequest;
+import com.google.bigtable.v2.Row;
+import com.google.bigtable.v2.RowRange;
+import com.google.bigtable.v2.RowSet;
 import com.google.cloud.bigtable.config.BigtableOptions;
 import com.google.cloud.bigtable.config.RetryOptions;
 import com.google.cloud.bigtable.grpc.BigtableSession;
@@ -87,8 +88,7 @@ public class BigtableWriteIT implements Serializable {
 
     BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
         .setProjectId(options.getProjectId())
-        .setClusterId(options.getClusterId())
-        .setZoneId(options.getZoneId())
+        .setInstanceId(options.getInstanceId())
         .setUserAgent("apache-beam-test")
         .setRetryOptions(retryOptionsBuilder.build());
     bigtableOptions = bigtableOptionsBuilder.build();
@@ -99,12 +99,12 @@ public class BigtableWriteIT implements Serializable {
 
   @Test
   public void testE2EBigtableWrite() throws Exception {
-    final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
-    final String clusterName = bigtableOptions.getClusterName().toString();
+    final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
+    final String instanceName = bigtableOptions.getInstanceName().toString();
     final int numRows = 1000;
     final List<KV<ByteString, ByteString>> testData = generateTableData(numRows);
 
-    createEmptyTable(clusterName, tableId);
+    createEmptyTable(instanceName, tableId);
 
     Pipeline p = Pipeline.create(options);
     p.apply(CountingInput.upTo(numRows))
@@ -140,7 +140,7 @@ public class BigtableWriteIT implements Serializable {
 
   @After
   public void tearDown() throws Exception {
-    final String tableName = bigtableOptions.getClusterName().toTableNameStr(tableId);
+    final String tableName = bigtableOptions.getInstanceName().toTableNameStr(tableId);
     deleteTable(tableName);
     session.close();
   }
@@ -159,13 +159,13 @@ public class BigtableWriteIT implements Serializable {
   }
 
   /** Helper function to create an empty table. */
-  private void createEmptyTable(String clusterName, String tableId) {
+  private void createEmptyTable(String instanceName, String tableId) {
     Table.Builder tableBuilder = Table.newBuilder();
     Map<String, ColumnFamily> columnFamilies = tableBuilder.getMutableColumnFamilies();
     columnFamilies.put(COLUMN_FAMILY_NAME, ColumnFamily.newBuilder().build());
 
     CreateTableRequest.Builder createTableRequestBuilder = CreateTableRequest.newBuilder()
-        .setName(clusterName)
+        .setParent(instanceName)
         .setTableId(tableId)
         .setTable(tableBuilder.build());
     tableAdminClient.createTable(createTableRequestBuilder.build());
@@ -182,16 +182,19 @@ public class BigtableWriteIT implements Serializable {
   private List<KV<ByteString, ByteString>> getTableData(String tableName) throws IOException {
     // Add empty range to avoid TARGET_NOT_SET error
     RowRange range = RowRange.newBuilder()
-        .setStartKey(ByteString.EMPTY)
-        .setEndKey(ByteString.EMPTY)
+        .setStartKeyClosed(ByteString.EMPTY)
+        .setEndKeyOpen(ByteString.EMPTY)
+        .build();
+    RowSet rowSet = RowSet.newBuilder()
+        .addRowRanges(range)
         .build();
-    List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
     ReadRowsRequest.Builder readRowsRequestBuilder = ReadRowsRequest.newBuilder()
         .setTableName(tableName)
-        .setRowRange(range);
+        .setRows(rowSet);
     ResultScanner<Row> scanner = session.getDataClient().readRows(readRowsRequestBuilder.build());
 
     Row currentRow;
+    List<KV<ByteString, ByteString>> tableData = new ArrayList<>();
     while ((currentRow = scanner.next()) != null) {
       ByteString key = currentRow.getKey();
       ByteString value = currentRow.getFamilies(0).getColumns(0).getCells(0).getValue();