You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/03/13 13:37:14 UTC

[beam] branch master updated: Fix usage of non-vendored guava in BigTableIO

This is an automated email from the ASF dual-hosted git repository.

gleb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b038ee7  Fix usage of non-vendored guava in BigTableIO
     new 6e89b6c  Merge pull request #7957: Fix usage of non-vendored guava in BigTableIO
b038ee7 is described below

commit b038ee74ad4804c4ce06c772b6f80b8936bc1575
Author: Gleb Kanterov <gl...@spotify.com>
AuthorDate: Wed Feb 27 18:02:45 2019 +0100

    Fix usage of non-vendored guava in BigTableIO
---
 .../src/main/resources/beam/suppressions.xml       |  2 +-
 .../sdk/io/gcp/bigtable/BigtableServiceImpl.java   |  6 +-
 .../bigtable/VendoredListenableFutureAdapter.java  | 66 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 247a980..05ae412 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -98,7 +98,7 @@
   <suppress id="ForbidNonVendoredGuava" files=".*cassandra.*CassandraIO\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisIO\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisProducerMock\.java" />
-  <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImpl\.java" />
+  <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*VendoredListenableFutureAdapter\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImplTest\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamValuesRel\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamEnumerableConverterTest\.java" />
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 5d5de21..767faff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -32,8 +32,6 @@ import com.google.cloud.bigtable.grpc.BigtableSession;
 import com.google.cloud.bigtable.grpc.BigtableTableName;
 import com.google.cloud.bigtable.grpc.async.BulkMutation;
 import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.protobuf.ByteString;
 import io.grpc.Status.Code;
 import io.grpc.StatusRuntimeException;
@@ -48,6 +46,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Closer;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -228,7 +228,7 @@ class BigtableServiceImpl implements BigtableService {
 
       CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
       Futures.addCallback(
-          bulkMutation.add(request),
+          new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
           new FutureCallback<MutateRowResponse>() {
             @Override
             public void onSuccess(MutateRowResponse mutateRowResponse) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java
new file mode 100644
index 0000000..c736083
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
+
+/** Adapts {@link ListenableFuture} from bigtable-client-core to vendored guava. */
+class VendoredListenableFutureAdapter<V> implements ListenableFuture<V> {
+
+  private final com.google.common.util.concurrent.ListenableFuture<V> underlying;
+
+  VendoredListenableFutureAdapter(
+      com.google.common.util.concurrent.ListenableFuture<V> underlying) {
+    this.underlying = underlying;
+  }
+
+  @Override
+  public void addListener(Runnable listener, Executor executor) {
+    underlying.addListener(listener, executor);
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    return underlying.cancel(mayInterruptIfRunning);
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return underlying.isCancelled();
+  }
+
+  @Override
+  public boolean isDone() {
+    return underlying.isDone();
+  }
+
+  @Override
+  public V get() throws InterruptedException, ExecutionException {
+    return underlying.get();
+  }
+
+  @Override
+  public V get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    return underlying.get(timeout, unit);
+  }
+}