You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/05/24 17:24:52 UTC

[samza] branch master updated: SAMZA-2217: Refactoring of Couchbase table functions (#1049)

This is an automated email from the ASF dual-hosted git repository.

weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e65bcf9  SAMZA-2217: Refactoring of Couchbase table functions (#1049)
e65bcf9 is described below

commit e65bcf926dbeebc339630a18fa2c7a53bbd9172b
Author: Wei Song <ws...@linkedin.com>
AuthorDate: Fri May 24 10:24:47 2019 -0700

    SAMZA-2217: Refactoring of Couchbase table functions (#1049)
---
 .../couchbase/BaseCouchbaseTableFunction.java      |  9 ++++--
 .../couchbase/CouchbaseTableWriteFunction.java     | 34 +++++++++++++++-------
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
index e805975..5bd896f 100644
--- a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
+++ b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.List;
 
+import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.context.Context;
@@ -92,11 +93,13 @@ public abstract class BaseCouchbaseTableFunction<V> extends BaseTableFunction {
    * Check whether the exception is caused by one of the temporary failure exceptions, which are
    * likely to be retriable.
    * @param exception exception thrown by the table provider
-   * @return a boolean
+   * @return true if we should retry, otherwise false
    */
   public boolean isRetriable(Throwable exception) {
-    while (exception != null && !(exception instanceof TemporaryFailureException)
-        && !(exception instanceof TemporaryLockFailureException)) {
+    while (exception != null
+        && !(exception instanceof TemporaryFailureException)
+        && !(exception instanceof TemporaryLockFailureException)
+        && !(exception instanceof TimeoutException)) {
       exception = exception.getCause();
     }
     return exception != null;
diff --git a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
index 96800ae..4a13e58 100644
--- a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
+++ b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
@@ -39,7 +39,7 @@ import org.apache.samza.table.remote.TableWriteFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import rx.Single;
+import rx.Observable;
 import rx.SingleSubscriber;
 
 /**
@@ -76,29 +76,40 @@ public class CouchbaseTableWriteFunction<V> extends BaseCouchbaseTableFunction<V
     Preconditions.checkArgument(StringUtils.isNotBlank(key), "key must not be null, empty or blank");
     Preconditions.checkArgument(!key.contains(" "), String.format("key should not contain spaces: %s", key));
     Preconditions.checkNotNull(record);
-    Document<?> document =
-        record instanceof JsonObject ? JsonDocument.create(key, (int) ttl.getSeconds(), (JsonObject) record)
-            : BinaryDocument.create(key, (int) ttl.getSeconds(), Unpooled.copiedBuffer(valueSerde.toBytes(record)));
-    return asyncWriteHelper(bucket.async().upsert(document, timeout.toMillis(), TimeUnit.MILLISECONDS).toSingle(),
+    Document<?> document = record instanceof JsonObject
+        ? JsonDocument.create(key, (int) ttl.getSeconds(), (JsonObject) record)
+        : BinaryDocument.create(key, (int) ttl.getSeconds(), Unpooled.copiedBuffer(valueSerde.toBytes(record)));
+    return asyncWriteHelper(
+        bucket.async().upsert(document, timeout.toMillis(), TimeUnit.MILLISECONDS),
         String.format("Failed to insert key %s into bucket %s", key, bucketName));
   }
 
   @Override
   public CompletableFuture<Void> deleteAsync(String key) {
     Preconditions.checkArgument(StringUtils.isNotBlank(key), "key must not be null, empty or blank");
-    return asyncWriteHelper(bucket.async().remove(key, timeout.toMillis(), TimeUnit.MILLISECONDS).toSingle(),
+    return asyncWriteHelper(
+        bucket.async().remove(key, timeout.toMillis(), TimeUnit.MILLISECONDS),
         String.format("Failed to delete key %s from bucket %s.", key, bucketName));
   }
 
   /*
    * Helper method for putAsync and deleteAsync to convert an Observable to a CompletableFuture.
    */
-  protected CompletableFuture<Void> asyncWriteHelper(Single<? extends Document<?>> single, String errorMessage) {
-    CompletableFuture<Void> future = new CompletableFuture<>();
-    single.subscribe(new SingleSubscriber<Document>() {
+  protected CompletableFuture<Void>  asyncWriteHelper(Observable<? extends Document> observable, String errorMessage) {
+    return asyncWriteHelper(observable, errorMessage, true);
+  }
+
+  protected <T> CompletableFuture<T>  asyncWriteHelper(Observable<? extends Document> observable, String errorMessage,
+      boolean isVoid) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    observable.toSingle().subscribe(new SingleSubscriber<Document>() {
       @Override
-      public void onSuccess(Document v) {
-        future.complete(null);
+      public void onSuccess(Document document) {
+        if (isVoid) {
+          future.complete(null);
+        } else {
+          future.complete((T) document.content());
+        }
       }
 
       @Override
@@ -108,4 +119,5 @@ public class CouchbaseTableWriteFunction<V> extends BaseCouchbaseTableFunction<V
     });
     return future;
   }
+
 }