You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:40:00 UTC
[bookkeeper] 02/04: [TABLE SERVICE] apply backoff policy to rpc
requests if storage container is not found
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 71e0604ed60d6e1cd14cef41b7f041d57422dbfc
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 2 21:19:42 2018 -0700
[TABLE SERVICE] apply backoff policy to rpc requests if storage container is not found
Descriptions of the changes in this PR:
*Motivation*
A storage container can move between servers because of failures, and it can take some time to start on a server. During that short window the container is "unavailable", and clients receive `Status.NOT_FOUND` from their gRPC channels. The client should retry on this error and attempt to re-locate the storage container.
*Modification*
- add a backoff policy to the client settings
- apply the backoff policy at the storage container channel. If a request receives a `NOT_FOUND` gRPC exception, the storage server channel is reset, so that subsequent requests attempt to re-locate the storage container.
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1379 from sijie/rpc_backoffs
---
.../org/apache/bookkeeper/common/util/Backoff.java | 12 +-
.../bookkeeper/clients/StorageClientImpl.java | 3 +-
.../clients/config/StorageClientSettings.java | 12 ++
.../impl/container/StorageContainerChannel.java | 18 +-
.../clients/impl/internal/LocationClientImpl.java | 1 -
.../clients/impl/internal/MetaRangeClientImpl.java | 15 +-
.../clients/impl/internal/RootRangeClientImpl.java | 11 +-
.../internal/RootRangeClientImplWithRetries.java | 132 ++++++++++++
.../internal/StorageServerClientManagerImpl.java | 10 +-
.../internal/mr/MetaRangeRequestProcessor.java | 11 +-
.../bookkeeper/clients/utils/ClientConstants.java | 16 ++
.../utils/ListenableFutureRpcProcessor.java | 44 ++--
.../apache/bookkeeper/clients/utils/RpcUtils.java | 13 ++
.../RootRangeClientImplWithRetriesTest.java | 173 +++++++++++++++
.../utils/ListenableFutureRpcProcessorTest.java | 236 +++++++++++++++++++++
.../bookkeeper/clients/utils/RpcUtilsTest.java | 50 +++++
.../java/base/src/test/resources/log4j.properties | 51 +++++
.../clients/impl/kv/PByteBufTableImpl.java | 7 +-
.../clients/impl/kv/PByteBufTableRangeImpl.java | 21 +-
.../clients/impl/kv/TableRequestProcessor.java | 12 +-
.../clients/impl/kv/TableRequestProcessorTest.java | 4 +-
.../clients/impl/kv/TestPByteBufTableImpl.java | 4 +-
22 files changed, 811 insertions(+), 45 deletions(-)
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
index 085e4cd..fdf8055 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Backoff.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import lombok.Data;
import lombok.ToString;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
/**
* Implements various backoff strategies.
@@ -34,6 +35,12 @@ import lombok.ToString;
*/
public class Backoff {
+ public static final Policy DEFAULT = Jitter.of(
+ Type.EXPONENTIAL,
+ 200,
+ 2000,
+ 3);
+
private static final int MaxBitShift = 62;
/**
@@ -95,7 +102,10 @@ public class Backoff {
@ToString
public static class Jitter implements Policy {
- enum Type {
+ /**
+ * Jitter type.
+ */
+ public enum Type {
DECORRELATED,
EQUAL,
EXPONENTIAL
diff --git a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
index 0cde4e2..aff322c 100644
--- a/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
+++ b/stream/clients/java/all/src/main/java/org/apache/bookkeeper/clients/StorageClientImpl.java
@@ -98,7 +98,8 @@ class StorageClientImpl extends AbstractAutoAsyncCloseable implements StorageCli
streamName,
props,
serverManager,
- scheduler.chooseThread(props.getStreamId())
+ scheduler.chooseThread(props.getStreamId()),
+ settings.backoffPolicy()
).initialize();
}),
future
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
index 825e1d3..20b5821 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java
@@ -24,6 +24,8 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.NameResolver;
import java.util.List;
import java.util.Optional;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.inferred.freebuilder.FreeBuilder;
@@ -80,6 +82,15 @@ public interface StorageClientSettings {
Optional<String> clientName();
/**
+ * Configure a backoff policy for the client.
+ *
+ * <p>There are a few default backoff policies defined in {@link org.apache.bookkeeper.common.util.Backoff}.
+ *
+ * @return backoff policy provider
+ */
+ Backoff.Policy backoffPolicy();
+
+ /**
* Builder of {@link StorageClientSettings} instances.
*/
class Builder extends StorageClientSettings_Builder {
@@ -87,6 +98,7 @@ public interface StorageClientSettings {
Builder() {
numWorkerThreads(Runtime.getRuntime().availableProcessors());
usePlaintext(true);
+ backoffPolicy(ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
}
@Override
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
index 4006776..8635e0f 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/container/StorageContainerChannel.java
@@ -78,11 +78,25 @@ public class StorageContainerChannel {
return rsChannelFuture;
}
- @VisibleForTesting
- synchronized void resetStorageServerChannelFuture() {
+ public synchronized void resetStorageServerChannelFuture() {
rsChannelFuture = null;
}
+ public synchronized boolean resetStorageServerChannelFuture(CompletableFuture<StorageServerChannel> oldFuture) {
+ if (oldFuture != null) {
+ // we only reset the channel that we expect to reset
+ if (rsChannelFuture == oldFuture) {
+ rsChannelFuture = null;
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ rsChannelFuture = null;
+ return true;
+ }
+ }
+
@VisibleForTesting
public synchronized void setStorageServerChannelFuture(CompletableFuture<StorageServerChannel> rsChannelFuture) {
this.rsChannelFuture = rsChannelFuture;
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
index fa02004..95cae9a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/LocationClientImpl.java
@@ -105,7 +105,6 @@ public class LocationClientImpl implements LocationClient {
}
switch (status.getCode()) {
case INVALID_ARGUMENT:
- case NOT_FOUND:
case ALREADY_EXISTS:
case PERMISSION_DENIED:
case UNAUTHENTICATED:
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index c84caeb..10481f1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -29,6 +29,8 @@ import org.apache.bookkeeper.clients.impl.container.StorageContainerChannelManag
import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
import org.apache.bookkeeper.clients.impl.internal.mr.MetaRangeRequestProcessor;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.StreamProperties;
@@ -41,13 +43,23 @@ class MetaRangeClientImpl implements MetaRangeClient {
private final StreamProperties streamProps;
private final ScheduledExecutorService executor;
private final StorageContainerChannel scClient;
+ private final Backoff.Policy backoffPolicy;
MetaRangeClientImpl(StreamProperties streamProps,
OrderedScheduler scheduler,
StorageContainerChannelManager channelManager) {
+ this(streamProps, scheduler, channelManager, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+
+ }
+
+ MetaRangeClientImpl(StreamProperties streamProps,
+ OrderedScheduler scheduler,
+ StorageContainerChannelManager channelManager,
+ Backoff.Policy backoffPolicy) {
this.streamProps = streamProps;
this.executor = scheduler.chooseThread(streamProps.getStreamId());
this.scClient = channelManager.getOrCreate(streamProps.getStorageContainerId());
+ this.backoffPolicy = backoffPolicy;
}
@Override
@@ -71,7 +83,8 @@ class MetaRangeClientImpl implements MetaRangeClient {
streamProps),
(response) -> createActiveRanges(response.getGetActiveRangesResp()),
scClient,
- executor
+ executor,
+ backoffPolicy
).process();
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
index 51ea9e3..72bb0e1 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImpl.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.clients.impl.internal;
import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createRootRangeException;
+import static org.apache.bookkeeper.clients.utils.RpcUtils.isContainerNotFound;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
@@ -79,7 +80,15 @@ class RootRangeClientImpl implements RootRangeClient {
ProcessRequestFunc<ReqT, RespT, RootRangeServiceFutureStub> processRequestFunc,
ProcessResponseFunc<RespT, T> processResponseFunc) {
- CompletableFuture<T> result = FutureUtils.createFuture();
+ CompletableFuture<T> result = FutureUtils.<T>createFuture()
+ .whenComplete((v, cause) -> {
+ if (null != cause && isContainerNotFound(cause)) {
+ // if the rpc fails with `NOT_FOUND`, it means the storage container is not owned by any server
+ // yet. in this case, reset the storage server channel, so that subsequent retries are
+ // forced to re-locate the container.
+ scClient.resetStorageServerChannelFuture();
+ }
+ });
scClient.getStorageContainerChannelFuture().whenComplete((rsChannel, cause) -> {
if (null != cause) {
handleGetRootRangeServiceFailure(result, cause);
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
new file mode 100644
index 0000000..d83d5a7
--- /dev/null
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetries.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+
+/**
+ * A root range client wrapper with retries.
+ */
+class RootRangeClientImplWithRetries implements RootRangeClient {
+
+ @VisibleForTesting
+ static final Predicate<Throwable> ROOT_RANGE_CLIENT_RETRY_PREDICATE =
+ cause -> shouldRetryOnException(cause);
+
+ private static boolean shouldRetryOnException(Throwable cause) {
+ if (cause instanceof StatusRuntimeException || cause instanceof StatusException) {
+ Status status;
+ if (cause instanceof StatusException) {
+ status = ((StatusException) cause).getStatus();
+ } else {
+ status = ((StatusRuntimeException) cause).getStatus();
+ }
+ switch (status.getCode()) {
+ case INVALID_ARGUMENT:
+ case ALREADY_EXISTS:
+ case PERMISSION_DENIED:
+ case UNAUTHENTICATED:
+ return false;
+ default:
+ return true;
+ }
+ } else if (cause instanceof RuntimeException) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ private final RootRangeClient client;
+ private final Backoff.Policy backoffPolicy;
+ private final OrderedScheduler scheduler;
+
+ RootRangeClientImplWithRetries(RootRangeClient client,
+ Backoff.Policy backoffPolicy,
+ OrderedScheduler scheduler) {
+ this.client = client;
+ this.backoffPolicy = backoffPolicy;
+ this.scheduler = scheduler;
+ }
+
+ private <T> CompletableFuture<T> runRpcWithRetries(
+ Supplier<CompletableFuture<T>> futureSupplier) {
+ return Retries.run(
+ backoffPolicy.toBackoffs(),
+ ROOT_RANGE_CLIENT_RETRY_PREDICATE,
+ futureSupplier,
+ scheduler);
+ }
+
+ @Override
+ public CompletableFuture<NamespaceProperties> createNamespace(String namespace,
+ NamespaceConfiguration nsConf) {
+ return runRpcWithRetries(() -> client.createNamespace(namespace, nsConf));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> deleteNamespace(String namespace) {
+ return runRpcWithRetries(() -> client.deleteNamespace(namespace));
+ }
+
+ @Override
+ public CompletableFuture<NamespaceProperties> getNamespace(String namespace) {
+ return runRpcWithRetries(() -> client.getNamespace(namespace));
+ }
+
+ @Override
+ public CompletableFuture<StreamProperties> createStream(String nsName,
+ String streamName,
+ StreamConfiguration streamConf) {
+ return runRpcWithRetries(() ->
+ client.createStream(nsName, streamName, streamConf));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> deleteStream(String nsName, String streamName) {
+ return runRpcWithRetries(() ->
+ client.deleteStream(nsName, streamName));
+ }
+
+ @Override
+ public CompletableFuture<StreamProperties> getStream(String nsName, String streamName) {
+ return runRpcWithRetries(() ->
+ client.getStream(nsName, streamName));
+ }
+
+ @Override
+ public CompletableFuture<StreamProperties> getStream(long streamId) {
+ return runRpcWithRetries(() ->
+ client.getStream(streamId));
+ }
+}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
index 0e64abe..d219b6a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/StorageServerClientManagerImpl.java
@@ -79,9 +79,13 @@ public class StorageServerClientManagerImpl
this.channelManager,
this.locationClient,
scheduler);
- this.rootRangeClient = new RootRangeClientImpl(
- scheduler,
- scChannelManager);
+ this.rootRangeClient = new RootRangeClientImplWithRetries(
+ new RootRangeClientImpl(
+ scheduler,
+ scChannelManager),
+ settings.backoffPolicy(),
+ scheduler
+ );
this.streamMetadataCache = new StreamMetadataCache(rootRangeClient);
this.metaRangeClients = Maps.newConcurrentMap();
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index f7f2480..cb25e84 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -27,6 +27,7 @@ import java.util.function.Function;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -41,8 +42,9 @@ public class MetaRangeRequestProcessor<RespT>
StorageContainerRequest request,
Function<StorageContainerResponse, T> responseFunc,
StorageContainerChannel channel,
- ScheduledExecutorService executor) {
- return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor);
+ ScheduledExecutorService executor,
+ Backoff.Policy backoffPolicy) {
+ return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
}
private final StorageContainerRequest request;
@@ -51,8 +53,9 @@ public class MetaRangeRequestProcessor<RespT>
private MetaRangeRequestProcessor(StorageContainerRequest request,
Function<StorageContainerResponse, RespT> responseFunc,
StorageContainerChannel channel,
- ScheduledExecutorService executor) {
- super(channel, executor);
+ ScheduledExecutorService executor,
+ Backoff.Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
this.request = request;
this.responseFunc = responseFunc;
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
index 1fe486d..42b8fec 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ClientConstants.java
@@ -18,6 +18,10 @@
package org.apache.bookkeeper.clients.utils;
+import org.apache.bookkeeper.common.util.Backoff.Jitter;
+import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+
/**
* Client related constants.
*/
@@ -32,5 +36,17 @@ public final class ClientConstants {
public static final int DEFAULT_BACKOFF_START_MS = 200;
public static final int DEFAULT_BACKOFF_MAX_MS = 1000;
public static final int DEFAULT_BACKOFF_MULTIPLIER = 2;
+ public static final int DEFAULT_BACKOFF_RETRIES = 3;
+
+ public static final Policy DEFAULT_BACKOFF_POLICY = Jitter.of(
+ Type.EXPONENTIAL,
+ DEFAULT_BACKOFF_START_MS,
+ DEFAULT_BACKOFF_MAX_MS,
+ DEFAULT_BACKOFF_RETRIES);
+
+ public static final Policy DEFAULT_INFINIT_BACKOFF_POLICY = Jitter.of(
+ Type.EXPONENTIAL,
+ DEFAULT_BACKOFF_START_MS,
+ DEFAULT_BACKOFF_MAX_MS);
}
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
index af49a9b..28ad7ca 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessor.java
@@ -33,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
/**
* A process for processing rpc request on storage container channel.
@@ -44,27 +44,22 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
FutureCallback<ResponseT>,
Runnable {
- private static final long startBackoffMs = 200;
- private static final long maxBackoffMs = 2000;
- private static final int maxRetries = 3;
-
private final StorageContainerChannel scChannel;
private final Iterator<Long> backoffs;
private final ScheduledExecutorService executor;
private final CompletableFuture<ResultT> resultFuture;
+ private CompletableFuture<StorageServerChannel> serverChannelFuture = null;
+
protected ListenableFutureRpcProcessor(StorageContainerChannel channel,
- ScheduledExecutorService executor) {
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
this.scChannel = channel;
- this.backoffs = configureBackoffs();
+ this.backoffs = backoffPolicy.toBackoffs().iterator();
this.resultFuture = FutureUtils.createFuture();
this.executor = executor;
}
- protected Iterator<Long> configureBackoffs() {
- return Backoff.exponentialJittered(startBackoffMs, maxBackoffMs).limit(maxRetries).iterator();
- }
-
/**
* Create the rpc request for the processor.
*
@@ -88,7 +83,8 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
protected abstract ResultT processResponse(ResponseT response) throws Exception;
public CompletableFuture<ResultT> process() {
- scChannel.getStorageContainerChannelFuture().whenCompleteAsync(this, executor);
+ serverChannelFuture = scChannel.getStorageContainerChannelFuture();
+ serverChannelFuture.whenCompleteAsync(this, executor);
return resultFuture;
}
@@ -106,7 +102,9 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
@Override
public void accept(StorageServerChannel storageServerChannel, Throwable cause) {
if (null != cause) {
- // failure to retrieve a channel to the server that hosts this storage container
+ // The `StorageContainerChannel` already retries on failures related to the server channel,
+ // so we don't need to retry here if we failed to retrieve a channel to the server
+ // that hosts this storage container.
resultFuture.completeExceptionally(cause);
return;
}
@@ -141,14 +139,26 @@ public abstract class ListenableFutureRpcProcessor<RequestT, ResponseT, ResultT>
@Override
public void onFailure(Throwable t) {
- boolean shouldRetry = false;
+ Status status = null;
if (t instanceof StatusRuntimeException) {
- shouldRetry = shouldRetryOn(((StatusRuntimeException) t).getStatus());
+ status = ((StatusRuntimeException) t).getStatus();
} else if (t instanceof StatusException) {
- shouldRetry = shouldRetryOn(((StatusException) t).getStatus());
+ status = ((StatusException) t).getStatus();
+ }
+
+ if (Status.NOT_FOUND == status) {
+ // `NOT_FOUND` means storage container is not found. that means:
+ //
+ // - the container is moved to a different server
+ // - the container is not assigned to any servers yet
+ // - the container is assigned, but it is still starting up and not ready for serving
+ //
+ // in any of these cases, we need to reset the storage server channel, so the next retry can attempt to
+ // re-locate the storage container again
+ scChannel.resetStorageServerChannelFuture(serverChannelFuture);
}
- if (shouldRetry && backoffs.hasNext()) {
+ if (shouldRetryOn(status) && backoffs.hasNext()) {
long backoffMs = backoffs.next();
executor.schedule(this, backoffMs, TimeUnit.MILLISECONDS);
} else {
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
index d05bfff..46ca205 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/utils/RpcUtils.java
@@ -17,6 +17,9 @@ package org.apache.bookkeeper.clients.utils;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletableFuture;
/**
@@ -48,6 +51,16 @@ public class RpcUtils {
void process(RespT resp, CompletableFuture<T> resultFuture);
}
+ public static boolean isContainerNotFound(Throwable cause) {
+ if (cause instanceof StatusRuntimeException) {
+ return Status.NOT_FOUND == ((StatusRuntimeException) cause).getStatus();
+ } else if (cause instanceof StatusException) {
+ return Status.NOT_FOUND == ((StatusException) cause).getStatus();
+ } else {
+ return false;
+ }
+ }
+
public static <T, ReqT, RespT, ServiceT> void processRpc(
ServiceT service,
CompletableFuture<T> result,
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
new file mode 100644
index 0000000..fa16002
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/RootRangeClientImplWithRetriesTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.impl.internal;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.internal.api.RootRangeClient;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
+import org.apache.bookkeeper.stream.proto.NamespaceProperties;
+import org.apache.bookkeeper.stream.proto.StreamConfiguration;
+import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RootRangeClientImplWithRetries}.
+ */
+public class RootRangeClientImplWithRetriesTest {
+
+ private static final int NUM_RETRIES = 3;
+
+ private static final String NS_NAME = "test-namespace";
+ private static final NamespaceConfiguration NS_CONF = NamespaceConfiguration.newBuilder().build();
+ private static final NamespaceProperties NS_PROPS = NamespaceProperties.newBuilder().build();
+ private static final String STREAM_NAME = "test-stream";
+ private static final StreamConfiguration STREAM_CONF = StreamConfiguration.newBuilder().build();
+ private static final StreamProperties STREAM_PROPS = StreamProperties.newBuilder().build();
+
+ private AtomicInteger callCounter;
+ private RootRangeClient client;
+ private OrderedScheduler scheduler;
+ private RootRangeClientImplWithRetries clientWithRetries;
+
+ @Before
+ public void setup() {
+ this.callCounter = new AtomicInteger(NUM_RETRIES);
+ this.client = mock(RootRangeClient.class);
+ this.scheduler = OrderedScheduler.newSchedulerBuilder()
+ .name("test-scheduler")
+ .numThreads(1)
+ .build();
+ this.clientWithRetries = new RootRangeClientImplWithRetries(
+ client,
+ Backoff.Constant.of(10, NUM_RETRIES),
+ scheduler);
+ }
+
+ @Test
+ public void testCreateNamespace() throws Exception {
+ when(client.createNamespace(anyString(), any(NamespaceConfiguration.class)))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(NS_PROPS);
+ }
+ });
+
+ assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.createNamespace(NS_NAME, NS_CONF)));
+ }
+
+ @Test
+ public void testDeleteNamespace() throws Exception {
+ when(client.deleteNamespace(anyString()))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(true);
+ }
+ });
+
+ assertTrue(FutureUtils.result(clientWithRetries.deleteNamespace(NS_NAME)));
+ }
+
+ @Test
+ public void testGetNamespace() throws Exception {
+ when(client.getNamespace(anyString()))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(NS_PROPS);
+ }
+ });
+
+ assertSame(NS_PROPS, FutureUtils.result(clientWithRetries.getNamespace(NS_NAME)));
+ }
+
+ @Test
+ public void testCreateStream() throws Exception {
+ when(client.createStream(anyString(), anyString(), any(StreamConfiguration.class)))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(STREAM_PROPS);
+ }
+ });
+
+ assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.createStream(NS_NAME, STREAM_NAME, STREAM_CONF)));
+ }
+
+ @Test
+ public void testDeleteStream() throws Exception {
+ when(client.deleteStream(anyString(), anyString()))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(true);
+ }
+ });
+
+ assertTrue(FutureUtils.result(clientWithRetries.deleteStream(NS_NAME, STREAM_NAME)));
+ }
+
+ @Test
+ public void testGetStream() throws Exception {
+ when(client.getStream(anyString(), anyString()))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(STREAM_PROPS);
+ }
+ });
+
+ assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(NS_NAME, STREAM_NAME)));
+ }
+
+ @Test
+ public void testGetStreamById() throws Exception {
+ when(client.getStream(anyLong()))
+ .thenAnswer(invocationOnMock -> {
+ if (callCounter.decrementAndGet() > 0) {
+ return FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND));
+ } else {
+ return FutureUtils.value(STREAM_PROPS);
+ }
+ });
+
+ assertSame(STREAM_PROPS, FutureUtils.result(clientWithRetries.getStream(1234L)));
+ }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
new file mode 100644
index 0000000..2c229cb
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/ListenableFutureRpcProcessorTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ListenableFutureRpcProcessor}.
+ *
+ * <p>The tests drive a spied anonymous processor against a mocked
+ * {@link StorageContainerChannel}. Tests that reach the RPC stage stub the
+ * {@code createRequest}, {@code sendRPC} and {@code processResponse} hooks on the spy.
+ */
+public class ListenableFutureRpcProcessorTest {
+
+ private ListenableFutureRpcProcessor<String, String, String> processor;
+ private StorageContainerChannel scChannel;
+ private ScheduledExecutorService executor;
+
+ @Before
+ public void setup() {
+ executor = Executors.newSingleThreadScheduledExecutor(); // NOTE(review): nothing in this class shuts this executor down
+ scChannel = mock(StorageContainerChannel.class);
+ processor = spy(new ListenableFutureRpcProcessor<String, String, String>(
+ scChannel, executor, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY) {
+ @Override
+ protected String createRequest() {
+ return null; // placeholder; tests stub this hook on the spy
+ }
+
+ @Override
+ protected ListenableFuture<String> sendRPC(StorageServerChannel rsChannel, String s) {
+ return null; // placeholder; tests stub this hook on the spy
+ }
+
+ @Override
+ protected String processResponse(String response) throws Exception {
+ return null; // placeholder; tests stub this hook on the spy
+ }
+ });
+ }
+
+ @Test
+ public void testFailToConnect() {
+ CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+ CompletableFuture<String> resultFuture = processor.process();
+ verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+ // inject channel failure by failing the storage container channel future
+ Exception testExc = new Exception("test-exception");
+ serverFuture.completeExceptionally(testExc);
+
+ try {
+ FutureUtils.result(resultFuture);
+ fail("Should fail the process if failed to connect to storage server");
+ } catch (Exception e) {
+ assertSame(testExc, e); // the injected exception instance must surface unchanged
+ }
+ }
+
+ @Test
+ public void testProcessSuccessfully() throws Exception {
+ String request = "request";
+ String response = "response";
+ String result = "result";
+
+ StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+ SettableFuture<String> rpcFuture = SettableFuture.create();
+
+ // stub the three processor hooks: request -> rpcFuture -> result
+ when(processor.createRequest()).thenReturn(request);
+ when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+ when(processor.processResponse(eq(response))).thenReturn(result);
+
+ CompletableFuture<String> resultFuture = processor.process();
+ verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+ // complete the server future to return a mock server channel
+ FutureUtils.complete(serverFuture, serverChannel);
+
+ // complete the rpc future to return the response
+ rpcFuture.set(response);
+
+ assertEquals(result, resultFuture.get()); // the value stubbed for processResponse is what process() yields
+ }
+
+ @Test
+ public void testProcessResponseException() throws Exception {
+ String request = "request";
+ String response = "response";
+
+ StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+ SettableFuture<String> rpcFuture = SettableFuture.create();
+
+ Exception testException = new Exception("test-exception");
+
+ // stub the processor hooks; processResponse is made to throw testException
+ when(processor.createRequest()).thenReturn(request);
+ when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+ when(processor.processResponse(eq(response))).thenThrow(testException);
+
+ CompletableFuture<String> resultFuture = processor.process();
+ verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+ // complete the server future to return a mock server channel
+ FutureUtils.complete(serverFuture, serverChannel);
+
+ // complete the rpc future to return the response
+ rpcFuture.set(response);
+
+ try {
+ FutureUtils.result(resultFuture);
+ fail("Should throw exception on processing result");
+ } catch (Exception e) {
+ assertSame(testException, e); // the exception thrown by processResponse must surface unchanged
+ }
+ }
+
+ @Test
+ public void testProcessRpcException() throws Exception {
+ String request = "request";
+ String response = "response";
+ String result = "result";
+
+ StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+ SettableFuture<String> rpcFuture = SettableFuture.create();
+
+ // stub the three processor hooks: request -> rpcFuture -> result
+ when(processor.createRequest()).thenReturn(request);
+ when(processor.sendRPC(same(serverChannel), eq(request))).thenReturn(rpcFuture);
+ when(processor.processResponse(eq(response))).thenReturn(result);
+
+ CompletableFuture<String> resultFuture = processor.process();
+ verify(scChannel, times(1)).getStorageContainerChannelFuture();
+
+ // complete the server future to return a mock server channel
+ FutureUtils.complete(serverFuture, serverChannel);
+
+ // complete the rpc future with `Status.INTERNAL`
+ rpcFuture.setException(new StatusRuntimeException(Status.INTERNAL));
+
+ try {
+ FutureUtils.result(resultFuture);
+ fail("Should throw fail immediately if rpc request failed");
+ } catch (Exception e) {
+ assertTrue(e instanceof StatusRuntimeException);
+ StatusRuntimeException sre = (StatusRuntimeException) e;
+ assertEquals(Status.INTERNAL, sre.getStatus()); // the INTERNAL status is reported to the caller as-is
+ }
+ }
+
+ @Test
+ public void testProcessRetryNotFoundRpcException() throws Exception {
+ String request = "request";
+ String response = "response";
+ String result = "result";
+
+ StorageServerChannel serverChannel = mock(StorageServerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverFuture = new CompletableFuture<>();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
+
+ AtomicInteger numRpcs = new AtomicInteger(0);
+
+ // stub the processor hooks; sendRPC answers from a per-call counter
+ when(processor.createRequest()).thenReturn(request);
+ when(processor.processResponse(eq(response))).thenReturn(result);
+ when(processor.sendRPC(same(serverChannel), eq(request))).thenAnswer(invocationOnMock -> {
+ SettableFuture<String> rpcFuture = SettableFuture.create();
+ if (numRpcs.getAndIncrement() > 2) { // counter values 0, 1, 2 take the NOT_FOUND branch; the 4th call succeeds
+ rpcFuture.set(response);
+ } else {
+ rpcFuture.setException(new StatusRuntimeException(Status.NOT_FOUND));
+ }
+ return rpcFuture;
+ });
+
+ CompletableFuture<String> resultFuture = processor.process();
+
+ // complete the server future to return a mock server channel
+ FutureUtils.complete(serverFuture, serverChannel);
+
+ assertEquals(result, FutureUtils.result(resultFuture));
+ verify(scChannel, times(4)).getStorageContainerChannelFuture(); // expects 4 channel lookups, matching the 4 stubbed sendRPC answers (3 NOT_FOUND + 1 success)
+ }
+}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
new file mode 100644
index 0000000..d97d4e4
--- /dev/null
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/utils/RpcUtilsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.clients.utils;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import io.grpc.Status;
+import io.grpc.StatusException;
+import io.grpc.StatusRuntimeException;
+import org.junit.Test;
+
+/**
+ * Unit test {@link RpcUtils}.
+ *
+ * <p>Exercises {@code RpcUtils.isContainerNotFound} with {@link StatusRuntimeException},
+ * {@link StatusException} and a plain {@link Exception}; only the two exceptions
+ * carrying {@code Status.NOT_FOUND} are expected to be reported as "container not found".
+ */
+public class RpcUtilsTest {
+
+ @Test
+ public void testIsContainerNotFound() {
+ StatusRuntimeException trueSRE = new StatusRuntimeException(Status.NOT_FOUND); // unchecked gRPC exception, NOT_FOUND
+ assertTrue(RpcUtils.isContainerNotFound(trueSRE));
+ StatusRuntimeException falseSRE = new StatusRuntimeException(Status.INTERNAL); // unchecked gRPC exception, other status
+ assertFalse(RpcUtils.isContainerNotFound(falseSRE));
+
+ StatusException trueSE = new StatusException(Status.NOT_FOUND); // checked gRPC exception, NOT_FOUND
+ assertTrue(RpcUtils.isContainerNotFound(trueSE));
+ StatusException falseSE = new StatusException(Status.INTERNAL); // checked gRPC exception, other status
+ assertFalse(RpcUtils.isContainerNotFound(falseSE));
+
+ Exception unknownException = new Exception("unknown"); // exception that carries no gRPC status
+ assertFalse(RpcUtils.isContainerNotFound(unknownException));
+ }
+
+}
diff --git a/stream/clients/java/base/src/test/resources/log4j.properties b/stream/clients/java/base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..9405038
--- /dev/null
+++ b/stream/clients/java/base/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Root logger: INFO and above, written to the console appender
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper level to info
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=stream.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
index 4cb7dc1..1bda971 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManage
import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.router.ByteBufHashRouter;
+import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.stream.proto.StreamProperties;
/**
@@ -157,7 +158,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
public PByteBufTableImpl(String streamName,
StreamProperties props,
StorageServerClientManager clientManager,
- ScheduledExecutorService executor) {
+ ScheduledExecutorService executor,
+ Backoff.Policy backoffPolicy) {
this(
streamName,
props,
@@ -171,7 +173,8 @@ public class PByteBufTableImpl implements PTable<ByteBuf, ByteBuf> {
executorService,
opFactory,
resultFactory,
- kvFactory),
+ kvFactory,
+ backoffPolicy),
Optional.empty());
}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 1eb6ec8..03c64f7 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -41,6 +41,7 @@ import org.apache.bookkeeper.api.kv.result.PutResult;
import org.apache.bookkeeper.api.kv.result.RangeResult;
import org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
@@ -58,6 +59,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
private final OpFactory<ByteBuf, ByteBuf> opFactory;
private final ResultFactory<ByteBuf, ByteBuf> resultFactory;
private final KeyValueFactory<ByteBuf, ByteBuf> kvFactory;
+ private final Backoff.Policy backoffPolicy;
PByteBufTableRangeImpl(long streamId,
RangeProperties rangeProps,
@@ -65,7 +67,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
ScheduledExecutorService executor,
OpFactory<ByteBuf, ByteBuf> opFactory,
ResultFactory<ByteBuf, ByteBuf> resultFactory,
- KeyValueFactory<ByteBuf, ByteBuf> kvFactory) {
+ KeyValueFactory<ByteBuf, ByteBuf> kvFactory,
+ Backoff.Policy backoffPolicy) {
this.streamId = streamId;
this.rangeProps = rangeProps;
this.scChannel = scChannel;
@@ -73,6 +76,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
this.opFactory = opFactory;
this.resultFactory = resultFactory;
this.kvFactory = kvFactory;
+ this.backoffPolicy = backoffPolicy;
}
private RoutingHeader.Builder newRoutingHeader(ByteBuf pKey) {
@@ -97,7 +101,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
.setHeader(newRoutingHeader(pKey))),
response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
scChannel,
- executor
+ executor,
+ backoffPolicy
).process().whenComplete((value, cause) -> {
pKey.release();
lKey.release();
@@ -122,7 +127,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
.setHeader(newRoutingHeader(pKey))),
response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
scChannel,
- executor
+ executor,
+ backoffPolicy
).process().whenComplete((ignored, cause) -> {
pKey.release();
lKey.release();
@@ -146,7 +152,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
.setHeader(newRoutingHeader(pKey))),
response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
scChannel,
- executor
+ executor,
+ backoffPolicy
).process().whenComplete((ignored, cause) -> {
pKey.release();
lKey.release();
@@ -170,7 +177,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
.setHeader(newRoutingHeader(pKey))),
response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
scChannel,
- executor
+ executor,
+ backoffPolicy
).process().whenComplete((ignored, cause) -> {
pKey.release();
lKey.release();
@@ -246,7 +254,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
txnBuilder.setHeader(newRoutingHeader(pKey))),
response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
scChannel,
- executor
+ executor,
+ backoffPolicy
).process().whenComplete((ignored, cause) -> {
pKey.release();
for (AutoCloseable resource : resourcesToRelease) {
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
index 1b9d4cb..f5e0bc5 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
@@ -18,10 +18,12 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
+import java.util.stream.Stream;
import org.apache.bookkeeper.clients.exceptions.InternalServerException;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
@@ -36,8 +38,9 @@ public class TableRequestProcessor<RespT>
StorageContainerRequest request,
Function<StorageContainerResponse, T> responseFunc,
StorageContainerChannel channel,
- ScheduledExecutorService executor) {
- return new TableRequestProcessor<>(request, responseFunc, channel, executor);
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
}
private final StorageContainerRequest request;
@@ -46,8 +49,9 @@ public class TableRequestProcessor<RespT>
private TableRequestProcessor(StorageContainerRequest request,
Function<StorageContainerResponse, RespT> respFunc,
StorageContainerChannel channel,
- ScheduledExecutorService executor) {
- super(channel, executor);
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
this.request = request;
this.responseFunc = respFunc;
}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
index fc93384..bec76e5 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
@@ -38,6 +38,7 @@ import lombok.Cleanup;
import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
@@ -206,7 +207,8 @@ public class TableRequestProcessorTest extends GrpcClientTestBase {
request,
resp -> "test",
scChannel,
- scheduler);
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
assertEquals("test", FutureUtils.result(processor.process()));
assertSame(request, receivedRequest.get());
assertEquals(type, receivedRequestType.get());
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
index c2adc66..d25c3c9 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestPByteBufTableImpl.java
@@ -49,6 +49,7 @@ import org.apache.bookkeeper.clients.impl.internal.api.HashStreamRanges;
import org.apache.bookkeeper.clients.impl.internal.api.MetaRangeClient;
import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.clients.impl.routing.RangeRouter;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.router.HashRouter;
import org.apache.bookkeeper.common.util.Bytes;
@@ -135,7 +136,8 @@ public class TestPByteBufTableImpl {
runtime.getMethodName(),
streamProps,
mockClientManager,
- scheduler.chooseThread(1));
+ scheduler.chooseThread(1),
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
try {
FutureUtils.result(table.initialize());
fail("Should fail initializing the table with exception " + cause);
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.