You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 07:21:22 UTC
incubator-beam git commit: Modified BigtableIO to use DoFn setup/tearDown methods instead of startBundle/finishBundle
Repository: incubator-beam
Updated Branches:
refs/heads/master bfd810f41 -> e871f1731
Modified BigtableIO to use DoFn setup/tearDown methods instead of startBundle/finishBundle
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e871f173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e871f173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e871f173
Branch: refs/heads/master
Commit: e871f1731d849fa7eda0a508b57e8b94514eb236
Parents: bfd810f
Author: Ian Zhou <ia...@google.com>
Authored: Mon Aug 15 15:39:56 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 00:20:49 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 17 +++++++++++++----
.../beam/sdk/io/gcp/bigtable/BigtableService.java | 9 ++++++++-
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 8 ++++++++
.../beam/sdk/io/gcp/bigtable/BigtableIOTest.java | 3 +++
4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 3a9ffce..67dde50 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -515,9 +515,13 @@ public class BigtableIO {
this.failures = new ConcurrentLinkedQueue<>();
}
- @StartBundle
- public void startBundle(Context c) throws Exception {
+ @Setup
+ public void setup() throws Exception {
bigtableWriter = bigtableService.openForWriting(tableId);
+ }
+
+ @StartBundle
+ public void startBundle(Context c) {
recordsWritten = 0;
}
@@ -531,12 +535,17 @@ public class BigtableIO {
@FinishBundle
public void finishBundle(Context c) throws Exception {
- bigtableWriter.close();
- bigtableWriter = null;
+ bigtableWriter.flush();
checkForFailures();
logger.info("Wrote {} records", recordsWritten);
}
+ @Teardown
+ public void tearDown() throws Exception {
+ bigtableWriter.close();
+ bigtableWriter = null;
+ }
+
@Override
public void populateDisplayData(DisplayData.Builder builder) {
Write.this.populateDisplayData(builder);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
index ecd38a7..c656bbb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -50,10 +50,17 @@ interface BigtableService extends Serializable {
throws IOException;
/**
- * Closes the writer.
+ * Flushes the writer.
*
* @throws IOException if any writes did not succeed
*/
+ void flush() throws IOException;
+
+ /**
+ * Closes the writer.
+ *
+ * @throws IOException if there is an error closing the writer
+ */
void close() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 07a183e..a402643 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -183,6 +183,14 @@ class BigtableServiceImpl implements BigtableService {
}
@Override
+ public void flush() throws IOException {
+ if (bulkMutation != null) {
+ bulkMutation.flush();
+ executor.flush();
+ }
+ }
+
+ @Override
public void close() throws IOException {
try {
if (bulkMutation != null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e871f173/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 61b404a..d60ede6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -822,6 +822,9 @@ public class BigtableIOTest {
}
@Override
+ public void flush() {}
+
+ @Override
public void close() {}
}