You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 07:21:22 UTC

incubator-beam git commit: Modified BigtableIO to use DoFn setup/tearDown methods instead of startBundle/finishBundle

Repository: incubator-beam
Updated Branches:
  refs/heads/master bfd810f41 -> e871f1731


Modified BigtableIO to use DoFn setup/tearDown methods instead of startBundle/finishBundle


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e871f173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e871f173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e871f173

Branch: refs/heads/master
Commit: e871f1731d849fa7eda0a508b57e8b94514eb236
Parents: bfd810f
Author: Ian Zhou <ia...@google.com>
Authored: Mon Aug 15 15:39:56 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 00:20:49 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java       | 17 +++++++++++++----
 .../beam/sdk/io/gcp/bigtable/BigtableService.java  |  9 ++++++++-
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   |  8 ++++++++
 .../beam/sdk/io/gcp/bigtable/BigtableIOTest.java   |  3 +++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 3a9ffce..67dde50 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -515,9 +515,13 @@ public class BigtableIO {
         this.failures = new ConcurrentLinkedQueue<>();
       }
 
-      @StartBundle
-      public void startBundle(Context c) throws Exception {
+      @Setup
+      public void setup() throws Exception {
         bigtableWriter = bigtableService.openForWriting(tableId);
+      }
+
+      @StartBundle
+      public void startBundle(Context c) {
         recordsWritten = 0;
       }
 
@@ -531,12 +535,17 @@ public class BigtableIO {
 
       @FinishBundle
       public void finishBundle(Context c) throws Exception {
-        bigtableWriter.close();
-        bigtableWriter = null;
+        bigtableWriter.flush();
         checkForFailures();
         logger.info("Wrote {} records", recordsWritten);
       }
 
+      @Teardown
+      public void tearDown() throws Exception {
+        bigtableWriter.close();
+        bigtableWriter = null;
+      }
+
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         Write.this.populateDisplayData(builder);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index ecd38a7..c656bbb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -50,10 +50,17 @@ interface BigtableService extends Serializable {
         throws IOException;
 
     /**
-     * Closes the writer.
+     * Flushes the writer.
      *
      * @throws IOException if any writes did not succeed
      */
+    void flush() throws IOException;
+
+    /**
+     * Closes the writer.
+     *
+     * @throws IOException if there is an error closing the writer
+     */
     void close() throws IOException;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 07a183e..a402643 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -183,6 +183,14 @@ class BigtableServiceImpl implements BigtableService {
     }
 
     @Override
+    public void flush() throws IOException {
+      if (bulkMutation != null) {
+        bulkMutation.flush();
+        executor.flush();
+      }
+    }
+
+    @Override
     public void close() throws IOException {
       try {
         if (bulkMutation != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 61b404a..d60ede6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -822,6 +822,9 @@ public class BigtableIOTest {
     }
 
     @Override
+    public void flush() {}
+
+    @Override
     public void close() {}
   }