You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by we...@apache.org on 2019/05/24 17:24:52 UTC
[samza] branch master updated: SAMZA-2217: Refactoring of Couchbase table functions (#1049)
This is an automated email from the ASF dual-hosted git repository.
weisong44 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e65bcf9 SAMZA-2217: Refactoring of Couchbase table functions (#1049)
e65bcf9 is described below
commit e65bcf926dbeebc339630a18fa2c7a53bbd9172b
Author: Wei Song <ws...@linkedin.com>
AuthorDate: Fri May 24 10:24:47 2019 -0700
SAMZA-2217: Refactoring of Couchbase table functions (#1049)
---
.../couchbase/BaseCouchbaseTableFunction.java | 9 ++++--
.../couchbase/CouchbaseTableWriteFunction.java | 34 +++++++++++++++-------
2 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
index e805975..5bd896f 100644
--- a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
+++ b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/BaseCouchbaseTableFunction.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.context.Context;
@@ -92,11 +93,13 @@ public abstract class BaseCouchbaseTableFunction<V> extends BaseTableFunction {
* Check whether the exception is caused by one of the temporary failure exceptions, which are
* likely to be retriable.
* @param exception exception thrown by the table provider
- * @return a boolean
+ * @return true if we should retry, otherwise false
*/
public boolean isRetriable(Throwable exception) {
- while (exception != null && !(exception instanceof TemporaryFailureException)
- && !(exception instanceof TemporaryLockFailureException)) {
+ while (exception != null
+ && !(exception instanceof TemporaryFailureException)
+ && !(exception instanceof TemporaryLockFailureException)
+ && !(exception instanceof TimeoutException)) {
exception = exception.getCause();
}
return exception != null;
diff --git a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
index 96800ae..4a13e58 100644
--- a/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
+++ b/samza-kv-couchbase/src/main/java/org/apache/samza/table/remote/couchbase/CouchbaseTableWriteFunction.java
@@ -39,7 +39,7 @@ import org.apache.samza.table.remote.TableWriteFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Single;
+import rx.Observable;
import rx.SingleSubscriber;
/**
@@ -76,29 +76,40 @@ public class CouchbaseTableWriteFunction<V> extends BaseCouchbaseTableFunction<V
Preconditions.checkArgument(StringUtils.isNotBlank(key), "key must not be null, empty or blank");
Preconditions.checkArgument(!key.contains(" "), String.format("key should not contain spaces: %s", key));
Preconditions.checkNotNull(record);
- Document<?> document =
- record instanceof JsonObject ? JsonDocument.create(key, (int) ttl.getSeconds(), (JsonObject) record)
- : BinaryDocument.create(key, (int) ttl.getSeconds(), Unpooled.copiedBuffer(valueSerde.toBytes(record)));
- return asyncWriteHelper(bucket.async().upsert(document, timeout.toMillis(), TimeUnit.MILLISECONDS).toSingle(),
+ Document<?> document = record instanceof JsonObject
+ ? JsonDocument.create(key, (int) ttl.getSeconds(), (JsonObject) record)
+ : BinaryDocument.create(key, (int) ttl.getSeconds(), Unpooled.copiedBuffer(valueSerde.toBytes(record)));
+ return asyncWriteHelper(
+ bucket.async().upsert(document, timeout.toMillis(), TimeUnit.MILLISECONDS),
String.format("Failed to insert key %s into bucket %s", key, bucketName));
}
@Override
public CompletableFuture<Void> deleteAsync(String key) {
Preconditions.checkArgument(StringUtils.isNotBlank(key), "key must not be null, empty or blank");
- return asyncWriteHelper(bucket.async().remove(key, timeout.toMillis(), TimeUnit.MILLISECONDS).toSingle(),
+ return asyncWriteHelper(
+ bucket.async().remove(key, timeout.toMillis(), TimeUnit.MILLISECONDS),
String.format("Failed to delete key %s from bucket %s.", key, bucketName));
}
/*
* Helper method for putAsync and deleteAsync to convert Single to CompletableFuture.
*/
- protected CompletableFuture<Void> asyncWriteHelper(Single<? extends Document<?>> single, String errorMessage) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- single.subscribe(new SingleSubscriber<Document>() {
+ protected CompletableFuture<Void> asyncWriteHelper(Observable<? extends Document> observable, String errorMessage) {
+ return asyncWriteHelper(observable, errorMessage, true);
+ }
+
+ protected <T> CompletableFuture<T> asyncWriteHelper(Observable<? extends Document> observable, String errorMessage,
+ boolean isVoid) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ observable.toSingle().subscribe(new SingleSubscriber<Document>() {
@Override
- public void onSuccess(Document v) {
- future.complete(null);
+ public void onSuccess(Document document) {
+ if (isVoid) {
+ future.complete(null);
+ } else {
+ future.complete((T) document.content());
+ }
}
@Override
@@ -108,4 +119,5 @@ public class CouchbaseTableWriteFunction<V> extends BaseCouchbaseTableFunction<V
});
return future;
}
+
}