You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 17:43:48 UTC

[2/2] incubator-beam git commit: Modified BigtableIO to support streaming

Modified BigtableIO to support streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12f15993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12f15993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12f15993

Branch: refs/heads/master
Commit: 12f159934ec7965c3974cda319681103a817778b
Parents: a7312be
Author: Ian Zhou <ia...@google.com>
Authored: Fri Jul 1 16:14:53 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 10:43:32 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 260 +++++++------------
 1 file changed, 92 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12f15993/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0c485bf..4bab45e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -23,18 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.io.range.ByteKey;
 import org.apache.beam.sdk.io.range.ByteKeyRange;
 import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.KV;
@@ -453,8 +452,8 @@ public class BigtableIO {
 
     @Override
     public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
-      Sink sink = new Sink(tableId, getBigtableService());
-      return input.apply(org.apache.beam.sdk.io.Write.to(sink));
+      input.apply(ParDo.of(new BigtableWriterFn(tableId, getBigtableService())));
+      return PDone.in(input.getPipeline());
     }
 
     @Override
@@ -514,6 +513,94 @@ public class BigtableIO {
       }
       return new BigtableServiceImpl(options);
     }
+
+    private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+
+      public BigtableWriterFn(String tableId, BigtableService bigtableService) {
+        this.tableId = checkNotNull(tableId, "tableId");
+        this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+        this.failures = new ConcurrentLinkedQueue<>();
+      }
+
+      @Override
+      public void startBundle(Context c) throws Exception {
+        bigtableWriter = bigtableService.openForWriting(tableId);
+        recordsWritten = 0;
+      }
+
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+        checkForFailures();
+        Futures.addCallback(
+            bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
+        ++recordsWritten;
+      }
+
+      @Override
+      public void finishBundle(Context c) throws Exception {
+        bigtableWriter.close();
+        bigtableWriter = null;
+        checkForFailures();
+        logger.info("Wrote {} records", recordsWritten);
+      }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        Write.this.populateDisplayData(builder);
+      }
+
+      ///////////////////////////////////////////////////////////////////////////////
+      private final String tableId;
+      private final BigtableService bigtableService;
+      private BigtableService.Writer bigtableWriter;
+      private long recordsWritten;
+      private final ConcurrentLinkedQueue<BigtableWriteException> failures;
+
+      /**
+       * If any write has asynchronously failed, fail the bundle with a useful error.
+       */
+      private void checkForFailures() throws IOException {
+        // Note that this function is never called by multiple threads and is the only place that
+        // we remove from failures, so this code is safe.
+        if (failures.isEmpty()) {
+          return;
+        }
+
+        StringBuilder logEntry = new StringBuilder();
+        int i = 0;
+        for (; i < 10 && !failures.isEmpty(); ++i) {
+          BigtableWriteException exc = failures.remove();
+          logEntry.append("\n").append(exc.getMessage());
+          if (exc.getCause() != null) {
+            logEntry.append(": ").append(exc.getCause().getMessage());
+          }
+        }
+        String message =
+            String.format(
+                "At least %d errors occurred writing to Bigtable. First %d errors: %s",
+                i + failures.size(),
+                i,
+                logEntry.toString());
+        logger.error(message);
+        throw new IOException(message);
+      }
+
+      private class WriteExceptionCallback implements FutureCallback<Empty> {
+        private final KV<ByteString, Iterable<Mutation>> value;
+
+        public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
+          this.value = value;
+        }
+
+        @Override
+        public void onFailure(Throwable cause) {
+          failures.add(new BigtableWriteException(value, cause));
+        }
+
+        @Override
+        public void onSuccess(Empty produced) {}
+      }
+    }
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////
@@ -871,169 +958,6 @@ public class BigtableIO {
     }
   }
 
-  private static class Sink
-      extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-
-    public Sink(String tableId, BigtableService bigtableService) {
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
-    }
-
-    public String getTableId() {
-      return tableId;
-    }
-
-    public BigtableService getBigtableService() {
-      return bigtableService;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(Sink.class)
-          .add("bigtableService", bigtableService)
-          .add("tableId", tableId)
-          .toString();
-    }
-
-    ///////////////////////////////////////////////////////////////////////////////
-    private final String tableId;
-    private final BigtableService bigtableService;
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
-        PipelineOptions options) {
-      return new BigtableWriteOperation(this);
-    }
-
-    /** Does nothing, as it is redundant with {@link Write#validate}. */
-    @Override
-    public void validate(PipelineOptions options) {}
-  }
-
-  private static class BigtableWriteOperation
-      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final Sink sink;
-
-    public BigtableWriteOperation(Sink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
-        throws Exception {
-      return new BigtableWriter(this);
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) {}
-
-    @Override
-    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
-      long count = 0;
-      for (Long value : writerResults) {
-        value += count;
-      }
-      logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink);
-    }
-
-    @Override
-    public Sink getSink() {
-      return sink;
-    }
-
-    @Override
-    public Coder<Long> getWriterResultCoder() {
-      return VarLongCoder.of();
-    }
-  }
-
-  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
-    private final BigtableWriteOperation writeOperation;
-    private final Sink sink;
-    private BigtableService.Writer bigtableWriter;
-    private long recordsWritten;
-    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-
-    public BigtableWriter(BigtableWriteOperation writeOperation) {
-      this.writeOperation = writeOperation;
-      this.sink = writeOperation.getSink();
-      this.failures = new ConcurrentLinkedQueue<>();
-    }
-
-    @Override
-    public void open(String uId) throws Exception {
-      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
-      recordsWritten = 0;
-    }
-
-    /**
-     * If any write has asynchronously failed, fail the bundle with a useful error.
-     */
-    private void checkForFailures() throws IOException {
-      // Note that this function is never called by multiple threads and is the only place that
-      // we remove from failures, so this code is safe.
-      if (failures.isEmpty()) {
-        return;
-      }
-
-      StringBuilder logEntry = new StringBuilder();
-      int i = 0;
-      for (; i < 10 && !failures.isEmpty(); ++i) {
-        BigtableWriteException exc = failures.remove();
-        logEntry.append("\n").append(exc.getMessage());
-        if (exc.getCause() != null) {
-          logEntry.append(": ").append(exc.getCause().getMessage());
-        }
-      }
-      String message =
-          String.format(
-              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
-              i + failures.size(),
-              i,
-              logEntry.toString());
-      logger.error(message);
-      throw new IOException(message);
-    }
-
-    @Override
-    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
-      checkForFailures();
-      Futures.addCallback(
-          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
-      ++recordsWritten;
-    }
-
-    @Override
-    public Long close() throws Exception {
-      bigtableWriter.close();
-      bigtableWriter = null;
-      checkForFailures();
-      logger.info("Wrote {} records", recordsWritten);
-      return recordsWritten;
-    }
-
-    @Override
-    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
-      return writeOperation;
-    }
-
-    private class WriteExceptionCallback implements FutureCallback<Empty> {
-      private final KV<ByteString, Iterable<Mutation>> value;
-
-      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
-        this.value = value;
-      }
-
-      @Override
-      public void onFailure(Throwable cause) {
-        failures.add(new BigtableWriteException(value, cause));
-      }
-
-      @Override
-      public void onSuccess(Empty produced) {}
-    }
-  }
-
   /**
    * An exception that puts information about the failed record being written in its message.
    */