You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by gl...@apache.org on 2019/03/13 13:03:32 UTC
[beam] 01/01: Fix usage of non-vendored guava in BigTableIO
This is an automated email from the ASF dual-hosted git repository.
gleb pushed a commit to branch kanterov_bigtable_vendored_guava
in repository https://gitbox.apache.org/repos/asf/beam.git
commit b038ee74ad4804c4ce06c772b6f80b8936bc1575
Author: Gleb Kanterov <gl...@spotify.com>
AuthorDate: Wed Feb 27 18:02:45 2019 +0100
Fix usage of non-vendored guava in BigTableIO
---
.../src/main/resources/beam/suppressions.xml | 2 +-
.../sdk/io/gcp/bigtable/BigtableServiceImpl.java | 6 +-
.../bigtable/VendoredListenableFutureAdapter.java | 66 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 4 deletions(-)
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 247a980..05ae412 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -98,7 +98,7 @@
<suppress id="ForbidNonVendoredGuava" files=".*cassandra.*CassandraIO\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisIO\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*kinesis.*KinesisProducerMock\.java" />
- <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImpl\.java" />
+ <suppress id="ForbidNonVendoredGuava" files=".*bigtable.*VendoredListenableFutureAdapter\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*bigtable.*BigtableServiceImplTest\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamValuesRel\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*sql.*BeamEnumerableConverterTest\.java" />
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
index 5d5de21..767faff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -32,8 +32,6 @@ import com.google.cloud.bigtable.grpc.BigtableSession;
import com.google.cloud.bigtable.grpc.BigtableTableName;
import com.google.cloud.bigtable.grpc.async.BulkMutation;
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
@@ -48,6 +46,8 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.Closer;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -228,7 +228,7 @@ class BigtableServiceImpl implements BigtableService {
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
Futures.addCallback(
- bulkMutation.add(request),
+ new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
new FutureCallback<MutateRowResponse>() {
@Override
public void onSuccess(MutateRowResponse mutateRowResponse) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java
new file mode 100644
index 0000000..c736083
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/VendoredListenableFutureAdapter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
+
+/** Adapts {@link ListenableFuture} from bigtable-client-core to vendored guava. */
+class VendoredListenableFutureAdapter<V> implements ListenableFuture<V> {
+
+ private final com.google.common.util.concurrent.ListenableFuture<V> underlying;
+
+ VendoredListenableFutureAdapter(
+ com.google.common.util.concurrent.ListenableFuture<V> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ underlying.addListener(listener, executor);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return underlying.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return underlying.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return underlying.isDone();
+ }
+
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ return underlying.get();
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return underlying.get(timeout, unit);
+ }
+}