You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/08 17:43:47 UTC
[1/2] incubator-beam git commit: Closes #596
Repository: incubator-beam
Updated Branches:
refs/heads/master a7312bee3 -> 744b0474e
Closes #596
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/744b0474
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/744b0474
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/744b0474
Branch: refs/heads/master
Commit: 744b0474edb94cefbb9486664231004db8e00a72
Parents: a7312be 12f1599
Author: Dan Halperin <dh...@google.com>
Authored: Fri Jul 8 10:43:32 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 10:43:32 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 260 +++++++------------
1 file changed, 92 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Modified BigtableIO to support streaming
Posted by dh...@apache.org.
Modified BigtableIO to support streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/12f15993
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/12f15993
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/12f15993
Branch: refs/heads/master
Commit: 12f159934ec7965c3974cda319681103a817778b
Parents: a7312be
Author: Ian Zhou <ia...@google.com>
Authored: Fri Jul 1 16:14:53 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Jul 8 10:43:32 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 260 +++++++------------
1 file changed, 92 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/12f15993/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 0c485bf..4bab45e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -23,18 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.KV;
@@ -453,8 +452,8 @@ public class BigtableIO {
@Override
public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
- Sink sink = new Sink(tableId, getBigtableService());
- return input.apply(org.apache.beam.sdk.io.Write.to(sink));
+ input.apply(ParDo.of(new BigtableWriterFn(tableId, getBigtableService())));
+ return PDone.in(input.getPipeline()); // the ParDo's Void output is discarded
}
@Override
@@ -514,6 +513,94 @@ public class BigtableIO {
}
return new BigtableServiceImpl(options);
}
+
+ private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+ // Sends each element to Bigtable via BigtableService.Writer.writeRecord; emits no output.
+ public BigtableWriterFn(String tableId, BigtableService bigtableService) {
+ this.tableId = checkNotNull(tableId, "tableId");
+ this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+ this.failures = new ConcurrentLinkedQueue<>();
+ }
+ // Opens a writer for tableId for this bundle and resets the per-bundle record count.
+ @Override
+ public void startBundle(Context c) throws Exception {
+ bigtableWriter = bigtableService.openForWriting(tableId);
+ recordsWritten = 0;
+ }
+ // Fails fast on earlier async errors, then issues the write; errors arrive via the callback.
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ checkForFailures();
+ Futures.addCallback(
+ bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
+ ++recordsWritten;
+ }
+ // Closes the writer before the final failure check; presumably close() waits for pending writes (confirm).
+ @Override
+ public void finishBundle(Context c) throws Exception {
+ bigtableWriter.close();
+ bigtableWriter = null;
+ checkForFailures();
+ logger.info("Wrote {} records", recordsWritten);
+ }
+ // Delegates to the enclosing Write transform, which is why this is a non-static inner class.
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ Write.this.populateDisplayData(builder);
+ }
+ // Per-bundle state: bigtableWriter is opened in startBundle and closed only in finishBundle.
+ ///////////////////////////////////////////////////////////////////////////////
+ private final String tableId;
+ private final BigtableService bigtableService;
+ private BigtableService.Writer bigtableWriter;
+ private long recordsWritten;
+ private final ConcurrentLinkedQueue<BigtableWriteException> failures;
+ // NOTE(review): if processElement throws, finishBundle may not run and the writer may stay open.
+ /**
+ * If any write has asynchronously failed, fail the bundle with a useful error.
+ */
+ private void checkForFailures() throws IOException {
+ // Note that this function is never called by multiple threads and is the only place that
+ // we remove from failures, so this code is safe.
+ if (failures.isEmpty()) {
+ return;
+ }
+ // Drain at most 10 failures into the message; any remainder is only counted in the total.
+ StringBuilder logEntry = new StringBuilder();
+ int i = 0;
+ for (; i < 10 && !failures.isEmpty(); ++i) {
+ BigtableWriteException exc = failures.remove();
+ logEntry.append("\n").append(exc.getMessage());
+ if (exc.getCause() != null) {
+ logEntry.append(": ").append(exc.getCause().getMessage());
+ }
+ }
+ String message =
+ String.format(
+ "At least %d errors occurred writing to Bigtable. First %d errors: %s",
+ i + failures.size(),
+ i,
+ logEntry.toString());
+ logger.error(message);
+ throw new IOException(message);
+ }
+ // Queues a BigtableWriteException for each failed write so checkForFailures can report it.
+ private class WriteExceptionCallback implements FutureCallback<Empty> {
+ private final KV<ByteString, Iterable<Mutation>> value;
+
+ public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
+ this.value = value;
+ }
+ // Presumably runs off the bundle thread; failures is a ConcurrentLinkedQueue for that reason.
+ @Override
+ public void onFailure(Throwable cause) {
+ failures.add(new BigtableWriteException(value, cause));
+ }
+ // Successful writes need no bookkeeping.
+ @Override
+ public void onSuccess(Empty produced) {}
+ }
+ }
}
//////////////////////////////////////////////////////////////////////////////////////////
@@ -871,169 +958,6 @@ public class BigtableIO {
}
}
- private static class Sink
- extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
-
- public Sink(String tableId, BigtableService bigtableService) {
- this.tableId = checkNotNull(tableId, "tableId");
- this.bigtableService = checkNotNull(bigtableService, "bigtableService");
- }
-
- public String getTableId() {
- return tableId;
- }
-
- public BigtableService getBigtableService() {
- return bigtableService;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(Sink.class)
- .add("bigtableService", bigtableService)
- .add("tableId", tableId)
- .toString();
- }
-
- ///////////////////////////////////////////////////////////////////////////////
- private final String tableId;
- private final BigtableService bigtableService;
-
- @Override
- public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
- PipelineOptions options) {
- return new BigtableWriteOperation(this);
- }
-
- /** Does nothing, as it is redundant with {@link Write#validate}. */
- @Override
- public void validate(PipelineOptions options) {}
- }
-
- private static class BigtableWriteOperation
- extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
- private final Sink sink;
-
- public BigtableWriteOperation(Sink sink) {
- this.sink = sink;
- }
-
- @Override
- public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
- throws Exception {
- return new BigtableWriter(this);
- }
-
- @Override
- public void initialize(PipelineOptions options) {}
-
- @Override
- public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
- long count = 0;
- for (Long value : writerResults) {
- value += count;
- }
- logger.debug("Wrote {} elements to BigtableIO.Sink {}", sink);
- }
-
- @Override
- public Sink getSink() {
- return sink;
- }
-
- @Override
- public Coder<Long> getWriterResultCoder() {
- return VarLongCoder.of();
- }
- }
-
- private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
- private final BigtableWriteOperation writeOperation;
- private final Sink sink;
- private BigtableService.Writer bigtableWriter;
- private long recordsWritten;
- private final ConcurrentLinkedQueue<BigtableWriteException> failures;
-
- public BigtableWriter(BigtableWriteOperation writeOperation) {
- this.writeOperation = writeOperation;
- this.sink = writeOperation.getSink();
- this.failures = new ConcurrentLinkedQueue<>();
- }
-
- @Override
- public void open(String uId) throws Exception {
- bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
- recordsWritten = 0;
- }
-
- /**
- * If any write has asynchronously failed, fail the bundle with a useful error.
- */
- private void checkForFailures() throws IOException {
- // Note that this function is never called by multiple threads and is the only place that
- // we remove from failures, so this code is safe.
- if (failures.isEmpty()) {
- return;
- }
-
- StringBuilder logEntry = new StringBuilder();
- int i = 0;
- for (; i < 10 && !failures.isEmpty(); ++i) {
- BigtableWriteException exc = failures.remove();
- logEntry.append("\n").append(exc.getMessage());
- if (exc.getCause() != null) {
- logEntry.append(": ").append(exc.getCause().getMessage());
- }
- }
- String message =
- String.format(
- "At least %d errors occurred writing to Bigtable. First %d errors: %s",
- i + failures.size(),
- i,
- logEntry.toString());
- logger.error(message);
- throw new IOException(message);
- }
-
- @Override
- public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
- checkForFailures();
- Futures.addCallback(
- bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
- ++recordsWritten;
- }
-
- @Override
- public Long close() throws Exception {
- bigtableWriter.close();
- bigtableWriter = null;
- checkForFailures();
- logger.info("Wrote {} records", recordsWritten);
- return recordsWritten;
- }
-
- @Override
- public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
- return writeOperation;
- }
-
- private class WriteExceptionCallback implements FutureCallback<Empty> {
- private final KV<ByteString, Iterable<Mutation>> value;
-
- public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
- this.value = value;
- }
-
- @Override
- public void onFailure(Throwable cause) {
- failures.add(new BigtableWriteException(value, cause));
- }
-
- @Override
- public void onSuccess(Empty produced) {}
- }
- }
-
/**
* An exception that puts information about the failed record being written in its message.
*/