You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 22:04:00 UTC
[bookkeeper] 06/09: [TABLE SERVICE] remove StorageContainerRequest
and StorageContainerResponse
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 76b6857725bf25e77e4db285a27692c13c022ebb
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed May 30 10:18:15 2018 -0700
[TABLE SERVICE] remove StorageContainerRequest and StorageContainerResponse
Descriptions of the changes in this PR:
*Motivation*
#1448 uses reverse proxy for serving storage container requests and responses. so the `StorageContainerRequest` and `StorageContainerResponse`
become redundant.
*Solution*
removes `StorageContainerRequest` and `StorageContainerResponse`
Master Issue: #1205
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
This closes #1452 from sijie/remove_storage_container_request_response
---
.../clients/impl/channel/StorageServerChannel.java | 4 +-
.../clients/impl/internal/MetaRangeClientImpl.java | 6 +-
.../internal/mr/MetaRangeRequestProcessor.java | 46 ++--
.../impl/internal/TestMetaRangeClientImpl.java | 20 +-
.../clients/impl/kv/DeleteRequestProcessor.java | 94 ++++++++
.../clients/impl/kv/IncrementRequestProcessor.java | 94 ++++++++
.../apache/bookkeeper/clients/impl/kv/KvUtils.java | 47 ----
.../clients/impl/kv/PByteBufTableRangeImpl.java | 52 ++--
.../clients/impl/kv/PutRequestProcessor.java | 94 ++++++++
.../clients/impl/kv/RangeRequestProcessor.java | 94 ++++++++
.../clients/impl/kv/TableRequestProcessor.java | 93 -------
.../clients/impl/kv/TxnRequestProcessor.java | 94 ++++++++
.../impl/kv/DeleteRequestProcessorTest.java | 115 +++++++++
.../impl/kv/IncrementRequestProcessorTest.java | 115 +++++++++
.../clients/impl/kv/PutRequestProcessorTest.java | 115 +++++++++
.../clients/impl/kv/RangeRequestProcessorTest.java | 115 +++++++++
.../clients/impl/kv/TableRequestProcessorTest.java | 217 -----------------
.../bookkeeper/clients/impl/kv/TestKvUtils.java | 65 -----
.../clients/impl/kv/TxnRequestProcessorTest.java | 115 +++++++++
.../stream/protocol/util/ProtoUtils.java | 25 +-
stream/proto/src/main/proto/kv_rpc.proto | 20 +-
stream/proto/src/main/proto/storage.proto | 50 +---
.../stream/storage/api/kv/TableStore.java | 22 +-
.../storage/api/metadata/MetaRangeStore.java | 6 +-
.../storage/impl/grpc/GrpcMetaRangeService.java | 21 +-
.../stream/storage/impl/grpc/GrpcTableService.java | 96 ++++++--
.../handler/StorageContainerResponseHandler.java | 44 ----
.../stream/storage/impl/kv/TableStoreImpl.java | 105 ++++----
.../stream/storage/impl/kv/TableStoreUtils.java | 5 +
.../storage/impl/metadata/MetaRangeStoreImpl.java | 35 +--
.../impl/service/FailRequestRangeStoreService.java | 59 +++--
.../impl/service/RangeStoreServiceImpl.java | 57 ++---
.../impl/TestStorageContainerStoreImpl.java | 135 +++++------
.../impl/grpc/TestGrpcMetaRangeService.java | 34 +--
.../storage/impl/grpc/TestGrpcTableService.java | 144 ++++++-----
.../TestStorageContainerResponseHandler.java | 115 ---------
.../stream/storage/impl/kv/TableStoreImplTest.java | 267 +++++++++------------
.../impl/metadata/MetaRangeStoreImplTest.java | 28 +--
.../impl/service/RangeStoreServiceImplTest.java | 155 ++++++------
39 files changed, 1690 insertions(+), 1328 deletions(-)
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
index d9e9388..8660aab 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/channel/StorageServerChannel.java
@@ -31,14 +31,14 @@ import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterc
import org.apache.bookkeeper.clients.resolver.EndpointResolver;
import org.apache.bookkeeper.clients.utils.GrpcUtils;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerServiceGrpc.StorageContainerServiceFutureStub;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
/**
* A channel connected to a range server.
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
index 10481f1..7e61b58 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/MetaRangeClientImpl.java
@@ -78,10 +78,8 @@ class MetaRangeClientImpl implements MetaRangeClient {
@Override
public CompletableFuture<HashStreamRanges> getActiveDataRanges() {
return MetaRangeRequestProcessor.of(
- createGetActiveRangesRequest(
- scClient.getStorageContainerId(),
- streamProps),
- (response) -> createActiveRanges(response.getGetActiveRangesResp()),
+ createGetActiveRangesRequest(streamProps),
+ (response) -> createActiveRanges(response),
scClient,
executor,
backoffPolicy
diff --git a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
index cb25e84..39e9a5a 100644
--- a/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
+++ b/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/impl/internal/mr/MetaRangeRequestProcessor.java
@@ -21,37 +21,36 @@ package org.apache.bookkeeper.clients.impl.internal.mr;
import static org.apache.bookkeeper.clients.impl.internal.ProtocolInternalUtils.createMetaRangeException;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
import org.apache.bookkeeper.common.util.Backoff;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
/**
* Request Processor processing meta range request.
*/
public class MetaRangeRequestProcessor<RespT>
- extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
+ extends ListenableFutureRpcProcessor<GetActiveRangesRequest, GetActiveRangesResponse, RespT> {
public static <T> MetaRangeRequestProcessor<T> of(
- StorageContainerRequest request,
- Function<StorageContainerResponse, T> responseFunc,
+ GetActiveRangesRequest request,
+ Function<GetActiveRangesResponse, T> responseFunc,
StorageContainerChannel channel,
ScheduledExecutorService executor,
Backoff.Policy backoffPolicy) {
return new MetaRangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
}
- private final StorageContainerRequest request;
- private final Function<StorageContainerResponse, RespT> responseFunc;
+ private final GetActiveRangesRequest request;
+ private final Function<GetActiveRangesResponse, RespT> responseFunc;
- private MetaRangeRequestProcessor(StorageContainerRequest request,
- Function<StorageContainerResponse, RespT> responseFunc,
+ private MetaRangeRequestProcessor(GetActiveRangesRequest request,
+ Function<GetActiveRangesResponse, RespT> responseFunc,
StorageContainerChannel channel,
ScheduledExecutorService executor,
Backoff.Policy backoffPolicy) {
@@ -61,34 +60,23 @@ public class MetaRangeRequestProcessor<RespT>
}
@Override
- protected StorageContainerRequest createRequest() {
+ protected GetActiveRangesRequest createRequest() {
return request;
}
@Override
- protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
- StorageContainerRequest request) {
- switch (request.getRequestCase()) {
- case GET_ACTIVE_RANGES_REQ:
- return rsChannel.getMetaRangeService().getActiveRanges(request);
- default:
- SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
- respFuture.setException(new Exception("Unknown request " + request));
- return respFuture;
- }
+ protected ListenableFuture<GetActiveRangesResponse> sendRPC(StorageServerChannel rsChannel,
+ GetActiveRangesRequest request) {
+ return rsChannel.getMetaRangeService().getActiveRanges(request);
}
- private String getIdentifier(StorageContainerRequest request) {
- switch (request.getRequestCase()) {
- case GET_ACTIVE_RANGES_REQ:
- return "" + request.getGetActiveRangesReq().getStreamId();
- default:
- return "";
- }
+ private String getIdentifier(GetActiveRangesRequest request) {
+
+ return "" + request.getStreamId();
}
@Override
- protected RespT processResponse(StorageContainerResponse response) throws Exception {
+ protected RespT processResponse(GetActiveRangesResponse response) throws Exception {
if (StatusCode.SUCCESS == response.getCode()) {
return responseFunc.apply(response);
}
diff --git a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
index 189b849..9716162 100644
--- a/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
+++ b/stream/clients/java/base/src/test/java/org/apache/bookkeeper/clients/impl/internal/TestMetaRangeClientImpl.java
@@ -45,13 +45,12 @@ import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.RangeProperties;
import org.apache.bookkeeper.stream.proto.StreamConfiguration;
import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
import org.apache.bookkeeper.stream.proto.storage.RelationType;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.junit.Test;
/**
@@ -129,21 +128,18 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
// create response
GetActiveRangesResponse getActiveRangesResponse = GetActiveRangesResponse.newBuilder()
+ .setCode(StatusCode.SUCCESS)
.addRanges(
buildRelatedRange(Long.MIN_VALUE, 0L, 123L, 1L, Lists.newArrayList(113L))
).addRanges(
buildRelatedRange(0L, Long.MAX_VALUE, 124L, 2L, Lists.newArrayList(114L))
).build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setGetActiveRangesResp(getActiveRangesResponse)
- .build();
MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
@Override
- public void getActiveRanges(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- responseObserver.onNext(response);
+ public void getActiveRanges(GetActiveRangesRequest request,
+ StreamObserver<GetActiveRangesResponse> responseObserver) {
+ responseObserver.onNext(getActiveRangesResponse);
responseObserver.onCompleted();
}
};
@@ -154,7 +150,7 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
Optional.empty());
serviceFuture.complete(rsChannel);
- HashStreamRanges expectedStream = createActiveRanges(response.getGetActiveRangesResp());
+ HashStreamRanges expectedStream = createActiveRanges(getActiveRangesResponse);
CompletableFuture<HashStreamRanges> getFuture = metaRangeClient.getActiveDataRanges();
assertEquals(expectedStream, getFuture.get());
}
@@ -166,8 +162,8 @@ public class TestMetaRangeClientImpl extends GrpcClientTestBase {
MetaRangeServiceImplBase metaRangeService = new MetaRangeServiceImplBase() {
@Override
- public void getActiveRanges(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
+ public void getActiveRanges(GetActiveRangesRequest request,
+ StreamObserver<GetActiveRangesResponse> responseObserver) {
responseObserver.onError(new StatusRuntimeException(Status.INTERNAL));
}
};
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
new file mode 100644
index 0000000..88287df
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class DeleteRequestProcessor<RespT>
+ extends ListenableFutureRpcProcessor<DeleteRangeRequest, DeleteRangeResponse, RespT> {
+
+ public static <T> DeleteRequestProcessor<T> of(
+ DeleteRangeRequest request,
+ Function<DeleteRangeResponse, T> responseFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new DeleteRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+ }
+
+ private final DeleteRangeRequest request;
+ private final Function<DeleteRangeResponse, RespT> responseFunc;
+
+ private DeleteRequestProcessor(DeleteRangeRequest request,
+ Function<DeleteRangeResponse, RespT> respFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
+ this.request = request;
+ this.responseFunc = respFunc;
+ }
+
+ @Override
+ protected DeleteRangeRequest createRequest() {
+ return request;
+ }
+
+ @Override
+ protected ListenableFuture<DeleteRangeResponse> sendRPC(StorageServerChannel rsChannel,
+ DeleteRangeRequest request) {
+ return rsChannel.getTableService().delete(request);
+ }
+
+ @Override
+ protected RespT processResponse(DeleteRangeResponse response) throws Exception {
+ if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+ return responseFunc.apply(response);
+ }
+ throw new InternalServerException("Encountered internal server exception : code = "
+ + response.getHeader().getCode());
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
new file mode 100644
index 0000000..f41c97b
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class IncrementRequestProcessor<RespT>
+ extends ListenableFutureRpcProcessor<IncrementRequest, IncrementResponse, RespT> {
+
+ public static <T> IncrementRequestProcessor<T> of(
+ IncrementRequest request,
+ Function<IncrementResponse, T> responseFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new IncrementRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+ }
+
+ private final IncrementRequest request;
+ private final Function<IncrementResponse, RespT> responseFunc;
+
+ private IncrementRequestProcessor(IncrementRequest request,
+ Function<IncrementResponse, RespT> respFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
+ this.request = request;
+ this.responseFunc = respFunc;
+ }
+
+ @Override
+ protected IncrementRequest createRequest() {
+ return request;
+ }
+
+ @Override
+ protected ListenableFuture<IncrementResponse> sendRPC(StorageServerChannel rsChannel,
+ IncrementRequest request) {
+ return rsChannel.getTableService().increment(request);
+ }
+
+ @Override
+ protected RespT processResponse(IncrementResponse response) throws Exception {
+ if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+ return responseFunc.apply(response);
+ }
+ throw new InternalServerException("Encountered internal server exception : code = "
+ + response.getHeader().getCode());
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
index 2fd27ce..279219b 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/KvUtils.java
@@ -52,9 +52,7 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
/**
* K/V related utils.
@@ -86,15 +84,6 @@ public final class KvUtils {
return Lists.transform(kvs, kv -> fromProtoKeyValue(kv, kvFactory));
}
- public static StorageContainerRequest newKvRangeRequest(
- long scId,
- RangeRequest.Builder rangeReq) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvRangeReq(rangeReq)
- .build();
- }
-
public static RangeRequest.Builder newRangeRequest(ByteBuf key, RangeOption<ByteBuf> option) {
RangeRequest.Builder builder = RangeRequest.newBuilder()
.setKey(toProtoKey(key))
@@ -121,15 +110,6 @@ public final class KvUtils {
.kvs(fromProtoKeyValues(response.getKvsList(), kvFactory));
}
- public static StorageContainerRequest newKvPutRequest(
- long scId,
- PutRequest.Builder putReq) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvPutReq(putReq)
- .build();
- }
-
public static PutRequest.Builder newPutRequest(ByteBuf key,
ByteBuf value,
PutOption<ByteBuf> option) {
@@ -150,15 +130,6 @@ public final class KvUtils {
return result;
}
- public static StorageContainerRequest newKvIncrementRequest(
- long scId,
- IncrementRequest.Builder putReq) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvIncrReq(putReq)
- .build();
- }
-
public static IncrementRequest.Builder newIncrementRequest(ByteBuf key,
long amount,
IncrementOption<ByteBuf> option) {
@@ -177,15 +148,6 @@ public final class KvUtils {
return result;
}
- public static StorageContainerRequest newKvDeleteRequest(
- long scId,
- DeleteRangeRequest.Builder deleteReq) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvDeleteReq(deleteReq)
- .build();
- }
-
public static DeleteRangeRequest.Builder newDeleteRequest(ByteBuf key, DeleteOption<ByteBuf> option) {
DeleteRangeRequest.Builder builder = DeleteRangeRequest.newBuilder()
.setKey(UnsafeByteOperations.unsafeWrap(key.nioBuffer()))
@@ -310,15 +272,6 @@ public final class KvUtils {
return reqBuilder;
}
- public static StorageContainerRequest newKvTxnRequest(
- long scId,
- TxnRequest.Builder txnReq) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvTxnReq(txnReq)
- .build();
- }
-
public static TxnResult<ByteBuf, ByteBuf> newKvTxnResult(
TxnResponse txnResponse,
ResultFactory<ByteBuf, ByteBuf> resultFactory,
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 03c64f7..1f07b80 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -94,12 +94,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
if (null != option.endKey()) {
option.endKey().retain();
}
- return TableRequestProcessor.of(
- KvUtils.newKvRangeRequest(
- scChannel.getStorageContainerId(),
- KvUtils.newRangeRequest(lKey, option)
- .setHeader(newRoutingHeader(pKey))),
- response -> KvUtils.newRangeResult(response.getKvRangeResp(), resultFactory, kvFactory),
+ return RangeRequestProcessor.of(
+ KvUtils.newRangeRequest(lKey, option)
+ .setHeader(newRoutingHeader(pKey))
+ .build(),
+ response -> KvUtils.newRangeResult(response, resultFactory, kvFactory),
scChannel,
executor,
backoffPolicy
@@ -120,12 +119,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
pKey.retain();
lKey.retain();
value.retain();
- return TableRequestProcessor.of(
- KvUtils.newKvPutRequest(
- scChannel.getStorageContainerId(),
- KvUtils.newPutRequest(lKey, value, option)
- .setHeader(newRoutingHeader(pKey))),
- response -> KvUtils.newPutResult(response.getKvPutResp(), resultFactory, kvFactory),
+ return PutRequestProcessor.of(
+ KvUtils.newPutRequest(lKey, value, option)
+ .setHeader(newRoutingHeader(pKey))
+ .build(),
+ response -> KvUtils.newPutResult(response, resultFactory, kvFactory),
scChannel,
executor,
backoffPolicy
@@ -145,12 +143,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
if (null != option.endKey()) {
option.endKey().retain();
}
- return TableRequestProcessor.of(
- KvUtils.newKvDeleteRequest(
- scChannel.getStorageContainerId(),
- KvUtils.newDeleteRequest(lKey, option)
- .setHeader(newRoutingHeader(pKey))),
- response -> KvUtils.newDeleteResult(response.getKvDeleteResp(), resultFactory, kvFactory),
+ return DeleteRequestProcessor.of(
+ KvUtils.newDeleteRequest(lKey, option)
+ .setHeader(newRoutingHeader(pKey))
+ .build(),
+ response -> KvUtils.newDeleteResult(response, resultFactory, kvFactory),
scChannel,
executor,
backoffPolicy
@@ -170,12 +167,11 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
IncrementOption<ByteBuf> option) {
pKey.retain();
lKey.retain();
- return TableRequestProcessor.of(
- KvUtils.newKvIncrementRequest(
- scChannel.getStorageContainerId(),
- KvUtils.newIncrementRequest(lKey, amount, option)
- .setHeader(newRoutingHeader(pKey))),
- response -> KvUtils.newIncrementResult(response.getKvIncrResp(), resultFactory, kvFactory),
+ return IncrementRequestProcessor.of(
+ KvUtils.newIncrementRequest(lKey, amount, option)
+ .setHeader(newRoutingHeader(pKey))
+ .build(),
+ response -> KvUtils.newIncrementResult(response, resultFactory, kvFactory),
scChannel,
executor,
backoffPolicy
@@ -248,11 +244,9 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
@Override
public CompletableFuture<TxnResult<ByteBuf, ByteBuf>> commit() {
- return TableRequestProcessor.of(
- KvUtils.newKvTxnRequest(
- scChannel.getStorageContainerId(),
- txnBuilder.setHeader(newRoutingHeader(pKey))),
- response -> KvUtils.newKvTxnResult(response.getKvTxnResp(), resultFactory, kvFactory),
+ return TxnRequestProcessor.of(
+ txnBuilder.setHeader(newRoutingHeader(pKey)).build(),
+ response -> KvUtils.newKvTxnResult(response, resultFactory, kvFactory),
scChannel,
executor,
backoffPolicy
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
new file mode 100644
index 0000000..38ae6fc
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class PutRequestProcessor<RespT>
+ extends ListenableFutureRpcProcessor<PutRequest, PutResponse, RespT> {
+
+ public static <T> PutRequestProcessor<T> of(
+ PutRequest request,
+ Function<PutResponse, T> responseFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new PutRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+ }
+
+ private final PutRequest request;
+ private final Function<PutResponse, RespT> responseFunc;
+
+ private PutRequestProcessor(PutRequest request,
+ Function<PutResponse, RespT> respFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
+ this.request = request;
+ this.responseFunc = respFunc;
+ }
+
+ @Override
+ protected PutRequest createRequest() {
+ return request;
+ }
+
+ @Override
+ protected ListenableFuture<PutResponse> sendRPC(StorageServerChannel rsChannel,
+ PutRequest request) {
+ return rsChannel.getTableService().put(request);
+ }
+
+ @Override
+ protected RespT processResponse(PutResponse response) throws Exception {
+ if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+ return responseFunc.apply(response);
+ }
+ throw new InternalServerException("Encountered internal server exception : code = "
+ + response.getHeader().getCode());
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
new file mode 100644
index 0000000..de2ac87
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class RangeRequestProcessor<RespT>
+ extends ListenableFutureRpcProcessor<RangeRequest, RangeResponse, RespT> {
+
+ public static <T> RangeRequestProcessor<T> of(
+ RangeRequest request,
+ Function<RangeResponse, T> responseFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new RangeRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+ }
+
+ private final RangeRequest request;
+ private final Function<RangeResponse, RespT> responseFunc;
+
+ private RangeRequestProcessor(RangeRequest request,
+ Function<RangeResponse, RespT> respFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
+ this.request = request;
+ this.responseFunc = respFunc;
+ }
+
+ @Override
+ protected RangeRequest createRequest() {
+ return request;
+ }
+
+ @Override
+ protected ListenableFuture<RangeResponse> sendRPC(StorageServerChannel rsChannel,
+ RangeRequest request) {
+ return rsChannel.getTableService().range(request);
+ }
+
+ @Override
+ protected RespT processResponse(RangeResponse response) throws Exception {
+ if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+ return responseFunc.apply(response);
+ }
+ throw new InternalServerException("Encountered internal server exception : code = "
+ + response.getHeader().getCode());
+ }
+}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
deleted file mode 100644
index f5e0bc5..0000000
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessor.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.clients.impl.kv;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.function.Function;
-import java.util.stream.Stream;
-import org.apache.bookkeeper.clients.exceptions.InternalServerException;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
-import org.apache.bookkeeper.common.util.Backoff.Policy;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Request Processor processing table request.
- */
-public class TableRequestProcessor<RespT>
- extends ListenableFutureRpcProcessor<StorageContainerRequest, StorageContainerResponse, RespT> {
-
- public static <T> TableRequestProcessor<T> of(
- StorageContainerRequest request,
- Function<StorageContainerResponse, T> responseFunc,
- StorageContainerChannel channel,
- ScheduledExecutorService executor,
- Policy backoffPolicy) {
- return new TableRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
- }
-
- private final StorageContainerRequest request;
- private final Function<StorageContainerResponse, RespT> responseFunc;
-
- private TableRequestProcessor(StorageContainerRequest request,
- Function<StorageContainerResponse, RespT> respFunc,
- StorageContainerChannel channel,
- ScheduledExecutorService executor,
- Policy backoffPolicy) {
- super(channel, executor, backoffPolicy);
- this.request = request;
- this.responseFunc = respFunc;
- }
-
- @Override
- protected StorageContainerRequest createRequest() {
- return request;
- }
-
- @Override
- protected ListenableFuture<StorageContainerResponse> sendRPC(StorageServerChannel rsChannel,
- StorageContainerRequest request) {
- switch (request.getRequestCase()) {
- case KV_RANGE_REQ:
- return rsChannel.getTableService().range(request);
- case KV_PUT_REQ:
- return rsChannel.getTableService().put(request);
- case KV_DELETE_REQ:
- return rsChannel.getTableService().delete(request);
- case KV_INCR_REQ:
- return rsChannel.getTableService().increment(request);
- case KV_TXN_REQ:
- return rsChannel.getTableService().txn(request);
- default:
- SettableFuture<StorageContainerResponse> respFuture = SettableFuture.create();
- respFuture.setException(new Exception("Unknown request " + request));
- return respFuture;
- }
- }
-
- @Override
- protected RespT processResponse(StorageContainerResponse response) throws Exception {
- if (StatusCode.SUCCESS == response.getCode()) {
- return responseFunc.apply(response);
- }
- throw new InternalServerException("Encountered internal server exception : code = "
- + response.getCode());
- }
-}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
new file mode 100644
index 0000000..a3ed209
--- /dev/null
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.clients.impl.kv;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import org.apache.bookkeeper.clients.exceptions.InternalServerException;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
+import org.apache.bookkeeper.common.util.Backoff.Policy;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+
+/**
+ * Request Processor processing table request.
+ */
+class TxnRequestProcessor<RespT>
+ extends ListenableFutureRpcProcessor<TxnRequest, TxnResponse, RespT> {
+
+ public static <T> TxnRequestProcessor<T> of(
+ TxnRequest request,
+ Function<TxnResponse, T> responseFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ return new TxnRequestProcessor<>(request, responseFunc, channel, executor, backoffPolicy);
+ }
+
+ private final TxnRequest request;
+ private final Function<TxnResponse, RespT> responseFunc;
+
+ private TxnRequestProcessor(TxnRequest request,
+ Function<TxnResponse, RespT> respFunc,
+ StorageContainerChannel channel,
+ ScheduledExecutorService executor,
+ Policy backoffPolicy) {
+ super(channel, executor, backoffPolicy);
+ this.request = request;
+ this.responseFunc = respFunc;
+ }
+
+ @Override
+ protected TxnRequest createRequest() {
+ return request;
+ }
+
+ @Override
+ protected ListenableFuture<TxnResponse> sendRPC(StorageServerChannel rsChannel,
+ TxnRequest request) {
+ return rsChannel.getTableService().txn(request);
+ }
+
+ @Override
+ protected RespT processResponse(TxnResponse response) throws Exception {
+ if (StatusCode.SUCCESS == response.getHeader().getCode()) {
+ return responseFunc.apply(response);
+ }
+ throw new InternalServerException("Encountered internal server exception : code = "
+ + response.getHeader().getCode());
+ }
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
new file mode 100644
index 0000000..8b01b9b
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/DeleteRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DeleteRequestProcessor}.
+ */
+public class DeleteRequestProcessorTest extends GrpcClientTestBase {
+
+ @Override
+ protected void doSetup() throws Exception {
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ protected DeleteRangeResponse newSuccessResponse() {
+ return DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .build())
+ .build();
+ }
+
+ protected DeleteRangeRequest newRequest() {
+ return DeleteRangeRequest.newBuilder()
+ .build();
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+ DeleteRangeResponse response = newSuccessResponse();
+
+ AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void delete(DeleteRangeRequest request,
+ StreamObserver<DeleteRangeResponse> responseObserver) {
+ receivedRequest.set(request);
+ complete(responseObserver);
+ }
+
+ private void complete(StreamObserver<DeleteRangeResponse> responseStreamObserver) {
+ responseStreamObserver.onNext(response);
+ responseStreamObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+ StorageServerChannel ssChannel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty());
+ serverChannelFuture.complete(ssChannel);
+
+ DeleteRangeRequest request = newRequest();
+
+ DeleteRequestProcessor<String> processor = DeleteRequestProcessor.of(
+ request,
+ resp -> "test",
+ scChannel,
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+ assertEquals("test", FutureUtils.result(processor.process()));
+ assertSame(request, receivedRequest.get());
+ }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
new file mode 100644
index 0000000..1b1ec49
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/IncrementRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link IncrementRequestProcessor}.
+ */
+public class IncrementRequestProcessorTest extends GrpcClientTestBase {
+
+ @Override
+ protected void doSetup() throws Exception {
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ protected IncrementResponse newSuccessResponse() {
+ return IncrementResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .build())
+ .build();
+ }
+
+ protected IncrementRequest newRequest() {
+ return IncrementRequest.newBuilder()
+ .build();
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+ IncrementResponse response = newSuccessResponse();
+
+ AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void increment(IncrementRequest request,
+ StreamObserver<IncrementResponse> responseObserver) {
+ receivedRequest.set(request);
+ complete(responseObserver);
+ }
+
+ private void complete(StreamObserver<IncrementResponse> responseStreamObserver) {
+ responseStreamObserver.onNext(response);
+ responseStreamObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+ StorageServerChannel ssChannel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty());
+ serverChannelFuture.complete(ssChannel);
+
+ IncrementRequest request = newRequest();
+
+ IncrementRequestProcessor<String> processor = IncrementRequestProcessor.of(
+ request,
+ resp -> "test",
+ scChannel,
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+ assertEquals("test", FutureUtils.result(processor.process()));
+ assertSame(request, receivedRequest.get());
+ }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
new file mode 100644
index 0000000..3481a26
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/PutRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link PutRequestProcessor}.
+ */
+public class PutRequestProcessorTest extends GrpcClientTestBase {
+
+ @Override
+ protected void doSetup() throws Exception {
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ protected PutResponse newSuccessResponse() {
+ return PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .build())
+ .build();
+ }
+
+ protected PutRequest newRequest() {
+ return PutRequest.newBuilder()
+ .build();
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+ PutResponse response = newSuccessResponse();
+
+ AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void put(PutRequest request,
+ StreamObserver<PutResponse> responseObserver) {
+ receivedRequest.set(request);
+ complete(responseObserver);
+ }
+
+ private void complete(StreamObserver<PutResponse> responseStreamObserver) {
+ responseStreamObserver.onNext(response);
+ responseStreamObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+ StorageServerChannel ssChannel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty());
+ serverChannelFuture.complete(ssChannel);
+
+ PutRequest request = newRequest();
+
+ PutRequestProcessor<String> processor = PutRequestProcessor.of(
+ request,
+ resp -> "test",
+ scChannel,
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+ assertEquals("test", FutureUtils.result(processor.process()));
+ assertSame(request, receivedRequest.get());
+ }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
new file mode 100644
index 0000000..9e53df7
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/RangeRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link RangeRequestProcessor}.
+ */
+public class RangeRequestProcessorTest extends GrpcClientTestBase {
+
+ @Override
+ protected void doSetup() throws Exception {
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ protected RangeResponse newSuccessResponse() {
+ return RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .build())
+ .build();
+ }
+
+ protected RangeRequest newRequest() {
+ return RangeRequest.newBuilder()
+ .build();
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+ RangeResponse response = newSuccessResponse();
+
+ AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void range(RangeRequest request,
+ StreamObserver<RangeResponse> responseObserver) {
+ receivedRequest.set(request);
+ complete(responseObserver);
+ }
+
+ private void complete(StreamObserver<RangeResponse> responseStreamObserver) {
+ responseStreamObserver.onNext(response);
+ responseStreamObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+ StorageServerChannel ssChannel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty());
+ serverChannelFuture.complete(ssChannel);
+
+ RangeRequest request = newRequest();
+
+ RangeRequestProcessor<String> processor = RangeRequestProcessor.of(
+ request,
+ resp -> "test",
+ scChannel,
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+ assertEquals("test", FutureUtils.result(processor.process()));
+ assertSame(request, receivedRequest.get());
+ }
+
+}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
deleted file mode 100644
index bec76e5..0000000
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TableRequestProcessorTest.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bookkeeper.clients.impl.kv;
-
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import io.grpc.inprocess.InProcessChannelBuilder;
-import io.grpc.stub.StreamObserver;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicReference;
-import lombok.Cleanup;
-import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
-import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
-import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
-import org.apache.bookkeeper.clients.utils.ClientConstants;
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
-import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.junit.Test;
-
-/**
- * Unit test of {@link TableRequestProcessor}.
- */
-public class TableRequestProcessorTest extends GrpcClientTestBase {
-
- @Override
- protected void doSetup() throws Exception {
- }
-
- @Override
- protected void doTeardown() throws Exception {
- }
-
- @Test
- public void testProcessRangeRequest() throws Exception {
- testProcess(KV_RANGE_REQ);
- }
-
- @Test
- public void testProcessPutRequest() throws Exception {
- testProcess(KV_PUT_REQ);
- }
-
- @Test
- public void testProcessDeleteRequest() throws Exception {
- testProcess(KV_DELETE_REQ);
- }
-
- @Test
- public void testProcessIncrementRequest() throws Exception {
- testProcess(KV_INCR_REQ);
- }
-
- @Test
- public void testProcessTxnRequest() throws Exception {
- testProcess(KV_TXN_REQ);
- }
-
- private void testProcess(RequestCase type) throws Exception {
- @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
-
- StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
-
- CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
- when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
-
- StorageContainerResponse.Builder respBuilder = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS);
-
- switch (type) {
- case KV_PUT_REQ:
- respBuilder.setKvPutResp(PutResponse.newBuilder().build());
- break;
- case KV_DELETE_REQ:
- respBuilder.setKvDeleteResp(DeleteRangeResponse.newBuilder().build());
- break;
- case KV_RANGE_REQ:
- respBuilder.setKvRangeResp(RangeResponse.newBuilder().build());
- break;
- case KV_INCR_REQ:
- respBuilder.setKvIncrResp(IncrementResponse.newBuilder().build());
- break;
- case KV_TXN_REQ:
- respBuilder.setKvTxnResp(TxnResponse.newBuilder().build());
- break;
- default:
- break;
- }
- StorageContainerResponse response = respBuilder.build();
-
- AtomicReference<StorageContainerRequest> receivedRequest = new AtomicReference<>(null);
- AtomicReference<RequestCase> receivedRequestType = new AtomicReference<>(null);
- TableServiceImplBase tableService = new TableServiceImplBase() {
- @Override
- public void range(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- receivedRequest.set(request);
- receivedRequestType.set(KV_RANGE_REQ);
- complete(responseObserver);
- }
-
- @Override
- public void put(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- receivedRequest.set(request);
- receivedRequestType.set(KV_PUT_REQ);
- complete(responseObserver);
- }
-
- @Override
- public void delete(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- receivedRequest.set(request);
- receivedRequestType.set(KV_DELETE_REQ);
- complete(responseObserver);
- }
-
- @Override
- public void txn(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- receivedRequest.set(request);
- receivedRequestType.set(KV_TXN_REQ);
- complete(responseObserver);
- }
-
- @Override
- public void increment(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
- receivedRequest.set(request);
- receivedRequestType.set(KV_INCR_REQ);
- complete(responseObserver);
- }
-
- private void complete(StreamObserver<StorageContainerResponse> responseStreamObserver) {
- responseStreamObserver.onNext(response);
- responseStreamObserver.onCompleted();
- }
- };
- serviceRegistry.addService(tableService.bindService());
- StorageServerChannel ssChannel = new StorageServerChannel(
- InProcessChannelBuilder.forName(serverName).directExecutor().build(),
- Optional.empty());
- serverChannelFuture.complete(ssChannel);
-
- StorageContainerRequest.Builder requestBuilder = StorageContainerRequest.newBuilder();
- switch (type) {
- case KV_PUT_REQ:
- requestBuilder.setKvPutReq(PutRequest.newBuilder().build());
- break;
- case KV_DELETE_REQ:
- requestBuilder.setKvDeleteReq(DeleteRangeRequest.newBuilder().build());
- break;
- case KV_RANGE_REQ:
- requestBuilder.setKvRangeReq(RangeRequest.newBuilder().build());
- break;
- case KV_INCR_REQ:
- requestBuilder.setKvIncrReq(IncrementRequest.newBuilder().build());
- break;
- case KV_TXN_REQ:
- requestBuilder.setKvTxnReq(TxnRequest.newBuilder().build());
- break;
- default:
- break;
- }
- StorageContainerRequest request = requestBuilder.build();
-
- TableRequestProcessor<String> processor = TableRequestProcessor.of(
- request,
- resp -> "test",
- scChannel,
- scheduler,
- ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
- assertEquals("test", FutureUtils.result(processor.process()));
- assertSame(request, receivedRequest.get());
- assertEquals(type, receivedRequestType.get());
- }
-
-}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
index 3cff0a6..d2df334 100644
--- a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TestKvUtils.java
@@ -17,16 +17,8 @@ package org.apache.bookkeeper.clients.impl.kv;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newDeleteRequest;
import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvDeleteRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvIncrementRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvPutRequest;
-import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newKvRangeRequest;
import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newPutRequest;
import static org.apache.bookkeeper.clients.impl.kv.KvUtils.newRangeRequest;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -45,7 +37,6 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.junit.Test;
/**
@@ -89,26 +80,6 @@ public class TestKvUtils {
}
@Test
- public void testNewKvRangeRequest() {
- try (RangeOption<ByteBuf> rangeOption = optionFactory.newRangeOption()
- .endKey(key.retainedDuplicate())
- .countOnly(true)
- .keysOnly(true)
- .limit(10)
- .maxCreateRev(1234L)
- .minCreateRev(234L)
- .maxModRev(2345L)
- .minModRev(1235L)
- .build()) {
- RangeRequest.Builder rrBuilder = newRangeRequest(key, rangeOption);
- StorageContainerRequest request = newKvRangeRequest(scId, rrBuilder);
- assertEquals(scId, request.getScId());
- assertEquals(KV_RANGE_REQ, request.getRequestCase());
- assertEquals(rrBuilder.build(), request.getKvRangeReq());
- }
- }
-
- @Test
public void testNewPutRequest() {
try (PutOption<ByteBuf> option = Options.putAndGet()) {
PutRequest rr = newPutRequest(key, value, option).build();
@@ -120,17 +91,6 @@ public class TestKvUtils {
}
@Test
- public void testNewKvPutRequest() {
- try (PutOption<ByteBuf> option = Options.putAndGet()) {
- PutRequest.Builder putBuilder = newPutRequest(key, value, option);
- StorageContainerRequest request = newKvPutRequest(scId, putBuilder);
- assertEquals(scId, request.getScId());
- assertEquals(KV_PUT_REQ, request.getRequestCase());
- assertEquals(putBuilder.build(), request.getKvPutReq());
- }
- }
-
- @Test
public void testNewIncrementRequest() {
try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
IncrementRequest rr = newIncrementRequest(key, 100L, option).build();
@@ -142,17 +102,6 @@ public class TestKvUtils {
}
@Test
- public void testNewKvIncrementRequest() {
- try (IncrementOption<ByteBuf> option = Options.incrementAndGet()) {
- IncrementRequest.Builder incrBuilder = newIncrementRequest(key, 100L, option);
- StorageContainerRequest request = newKvIncrementRequest(scId, incrBuilder);
- assertEquals(scId, request.getScId());
- assertEquals(KV_INCR_REQ, request.getRequestCase());
- assertEquals(incrBuilder.build(), request.getKvIncrReq());
- }
- }
-
- @Test
public void testNewDeleteRequest() {
try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
.endKey(key.retainedDuplicate())
@@ -166,18 +115,4 @@ public class TestKvUtils {
}
}
- @Test
- public void testNewKvDeleteRequest() {
- try (DeleteOption<ByteBuf> option = optionFactory.newDeleteOption()
- .endKey(key.retainedDuplicate())
- .prevKv(true)
- .build()) {
- DeleteRangeRequest.Builder delBuilder = newDeleteRequest(key, option);
- StorageContainerRequest request = newKvDeleteRequest(scId, delBuilder);
- assertEquals(scId, request.getScId());
- assertEquals(KV_DELETE_REQ, request.getRequestCase());
- assertEquals(delBuilder.build(), request.getKvDeleteReq());
- }
- }
-
}
diff --git a/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
new file mode 100644
index 0000000..2efecd0
--- /dev/null
+++ b/stream/clients/java/kv/src/test/java/org/apache/bookkeeper/clients/impl/kv/TxnRequestProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.clients.impl.kv;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.stub.StreamObserver;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
+import org.apache.bookkeeper.clients.grpc.GrpcClientTestBase;
+import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
+import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
+import org.apache.bookkeeper.clients.utils.ClientConstants;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link TxnRequestProcessor}.
+ */
+public class TxnRequestProcessorTest extends GrpcClientTestBase {
+
+ @Override
+ protected void doSetup() throws Exception {
+ }
+
+ @Override
+ protected void doTeardown() throws Exception {
+ }
+
+ protected TxnResponse newSuccessResponse() {
+ return TxnResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .build())
+ .build();
+ }
+
+ protected TxnRequest newRequest() {
+ return TxnRequest.newBuilder()
+ .build();
+ }
+
+ @Test
+ public void testProcess() throws Exception {
+ @Cleanup("shutdown") ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ StorageContainerChannel scChannel = mock(StorageContainerChannel.class);
+
+ CompletableFuture<StorageServerChannel> serverChannelFuture = FutureUtils.createFuture();
+ when(scChannel.getStorageContainerChannelFuture()).thenReturn(serverChannelFuture);
+
+ TxnResponse response = newSuccessResponse();
+
+ AtomicReference<Object> receivedRequest = new AtomicReference<>(null);
+ TableServiceImplBase tableService = new TableServiceImplBase() {
+
+ @Override
+ public void txn(TxnRequest request,
+ StreamObserver<TxnResponse> responseObserver) {
+ receivedRequest.set(request);
+ complete(responseObserver);
+ }
+
+ private void complete(StreamObserver<TxnResponse> responseStreamObserver) {
+ responseStreamObserver.onNext(response);
+ responseStreamObserver.onCompleted();
+ }
+ };
+ serviceRegistry.addService(tableService.bindService());
+ StorageServerChannel ssChannel = new StorageServerChannel(
+ InProcessChannelBuilder.forName(serverName).directExecutor().build(),
+ Optional.empty());
+ serverChannelFuture.complete(ssChannel);
+
+ TxnRequest request = newRequest();
+
+ TxnRequestProcessor<String> processor = TxnRequestProcessor.of(
+ request,
+ resp -> "test",
+ scChannel,
+ scheduler,
+ ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY);
+ assertEquals("test", FutureUtils.result(processor.process()));
+ assertSame(request, receivedRequest.get());
+ }
+
+}
diff --git a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
index 35ed903..687388f 100644
--- a/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
+++ b/stream/proto/src/main/java/org/apache/bookkeeper/stream/protocol/util/ProtoUtils.java
@@ -42,7 +42,6 @@ import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointReq
import org.apache.bookkeeper.stream.proto.storage.OneStorageContainerEndpointResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.proto.storage.StorageContainerEndpoint;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -224,25 +223,17 @@ public class ProtoUtils {
// Meta Range API
//
- private static StorageContainerRequest.Builder newScRequestBuilder(long scId) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId);
- }
-
- public static StorageContainerRequest createGetActiveRangesRequest(long scId, long streamId) {
- return newScRequestBuilder(scId)
- .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
- .setStreamId(streamId))
- .build();
+ public static GetActiveRangesRequest createGetActiveRangesRequest(long streamId) {
+ return GetActiveRangesRequest.newBuilder()
+ .setStreamId(streamId)
+ .build();
}
- public static StorageContainerRequest createGetActiveRangesRequest(long scId,
- StreamProperties streamProps) {
- return newScRequestBuilder(scId)
- .setGetActiveRangesReq(GetActiveRangesRequest.newBuilder()
+ public static GetActiveRangesRequest createGetActiveRangesRequest(StreamProperties streamProps) {
+ return GetActiveRangesRequest.newBuilder()
.setStreamId(streamProps.getStreamId())
- .setStreamProps(streamProps))
- .build();
+ .setStreamProps(streamProps)
+ .build();
}
//
diff --git a/stream/proto/src/main/proto/kv_rpc.proto b/stream/proto/src/main/proto/kv_rpc.proto
index c324bcb..01e5cb6 100644
--- a/stream/proto/src/main/proto/kv_rpc.proto
+++ b/stream/proto/src/main/proto/kv_rpc.proto
@@ -18,6 +18,7 @@
syntax = "proto3";
import "kv.proto";
+import "storage.proto";
package bookkeeper.proto.kv.rpc;
@@ -34,7 +35,7 @@ message RoutingHeader {
bytes r_key = 3;
}
-service KV {
+service TableService {
// Range gets the keys in the range from the key-value store.
// NOT supported yet.
rpc Range(RangeRequest) returns (RangeResponse) {}
@@ -47,7 +48,7 @@ service KV {
// DeleteRange deletes the given range from the key-value store.
// A delete request increments the revision of the key-value store
// and generates a delete event in the event history for every deleted key.
- rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) {}
+ rpc Delete(DeleteRangeRequest) returns (DeleteRangeResponse) {}
// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
@@ -55,20 +56,17 @@ service KV {
// It is not allowed to modify the same key several times within one txn.
rpc Txn(TxnRequest) returns (TxnResponse) {}
+ // Increment increments the amount associated with the keys
+ rpc Increment(IncrementRequest) returns (IncrementResponse) {}
+
}
message ResponseHeader {
- // cluster_id is the ID of the cluster which sent the response.
- uint64 cluster_id = 1;
- // member_id is the ID of the member which sent the response.
- uint64 member_id = 2;
- // revision is the key-value store revision when the request was applied.
- int64 revision = 3;
- // raft_term is the raft term when the request was applied.
- uint64 raft_term = 4;
+ // Status Code
+ storage.StatusCode code = 1;
// routing header
- RoutingHeader routing_header = 99;
+ RoutingHeader routing_header = 99;
}
message RangeRequest {
diff --git a/stream/proto/src/main/proto/storage.proto b/stream/proto/src/main/proto/storage.proto
index b84ec2e..7570d29 100644
--- a/stream/proto/src/main/proto/storage.proto
+++ b/stream/proto/src/main/proto/storage.proto
@@ -19,7 +19,6 @@ syntax = "proto3";
import "common.proto";
import "stream.proto";
-import "kv_rpc.proto";
package bookkeeper.proto.storage;
@@ -87,7 +86,8 @@ message GetActiveRangesRequest {
}
message GetActiveRangesResponse {
- repeated RelatedRanges ranges = 1;
+ StatusCode code = 1;
+ repeated RelatedRanges ranges = 2;
}
enum RelationType {
@@ -104,7 +104,7 @@ message RelatedRanges {
// public service for other operations in range server
service MetaRangeService {
- rpc GetActiveRanges(StorageContainerRequest) returns (StorageContainerResponse);
+ rpc GetActiveRanges(GetActiveRangesRequest) returns (GetActiveRangesResponse);
}
//
@@ -188,18 +188,6 @@ service RootRangeService {
}
//
-// KV Service
-//
-
-service TableService {
- rpc Range(StorageContainerRequest) returns (StorageContainerResponse) {}
- rpc Put(StorageContainerRequest) returns (StorageContainerResponse) {}
- rpc Delete(StorageContainerRequest) returns (StorageContainerResponse) {}
- rpc Increment(StorageContainerRequest) returns (StorageContainerResponse) {}
- rpc Txn(StorageContainerRequest) returns (StorageContainerResponse) {}
-}
-
-//
// StorageContainerService
//
@@ -234,35 +222,3 @@ service StorageContainerService {
// Get the storage container endpoints
rpc GetStorageContainerEndpoint(GetStorageContainerEndpointRequest) returns (GetStorageContainerEndpointResponse);
}
-
-message StorageContainerRequest {
- int64 sc_id = 1;
-
- oneof request {
- // stream metadata operations
- GetActiveRangesRequest get_active_ranges_req = 200;
-
- // kv operations
- kv.rpc.RangeRequest kv_range_req = 400;
- kv.rpc.PutRequest kv_put_req = 401;
- kv.rpc.DeleteRangeRequest kv_delete_req = 402;
- kv.rpc.TxnRequest kv_txn_req = 403;
- kv.rpc.IncrementRequest kv_incr_req = 404;
- }
-}
-
-message StorageContainerResponse {
- StatusCode code = 1;
-
- oneof response {
- // stream metadata operations
- GetActiveRangesResponse get_active_ranges_resp = 200;
-
- // kv operations
- kv.rpc.RangeResponse kv_range_resp = 400;
- kv.rpc.PutResponse kv_put_resp = 401;
- kv.rpc.DeleteRangeResponse kv_delete_resp = 402;
- kv.rpc.TxnResponse kv_txn_resp = 403;
- kv.rpc.IncrementResponse kv_incr_resp = 404;
- }
-}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
index 1eaac62..a584a37 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/kv/TableStore.java
@@ -18,22 +18,30 @@
package org.apache.bookkeeper.stream.storage.api.kv;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
/**
* The table store that stores and serves tables.
*/
public interface TableStore {
- CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request);
+ CompletableFuture<RangeResponse> range(RangeRequest request);
- CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request);
+ CompletableFuture<PutResponse> put(PutRequest request);
- CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request);
+ CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request);
- CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request);
+ CompletableFuture<TxnResponse> txn(TxnRequest request);
- CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request);
+ CompletableFuture<IncrementResponse> incr(IncrementRequest request);
}
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
index 4215dd5..7ef664d 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/api/metadata/MetaRangeStore.java
@@ -19,8 +19,8 @@
package org.apache.bookkeeper.stream.storage.api.metadata;
import java.util.concurrent.CompletableFuture;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
/**
* The metadata store that store ranges.
@@ -33,6 +33,6 @@ public interface MetaRangeStore {
* @param request the request
* @return the active ranges
*/
- CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request);
+ CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request);
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
index 2fe81e7..1fb0e44 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcMetaRangeService.java
@@ -19,12 +19,12 @@ package org.apache.bookkeeper.stream.storage.impl.grpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRangeServiceImplBase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
-import org.apache.bookkeeper.stream.storage.api.RangeStore;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
/**
* The gRPC protocol based range service.
@@ -44,10 +44,17 @@ public class GrpcMetaRangeService extends MetaRangeServiceImplBase {
//
@Override
- public void getActiveRanges(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
+ public void getActiveRanges(GetActiveRangesRequest request,
+ StreamObserver<GetActiveRangesResponse> responseObserver) {
rangeStore.getActiveRanges(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<GetActiveRangesResponse>(responseObserver) {
+ @Override
+ protected GetActiveRangesResponse createErrorResp(Throwable cause) {
+ return GetActiveRangesResponse.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .build();
+ }
+ });
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
index e86759d..f26a7bd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/GrpcTableService.java
@@ -19,11 +19,21 @@ package org.apache.bookkeeper.stream.storage.impl.grpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceImplBase;
-import org.apache.bookkeeper.stream.storage.impl.grpc.handler.StorageContainerResponseHandler;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceImplBase;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
+import org.apache.bookkeeper.stream.storage.impl.grpc.handler.ResponseHandler;
/**
* The gRPC protocol based k/v service.
@@ -39,35 +49,87 @@ public class GrpcTableService extends TableServiceImplBase {
}
@Override
- public void range(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
+ public void range(RangeRequest request,
+ StreamObserver<RangeResponse> responseObserver) {
rangeStore.range(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<RangeResponse>(responseObserver) {
+ @Override
+ protected RangeResponse createErrorResp(Throwable cause) {
+ return RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build();
+ }
+ });
}
@Override
- public void put(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
+ public void put(PutRequest request,
+ StreamObserver<PutResponse> responseObserver) {
rangeStore.put(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<PutResponse>(responseObserver) {
+ @Override
+ protected PutResponse createErrorResp(Throwable cause) {
+ return PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build();
+ }
+ });
}
@Override
- public void delete(StorageContainerRequest request,
- StreamObserver<StorageContainerResponse> responseObserver) {
+ public void delete(DeleteRangeRequest request,
+ StreamObserver<DeleteRangeResponse> responseObserver) {
rangeStore.delete(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<DeleteRangeResponse>(responseObserver) {
+ @Override
+ protected DeleteRangeResponse createErrorResp(Throwable cause) {
+ return DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build();
+ }
+ });
}
@Override
- public void txn(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+ public void txn(TxnRequest request,
+ StreamObserver<TxnResponse> responseObserver) {
rangeStore.txn(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<TxnResponse>(responseObserver) {
+ @Override
+ protected TxnResponse createErrorResp(Throwable cause) {
+ return TxnResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build();
+ }
+ });
}
@Override
- public void increment(StorageContainerRequest request, StreamObserver<StorageContainerResponse> responseObserver) {
+ public void increment(IncrementRequest request,
+ StreamObserver<IncrementResponse> responseObserver) {
rangeStore.incr(request).whenComplete(
- StorageContainerResponseHandler.of(responseObserver));
+ new ResponseHandler<IncrementResponse>(responseObserver) {
+ @Override
+ protected IncrementResponse createErrorResp(Throwable cause) {
+ return IncrementResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(request.getHeader())
+ .build())
+ .build();
+ }
+ });
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
deleted file mode 100644
index 42c21a4..0000000
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/StorageContainerResponseHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import io.grpc.stub.StreamObserver;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-
-/**
- * Response handler to handle storage container response.
- */
-public class StorageContainerResponseHandler extends ResponseHandler<StorageContainerResponse> {
-
- public static StorageContainerResponseHandler of(StreamObserver<StorageContainerResponse> respObserver) {
- return new StorageContainerResponseHandler(respObserver);
- }
-
- private StorageContainerResponseHandler(StreamObserver<StorageContainerResponse> respObserver) {
- super(respObserver);
- }
-
- @Override
- protected StorageContainerResponse createErrorResp(Throwable cause) {
- return StorageContainerResponse.newBuilder()
- .setCode(StatusCode.INTERNAL_SERVER_ERROR)
- .build();
- }
-}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
index d7ef678..9a9670a 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImpl.java
@@ -19,7 +19,6 @@ package org.apache.bookkeeper.stream.storage.impl.kv;
import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.fromProtoCompare;
import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.handleCause;
-import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.mvccCodeToStatusCode;
import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.newStoreKey;
import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processDeleteResult;
import static org.apache.bookkeeper.stream.storage.impl.kv.TableStoreUtils.processIncrementResult;
@@ -46,17 +45,18 @@ import org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
/**
@@ -72,13 +72,11 @@ public class TableStoreImpl implements TableStore {
}
@Override
- public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
- RangeRequest rangeReq = request.getKvRangeReq();
-
+ public CompletableFuture<RangeResponse> range(RangeRequest rangeReq) {
if (log.isTraceEnabled()) {
log.trace("Received range request {}", rangeReq);
}
- return range(rangeReq)
+ return doRange(rangeReq)
.thenApply(result -> {
try {
RangeResponse rangeResp = processRangeResult(
@@ -89,19 +87,18 @@ public class TableStoreImpl implements TableStore {
result.close();
}
})
- .thenApply(rangeResp -> StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvRangeResp(rangeResp)
- .build())
.exceptionally(cause -> {
log.error("Failed to process range request {}", rangeReq, cause);
- return StorageContainerResponse.newBuilder()
- .setCode(handleCause(cause))
+ return RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(handleCause(cause))
+ .setRoutingHeader(rangeReq.getHeader())
+ .build())
.build();
});
}
- private CompletableFuture<RangeResult<byte[], byte[]>> range(RangeRequest request) {
+ private CompletableFuture<RangeResult<byte[], byte[]>> doRange(RangeRequest request) {
RangeOp<byte[], byte[]> op = buildRangeOp(request.getHeader(), request);
return store.range(op)
.whenComplete((rangeResult, throwable) -> op.close());
@@ -143,10 +140,8 @@ public class TableStoreImpl implements TableStore {
}
@Override
- public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
- PutRequest putReq = request.getKvPutReq();
-
- return put(putReq)
+ public CompletableFuture<PutResponse> put(PutRequest putReq) {
+ return doPut(putReq)
.thenApply(result -> {
try {
return processPutResult(
@@ -156,19 +151,18 @@ public class TableStoreImpl implements TableStore {
result.close();
}
})
- .thenApply(putResp -> StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvPutResp(putResp)
- .build())
.exceptionally(cause -> {
log.error("Failed to process put request {}", putReq, cause);
- return StorageContainerResponse.newBuilder()
- .setCode(handleCause(cause))
+ return PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(handleCause(cause))
+ .setRoutingHeader(putReq.getHeader())
+ .build())
.build();
});
}
- private CompletableFuture<PutResult<byte[], byte[]>> put(PutRequest request) {
+ private CompletableFuture<PutResult<byte[], byte[]>> doPut(PutRequest request) {
PutOp<byte[], byte[]> op = buildPutOp(request.getHeader(), request);
return store.put(op)
.whenComplete((putResult, throwable) -> op.close());
@@ -187,10 +181,8 @@ public class TableStoreImpl implements TableStore {
}
@Override
- public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
- IncrementRequest incrementReq = request.getKvIncrReq();
-
- return increment(incrementReq)
+ public CompletableFuture<IncrementResponse> incr(IncrementRequest incrementReq) {
+ return doIncrement(incrementReq)
.thenApply(result -> {
try {
return processIncrementResult(
@@ -200,19 +192,18 @@ public class TableStoreImpl implements TableStore {
result.close();
}
})
- .thenApply(incrementResp -> StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvIncrResp(incrementResp)
- .build())
.exceptionally(cause -> {
log.error("Failed to process increment request {}", incrementReq, cause);
- return StorageContainerResponse.newBuilder()
- .setCode(handleCause(cause))
+ return IncrementResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(handleCause(cause))
+ .setRoutingHeader(incrementReq.getHeader())
+ .build())
.build();
});
}
- private CompletableFuture<IncrementResult<byte[], byte[]>> increment(IncrementRequest request) {
+ private CompletableFuture<IncrementResult<byte[], byte[]>> doIncrement(IncrementRequest request) {
IncrementOp<byte[], byte[]> op = buildIncrementOp(request.getHeader(), request);
return store.increment(op)
.whenComplete((incrementResult, throwable) -> op.close());
@@ -231,10 +222,8 @@ public class TableStoreImpl implements TableStore {
}
@Override
- public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
- DeleteRangeRequest deleteReq = request.getKvDeleteReq();
-
- return delete(deleteReq)
+ public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest deleteReq) {
+ return doDelete(deleteReq)
.thenApply(result -> {
try {
return processDeleteResult(
@@ -244,16 +233,15 @@ public class TableStoreImpl implements TableStore {
result.close();
}
})
- .thenApply(deleteResp -> StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvDeleteResp(deleteResp)
- .build())
- .exceptionally(cause -> StorageContainerResponse.newBuilder()
- .setCode(handleCause(cause))
+ .exceptionally(cause -> DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(handleCause(cause))
+ .setRoutingHeader(deleteReq.getHeader())
+ .build())
.build());
}
- private CompletableFuture<DeleteResult<byte[], byte[]>> delete(DeleteRangeRequest request) {
+ private CompletableFuture<DeleteResult<byte[], byte[]>> doDelete(DeleteRangeRequest request) {
DeleteOp<byte[], byte[]> op = buildDeleteOp(request.getHeader(), request);
return store.delete(op)
.whenComplete((deleteResult, throwable) -> op.close());
@@ -277,30 +265,27 @@ public class TableStoreImpl implements TableStore {
}
@Override
- public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
- TxnRequest txnReq = request.getKvTxnReq();
-
+ public CompletableFuture<TxnResponse> txn(TxnRequest txnReq) {
if (log.isTraceEnabled()) {
log.trace("Received txn request : {}", txnReq);
}
- return txn(txnReq)
+ return doTxn(txnReq)
.thenApply(txnResult -> {
try {
- TxnResponse txnResponse = processTxnResult(txnReq.getHeader(), txnResult);
- return StorageContainerResponse.newBuilder()
- .setCode(mvccCodeToStatusCode(txnResult.code()))
- .setKvTxnResp(txnResponse)
- .build();
+ return processTxnResult(txnReq.getHeader(), txnResult);
} finally {
txnResult.close();
}
})
- .exceptionally(cause -> StorageContainerResponse.newBuilder()
- .setCode(handleCause(cause))
+ .exceptionally(cause -> TxnResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(handleCause(cause))
+ .setRoutingHeader(txnReq.getHeader())
+ .build())
.build());
}
- private CompletableFuture<TxnResult<byte[], byte[]>> txn(TxnRequest request) {
+ private CompletableFuture<TxnResult<byte[], byte[]>> doTxn(TxnRequest request) {
TxnOp<byte[], byte[]> op = buildTxnOp(request);
return store.txn(op)
.whenComplete((txnResult, throwable) -> op.close());
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
index a9a8dce..964ca62 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreUtils.java
@@ -158,6 +158,7 @@ final class TableStoreUtils {
ByteString rKey = routingHeader.getRKey();
PutResponse.Builder putRespBuilder = PutResponse.newBuilder()
.setHeader(ResponseHeader.newBuilder()
+ .setCode(mvccCodeToStatusCode(result.code()))
.setRoutingHeader(routingHeader)
.build());
if (null != result.prevKv()) {
@@ -170,6 +171,7 @@ final class TableStoreUtils {
IncrementResult<byte[], byte[]> result) {
IncrementResponse.Builder putRespBuilder = IncrementResponse.newBuilder()
.setHeader(ResponseHeader.newBuilder()
+ .setCode(mvccCodeToStatusCode(result.code()))
.setRoutingHeader(routingHeader)
.build())
.setTotalAmount(result.totalAmount());
@@ -182,6 +184,7 @@ final class TableStoreUtils {
return RangeResponse.newBuilder()
.setCount(result.count())
.setHeader(ResponseHeader.newBuilder()
+ .setCode(mvccCodeToStatusCode(result.code()))
.setRoutingHeader(routingHeader)
.build())
.addAllKvs(Lists.transform(result.kvs(), kv -> newKeyValue(rKey, kv)))
@@ -194,6 +197,7 @@ final class TableStoreUtils {
ByteString rKey = routingHeader.getRKey();
return DeleteRangeResponse.newBuilder()
.setHeader(ResponseHeader.newBuilder()
+ .setCode(mvccCodeToStatusCode(result.code()))
.setRoutingHeader(routingHeader)
.build())
.setDeleted(result.numDeleted())
@@ -205,6 +209,7 @@ final class TableStoreUtils {
TxnResult<byte[], byte[]> txnResult) {
return TxnResponse.newBuilder()
.setHeader(ResponseHeader.newBuilder()
+ .setCode(mvccCodeToStatusCode(txnResult.code()))
.setRoutingHeader(routingHeader)
.build())
.setSucceeded(txnResult.isSuccess())
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
index 62502a2..530d4cd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImpl.java
@@ -18,9 +18,6 @@
package org.apache.bookkeeper.stream.storage.impl.metadata;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.GET_ACTIVE_RANGES_REQ;
-
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -30,12 +27,11 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.RangeMetadata;
import org.apache.bookkeeper.stream.proto.StreamProperties;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
import org.apache.bookkeeper.stream.proto.storage.RelationType;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
import org.apache.bookkeeper.stream.storage.api.metadata.stream.MetaRange;
@@ -70,11 +66,11 @@ public class MetaRangeStoreImpl
// Stream API
//
- private CompletableFuture<StorageContainerResponse> createStreamIfMissing(long streamId,
- MetaRangeImpl metaRange,
- StreamProperties streamProps) {
+ private CompletableFuture<GetActiveRangesResponse> createStreamIfMissing(long streamId,
+ MetaRangeImpl metaRange,
+ StreamProperties streamProps) {
if (null == streamProps) {
- return FutureUtils.value(StorageContainerResponse.newBuilder()
+ return FutureUtils.value(GetActiveRangesResponse.newBuilder()
.setCode(StatusCode.STREAM_NOT_FOUND)
.build());
}
@@ -86,7 +82,7 @@ public class MetaRangeStoreImpl
}
return getActiveRanges(metaRange);
} else {
- return FutureUtils.value(StorageContainerResponse.newBuilder()
+ return FutureUtils.value(GetActiveRangesResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
.build());
}
@@ -94,11 +90,8 @@ public class MetaRangeStoreImpl
}
@Override
- public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
- checkState(
- GET_ACTIVE_RANGES_REQ == request.getRequestCase(),
- "Wrong request type: %s", request.getRequestCase());
- final long streamId = request.getGetActiveRangesReq().getStreamId();
+ public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+ final long streamId = request.getStreamId();
MetaRangeImpl metaRange = streams.get(streamId);
@@ -107,8 +100,8 @@ public class MetaRangeStoreImpl
return metaRangeImpl.load(streamId)
.thenCompose(mr -> {
if (null == mr) {
- StreamProperties streamProps = request.getGetActiveRangesReq().hasStreamProps()
- ? request.getGetActiveRangesReq().getStreamProps() : null;
+ StreamProperties streamProps = request.hasStreamProps()
+ ? request.getStreamProps() : null;
return createStreamIfMissing(streamId, metaRangeImpl, streamProps);
} else {
synchronized (streams) {
@@ -122,8 +115,7 @@ public class MetaRangeStoreImpl
}
}
- private CompletableFuture<StorageContainerResponse> getActiveRanges(MetaRange metaRange) {
- StorageContainerResponse.Builder scBuilder = StorageContainerResponse.newBuilder();
+ private CompletableFuture<GetActiveRangesResponse> getActiveRanges(MetaRange metaRange) {
GetActiveRangesResponse.Builder respBuilder = GetActiveRangesResponse.newBuilder();
return metaRange.getActiveRanges()
.thenApplyAsync(ranges -> {
@@ -134,12 +126,11 @@ public class MetaRangeStoreImpl
.addAllRelatedRanges(range.getParentsList());
respBuilder.addRanges(rrBuilder);
}
- return scBuilder
+ return respBuilder
.setCode(StatusCode.SUCCESS)
- .setGetActiveRangesResp(respBuilder)
.build();
}, executor)
- .exceptionally(cause -> scBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
+ .exceptionally(cause -> respBuilder.setCode(StatusCode.INTERNAL_SERVER_ERROR).build());
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
index e15b808..4a1be55 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/FailRequestRangeStoreService.java
@@ -18,13 +18,21 @@
package org.apache.bookkeeper.stream.storage.impl.service;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
-
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -33,12 +41,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
/**
@@ -56,11 +64,10 @@ final class FailRequestRangeStoreService implements RangeStoreService {
this.scheduler = scheduler;
}
- private <T> CompletableFuture<T> failWrongGroupRequest(long scId) {
+ private <T> CompletableFuture<T> failWrongGroupRequest() {
CompletableFuture<T> future = FutureUtils.createFuture();
- scheduler.executeOrdered(scId, () -> {
- future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND));
- });
+ scheduler.execute(() ->
+ future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND)));
return future;
}
@@ -80,17 +87,17 @@ final class FailRequestRangeStoreService implements RangeStoreService {
@Override
public CompletableFuture<CreateNamespaceResponse> createNamespace(CreateNamespaceRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
@Override
public CompletableFuture<DeleteNamespaceResponse> deleteNamespace(DeleteNamespaceRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
@Override
public CompletableFuture<GetNamespaceResponse> getNamespace(GetNamespaceRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
//
@@ -99,17 +106,17 @@ final class FailRequestRangeStoreService implements RangeStoreService {
@Override
public CompletableFuture<CreateStreamResponse> createStream(CreateStreamRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
@Override
public CompletableFuture<DeleteStreamResponse> deleteStream(DeleteStreamRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
@Override
public CompletableFuture<GetStreamResponse> getStream(GetStreamRequest request) {
- return failWrongGroupRequest(ROOT_STORAGE_CONTAINER_ID);
+ return failWrongGroupRequest();
}
//
@@ -117,8 +124,8 @@ final class FailRequestRangeStoreService implements RangeStoreService {
//
@Override
- public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
+ return failWrongGroupRequest();
}
//
@@ -127,27 +134,27 @@ final class FailRequestRangeStoreService implements RangeStoreService {
@Override
- public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<RangeResponse> range(RangeRequest request) {
+ return failWrongGroupRequest();
}
@Override
- public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<PutResponse> put(PutRequest request) {
+ return failWrongGroupRequest();
}
@Override
- public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+ return failWrongGroupRequest();
}
@Override
- public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+ return failWrongGroupRequest();
}
@Override
- public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
- return failWrongGroupRequest(request.getScId());
+ public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+ return failWrongGroupRequest();
}
}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index 0304fc8..7adcefd 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -18,12 +18,6 @@
package org.apache.bookkeeper.stream.storage.impl.service;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_INCR_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +34,16 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,12 +52,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
@@ -232,7 +231,7 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
//
@Override
- public CompletableFuture<StorageContainerResponse> getActiveRanges(StorageContainerRequest request) {
+ public CompletableFuture<GetActiveRangesResponse> getActiveRanges(GetActiveRangesRequest request) {
return mgStore.getActiveRanges(request);
}
@@ -242,12 +241,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
@Override
- public CompletableFuture<StorageContainerResponse> range(StorageContainerRequest request) {
- checkArgument(KV_RANGE_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- RangeRequest rr = request.getKvRangeReq();
- RoutingHeader header = rr.getHeader();
+ public CompletableFuture<RangeResponse> range(RangeRequest request) {
+ RoutingHeader header = request.getHeader();
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
@@ -260,12 +255,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
}
@Override
- public CompletableFuture<StorageContainerResponse> put(StorageContainerRequest request) {
- checkArgument(KV_PUT_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- PutRequest rr = request.getKvPutReq();
- RoutingHeader header = rr.getHeader();
+ public CompletableFuture<PutResponse> put(PutRequest request) {
+ RoutingHeader header = request.getHeader();
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
@@ -278,12 +269,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
}
@Override
- public CompletableFuture<StorageContainerResponse> delete(StorageContainerRequest request) {
- checkArgument(KV_DELETE_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- DeleteRangeRequest rr = request.getKvDeleteReq();
- RoutingHeader header = rr.getHeader();
+ public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
+ RoutingHeader header = request.getHeader();
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
@@ -296,12 +283,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
}
@Override
- public CompletableFuture<StorageContainerResponse> txn(StorageContainerRequest request) {
- checkArgument(KV_TXN_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- TxnRequest rr = request.getKvTxnReq();
- RoutingHeader header = rr.getHeader();
+ public CompletableFuture<TxnResponse> txn(TxnRequest request) {
+ RoutingHeader header = request.getHeader();
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
@@ -314,12 +297,8 @@ class RangeStoreServiceImpl implements RangeStoreService, AutoCloseable {
}
@Override
- public CompletableFuture<StorageContainerResponse> incr(StorageContainerRequest request) {
- checkArgument(KV_INCR_REQ == request.getRequestCase());
-
- long scId = request.getScId();
- IncrementRequest ir = request.getKvIncrReq();
- RoutingHeader header = ir.getHeader();
+ public CompletableFuture<IncrementResponse> incr(IncrementRequest request) {
+ RoutingHeader header = request.getHeader();
RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
TableStore store = tableStoreCache.getTableStore(rid);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
index a269466..9293117 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.java
@@ -16,7 +16,6 @@ package org.apache.bookkeeper.stream.storage.impl;
import static org.apache.bookkeeper.common.util.ListenableFutures.fromListenableFuture;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_STORAGE_CONTAINER_ID;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateNamespaceRequest;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createCreateStreamRequest;
import static org.apache.bookkeeper.stream.protocol.util.ProtoUtils.createDeleteNamespaceRequest;
@@ -46,6 +45,7 @@ import io.grpc.inprocess.InProcessServerBuilder;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -64,6 +64,8 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc.TableServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -72,6 +74,8 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
@@ -81,10 +85,6 @@ import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc.MetaRange
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc.RootRangeServiceFutureStub;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc;
-import org.apache.bookkeeper.stream.proto.storage.TableServiceGrpc.TableServiceFutureStub;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
@@ -140,6 +140,7 @@ public class TestStorageContainerStoreImpl {
private TableServiceFutureStub tableService;
private RootRangeServiceFutureStub rootRangeService;
private MetaRangeServiceFutureStub metaRangeService;
+ private long scId;
//
// Utils for table api
@@ -156,64 +157,52 @@ public class TestStorageContainerStoreImpl {
.setRoutingHeader(TEST_ROUTING_HEADER)
.build();
- private static StorageContainerRequest createPutRequest(long scId) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvPutReq(PutRequest.newBuilder()
+ private static PutRequest createPutRequest() {
+ return PutRequest.newBuilder()
.setHeader(TEST_ROUTING_HEADER)
.setKey(TEST_KEY)
.setValue(TEST_VAL)
- .build())
- .build();
+ .build();
}
- private static StorageContainerResponse createPutResponse(StatusCode code) {
- return StorageContainerResponse.newBuilder()
- .setCode(code)
- .setKvPutResp(PutResponse.newBuilder()
- .setHeader(TEST_RESP_HEADER)
- .build())
- .build();
+ private static PutResponse createPutResponse(StatusCode code) {
+ return PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+ .setCode(code)
+ .build())
+ .build();
}
- private static StorageContainerRequest createRangeRequest(long scId) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvRangeReq(RangeRequest.newBuilder()
+ private static RangeRequest createRangeRequest() {
+ return RangeRequest.newBuilder()
.setHeader(TEST_ROUTING_HEADER)
.setKey(TEST_KEY)
- .build())
- .build();
+ .build();
}
- private static StorageContainerResponse createRangeResponse(StatusCode code) {
- return StorageContainerResponse.newBuilder()
- .setCode(code)
- .setKvRangeResp(RangeResponse.newBuilder()
- .setHeader(TEST_RESP_HEADER)
+ private static RangeResponse createRangeResponse(StatusCode code) {
+ return RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+ .setCode(code)
+ .build())
.setCount(0)
- .build())
- .build();
+ .build();
}
- private static StorageContainerRequest createDeleteRequest(long scId) {
- return StorageContainerRequest.newBuilder()
- .setScId(scId)
- .setKvDeleteReq(DeleteRangeRequest.newBuilder()
+ private static DeleteRangeRequest createDeleteRequest() {
+ return DeleteRangeRequest.newBuilder()
.setHeader(TEST_ROUTING_HEADER)
.setKey(TEST_KEY)
- .build())
- .build();
+ .build();
}
- private static StorageContainerResponse createDeleteResponse(StatusCode code) {
- return StorageContainerResponse.newBuilder()
- .setCode(code)
- .setKvDeleteResp(DeleteRangeResponse.newBuilder()
- .setHeader(TEST_RESP_HEADER)
+ private static DeleteRangeResponse createDeleteResponse(StatusCode code) {
+ return DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder(TEST_RESP_HEADER)
+ .setCode(code)
+ .build())
.setDeleted(0)
- .build())
- .build();
+ .build();
}
@SuppressWarnings("unchecked")
@@ -263,10 +252,12 @@ public class TestStorageContainerStoreImpl {
.usePlaintext()
.build();
+ scId = ThreadLocalRandom.current().nextInt(2);
+
// intercept the channel with storage container information.
channel = ClientInterceptors.intercept(
channel,
- new StorageContainerClientInterceptor(0L));
+ new StorageContainerClientInterceptor(scId));
tableService = TableServiceGrpc.newFutureStub(channel);
@@ -307,7 +298,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testCreateNamespaceNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-create-namespace-no-root-storage-container-store";
verifyNotFoundException(fromListenableFuture(
@@ -317,7 +308,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testDeleteNamespaceNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-delete-namespace-no-root-storage-container-store";
verifyNotFoundException(fromListenableFuture(
@@ -327,7 +318,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testGetNamespaceNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-get-namespace-no-root-storage-container-store";
verifyNotFoundException(fromListenableFuture(
@@ -395,7 +386,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testCreateStreamNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-create-namespace-no-root-storage-container-store";
String streamName = colName;
@@ -406,7 +397,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testDeleteStreamNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-delete-namespace-no-root-storage-container-store";
String streamName = colName;
@@ -417,7 +408,7 @@ public class TestStorageContainerStoreImpl {
@Test
public void testGetStreamNoRootStorageContainerStore() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
String colName = "test-get-namespace-no-root-storage-container-store";
String streamName = colName;
@@ -482,26 +473,24 @@ public class TestStorageContainerStoreImpl {
@Test
public void testGetActiveRangesNoManager() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
verifyNotFoundException(fromListenableFuture(
- metaRangeService.getActiveRanges(createGetActiveRangesRequest(12L, 34L))),
+ metaRangeService.getActiveRanges(createGetActiveRangesRequest(34L))),
Status.NOT_FOUND);
}
@Test
public void testGetActiveRangesMockManager() throws Exception {
- long scId = System.currentTimeMillis();
-
- StorageContainerResponse resp = StorageContainerResponse.newBuilder()
+ GetActiveRangesResponse resp = GetActiveRangesResponse.newBuilder()
.setCode(StatusCode.STREAM_NOT_FOUND)
.build();
- StorageContainerRequest request = createGetActiveRangesRequest(scId, 34L);
+ GetActiveRangesRequest request = createGetActiveRangesRequest(34L);
when(mockRangeStoreService.getActiveRanges(request))
.thenReturn(CompletableFuture.completedFuture(resp));
- CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ CompletableFuture<GetActiveRangesResponse> future = fromListenableFuture(
metaRangeService.getActiveRanges(request));
verify(mockRangeStoreService, times(1)).getActiveRanges(request);
assertTrue(resp == future.get());
@@ -514,40 +503,40 @@ public class TestStorageContainerStoreImpl {
@Test
public void testPutNoStorageContainer() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
verifyNotFoundException(fromListenableFuture(
- tableService.put(createPutRequest(ROOT_STORAGE_CONTAINER_ID))),
+ tableService.put(createPutRequest())),
Status.NOT_FOUND);
}
@Test
public void testDeleteNoStorageContainer() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
verifyNotFoundException(fromListenableFuture(
- tableService.delete(createDeleteRequest(ROOT_STORAGE_CONTAINER_ID))),
+ tableService.delete(createDeleteRequest())),
Status.NOT_FOUND);
}
@Test
public void testRangeNoStorageContainer() throws Exception {
- rangeStore.getRegistry().stopStorageContainer(ROOT_STORAGE_CONTAINER_ID).join();
+ rangeStore.getRegistry().stopStorageContainer(scId).join();
verifyNotFoundException(fromListenableFuture(
- tableService.range(createRangeRequest(ROOT_STORAGE_CONTAINER_ID))),
+ tableService.range(createRangeRequest())),
Status.NOT_FOUND);
}
@Test
public void testRangeMockStorageContainer() throws Exception {
- StorageContainerResponse response = createRangeResponse(StatusCode.SUCCESS);
- StorageContainerRequest request = createRangeRequest(ROOT_STORAGE_CONTAINER_ID);
+ RangeResponse response = createRangeResponse(StatusCode.SUCCESS);
+ RangeRequest request = createRangeRequest();
when(mockRangeStoreService.range(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ CompletableFuture<RangeResponse> future = fromListenableFuture(
tableService.range(request));
verify(mockRangeStoreService, times(1)).range(eq(request));
assertTrue(response == future.get());
@@ -555,13 +544,13 @@ public class TestStorageContainerStoreImpl {
@Test
public void testDeleteMockStorageContainer() throws Exception {
- StorageContainerResponse response = createDeleteResponse(StatusCode.SUCCESS);
- StorageContainerRequest request = createDeleteRequest(ROOT_STORAGE_CONTAINER_ID);
+ DeleteRangeResponse response = createDeleteResponse(StatusCode.SUCCESS);
+ DeleteRangeRequest request = createDeleteRequest();
when(mockRangeStoreService.delete(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ CompletableFuture<DeleteRangeResponse> future = fromListenableFuture(
tableService.delete(request));
verify(mockRangeStoreService, times(1)).delete(eq(request));
assertTrue(response == future.get());
@@ -569,13 +558,13 @@ public class TestStorageContainerStoreImpl {
@Test
public void testPutMockStorageContainer() throws Exception {
- StorageContainerResponse response = createPutResponse(StatusCode.SUCCESS);
- StorageContainerRequest request = createPutRequest(ROOT_STORAGE_CONTAINER_ID);
+ PutResponse response = createPutResponse(StatusCode.SUCCESS);
+ PutRequest request = createPutRequest();
when(mockRangeStoreService.put(request))
.thenReturn(CompletableFuture.completedFuture(response));
- CompletableFuture<StorageContainerResponse> future = fromListenableFuture(
+ CompletableFuture<PutResponse> future = fromListenableFuture(
tableService.put(request));
verify(mockRangeStoreService, times(1)).put(eq(request));
assertTrue(response == future.get());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
index 178ac19..3f87b9e 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcMetaRangeService.java
@@ -30,8 +30,6 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.junit.Test;
@@ -51,23 +49,19 @@ public class TestGrpcMetaRangeService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ GetActiveRangesRequest request = GetActiveRangesRequest
.newBuilder()
- .setGetActiveRangesReq(GetActiveRangesRequest
- .newBuilder()
- .setStreamId(23456L)
- .build())
+ .setStreamId(23456L)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
+ GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
.setCode(StatusCode.SUCCESS)
- .setGetActiveRangesResp(GetActiveRangesResponse.newBuilder())
.build();
when(rangeService.getActiveRanges(request)).thenReturn(
CompletableFuture.completedFuture(response));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<GetActiveRangesResponse> responseObserver =
new TestResponseObserver<>();
grpcService.getActiveRanges(
request,
@@ -82,22 +76,19 @@ public class TestGrpcMetaRangeService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ GetActiveRangesRequest request = GetActiveRangesRequest
.newBuilder()
- .setGetActiveRangesReq(GetActiveRangesRequest
- .newBuilder()
- .setStreamId(23456L)
- .build())
+ .setStreamId(23456L)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
+ GetActiveRangesResponse response = GetActiveRangesResponse.newBuilder()
.setCode(StatusCode.INTERNAL_SERVER_ERROR)
.build();
when(rangeService.getActiveRanges(request)).thenReturn(
FutureUtils.exception(CAUSE));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<GetActiveRangesResponse> responseObserver =
new TestResponseObserver<>();
grpcService.getActiveRanges(
request,
@@ -112,18 +103,15 @@ public class TestGrpcMetaRangeService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcMetaRangeService grpcService = new GrpcMetaRangeService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ GetActiveRangesRequest request = GetActiveRangesRequest
.newBuilder()
- .setGetActiveRangesReq(GetActiveRangesRequest
- .newBuilder()
- .setStreamId(23456L)
- .build())
+ .setStreamId(23456L)
.build();
when(rangeService.getActiveRanges(request)).thenReturn(
FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<GetActiveRangesResponse> responseObserver =
new TestResponseObserver<>();
grpcService.getActiveRanges(
request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
index e943fb4..874e69b 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/TestGrpcTableService.java
@@ -34,10 +34,9 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.junit.Test;
@@ -66,24 +65,24 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ PutRequest request = PutRequest
.newBuilder()
- .setKvPutReq(PutRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setValue(TEST_VAL)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setValue(TEST_VAL)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvPutResp(PutResponse.newBuilder())
+ PutResponse response = PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.put(request)).thenReturn(
CompletableFuture.completedFuture(response));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<PutResponse> responseObserver =
new TestResponseObserver<>();
grpcService.put(
request,
@@ -98,23 +97,24 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ PutRequest request = PutRequest
.newBuilder()
- .setKvPutReq(PutRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setValue(TEST_VAL)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setValue(TEST_VAL)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ PutResponse response = PutResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.put(request)).thenReturn(
FutureUtils.exception(CAUSE));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<PutResponse> responseObserver =
new TestResponseObserver<>();
grpcService.put(
request,
@@ -129,19 +129,17 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ PutRequest request = PutRequest
.newBuilder()
- .setKvPutReq(PutRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setValue(TEST_VAL)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setValue(TEST_VAL)
+ .setHeader(ROUTING_HEADER)
.build();
when(rangeService.put(request)).thenReturn(
FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<PutResponse> responseObserver =
new TestResponseObserver<>();
grpcService.put(
request,
@@ -156,23 +154,23 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ RangeRequest request = RangeRequest
.newBuilder()
- .setKvRangeReq(RangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvRangeResp(RangeResponse.newBuilder())
+ RangeResponse response = RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.range(request)).thenReturn(
CompletableFuture.completedFuture(response));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<RangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.range(
request,
@@ -187,22 +185,23 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ RangeRequest request = RangeRequest
.newBuilder()
- .setKvRangeReq(RangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ RangeResponse response = RangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.range(request)).thenReturn(
FutureUtils.exception(CAUSE));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<RangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.range(
request,
@@ -217,18 +216,16 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ RangeRequest request = RangeRequest
.newBuilder()
- .setKvRangeReq(RangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
when(rangeService.range(request)).thenReturn(
FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<RangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.range(
request,
@@ -243,23 +240,23 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ DeleteRangeRequest request = DeleteRangeRequest
.newBuilder()
- .setKvDeleteReq(DeleteRangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .setKvDeleteResp(DeleteRangeResponse.newBuilder())
+ DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.SUCCESS)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.delete(request)).thenReturn(
CompletableFuture.completedFuture(response));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<DeleteRangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.delete(
request,
@@ -274,22 +271,23 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ DeleteRangeRequest request = DeleteRangeRequest
.newBuilder()
- .setKvDeleteReq(DeleteRangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ DeleteRangeResponse response = DeleteRangeResponse.newBuilder()
+ .setHeader(ResponseHeader.newBuilder()
+ .setCode(StatusCode.INTERNAL_SERVER_ERROR)
+ .setRoutingHeader(ROUTING_HEADER)
+ .build())
.build();
when(rangeService.delete(request)).thenReturn(
FutureUtils.exception(CAUSE));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<DeleteRangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.delete(
request,
@@ -304,18 +302,16 @@ public class TestGrpcTableService {
RangeStoreService rangeService = mock(RangeStoreService.class);
GrpcTableService grpcService = new GrpcTableService(rangeService);
- StorageContainerRequest request = StorageContainerRequest
+ DeleteRangeRequest request = DeleteRangeRequest
.newBuilder()
- .setKvDeleteReq(DeleteRangeRequest
- .newBuilder()
- .setKey(TEST_KEY)
- .setHeader(ROUTING_HEADER))
+ .setKey(TEST_KEY)
+ .setHeader(ROUTING_HEADER)
.build();
when(rangeService.delete(request)).thenReturn(
FutureUtils.exception(new StatusRuntimeException(Status.NOT_FOUND)));
- TestResponseObserver<StorageContainerResponse> responseObserver =
+ TestResponseObserver<DeleteRangeResponse> responseObserver =
new TestResponseObserver<>();
grpcService.delete(
request,
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
deleted file mode 100644
index 3eb440a..0000000
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/grpc/handler/TestStorageContainerResponseHandler.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.bookkeeper.stream.storage.impl.grpc.handler;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.storage.exceptions.StorageException;
-import org.junit.Test;
-import org.mockito.stubbing.Answer;
-
-/**
- * Unit test for {@link StorageContainerResponseHandler}.
- */
-public class TestStorageContainerResponseHandler {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testSuccessResponse() {
- StreamObserver<StorageContainerResponse> observer =
- mock(StreamObserver.class);
- StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
- StorageContainerResponse response = StorageContainerResponse.newBuilder()
- .setCode(StatusCode.SUCCESS)
- .build();
- handler.accept(response, null);
- verify(observer, times(1)).onNext(response);
- verify(observer, times(1)).onCompleted();
- verify(observer, times(0)).onError(any());
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testStatusRuntimeException() {
- StreamObserver<StorageContainerResponse> observer =
- mock(StreamObserver.class);
- StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
- StatusRuntimeException exception = new StatusRuntimeException(Status.NOT_FOUND);
- handler.accept(null, exception);
- verify(observer, times(0)).onNext(any());
- verify(observer, times(0)).onCompleted();
- verify(observer, times(1)).onError(exception);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testStatusException() {
- StreamObserver<StorageContainerResponse> observer =
- mock(StreamObserver.class);
- StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
- StatusException exception = new StatusException(Status.NOT_FOUND);
- handler.accept(null, exception);
- verify(observer, times(0)).onNext(any());
- verify(observer, times(0)).onCompleted();
- verify(observer, times(1)).onError(exception);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testInternalError() throws Exception {
- StreamObserver<StorageContainerResponse> observer =
- mock(StreamObserver.class);
- AtomicReference<StorageContainerResponse> responseHolder =
- new AtomicReference<>(null);
- CountDownLatch latch = new CountDownLatch(1);
- doAnswer((Answer<Void>) invocation -> {
- StorageContainerResponse resp = invocation.getArgument(0);
- responseHolder.set(resp);
- latch.countDown();
- return null;
- }).when(observer).onNext(any(StorageContainerResponse.class));
- StorageContainerResponseHandler handler = StorageContainerResponseHandler.of(observer);
- StorageException exception = new StorageException("test-exception");
- handler.accept(null, exception);
- verify(observer, times(1)).onNext(any());
- verify(observer, times(1)).onCompleted();
- verify(observer, times(0)).onError(any());
-
- latch.await();
- assertNotNull(responseHolder.get());
- assertEquals(
- StatusCode.INTERNAL_SERVER_ERROR,
- responseHolder.get().getCode());
- }
-
-}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
index f7b88fc..2c7124f 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.java
@@ -46,9 +46,6 @@ import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse.ResponseCase;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
import org.junit.Test;
@@ -90,16 +87,14 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
}
private List<KeyValue> writeKVs(int numPairs, boolean prevKv) throws Exception {
- List<CompletableFuture<StorageContainerResponse>> results =
+ List<CompletableFuture<PutResponse>> results =
Lists.newArrayListWithExpectedSize(numPairs);
for (int i = 0; i < numPairs; i++) {
results.add(writeKV(i, prevKv));
}
return Lists.transform(
- result(FutureUtils.collect(results)), response -> {
- assertEquals(StatusCode.SUCCESS, response.getCode());
- assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
- PutResponse putResp = response.getKvPutResp();
+ result(FutureUtils.collect(results)), putResp -> {
+ assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
if (putResp.hasPrevKv()) {
return putResp.getPrevKv();
@@ -109,33 +104,26 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
});
}
- private CompletableFuture<StorageContainerResponse> writeKV(int i, boolean prevKv) {
- return tableStore.put(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvPutReq(PutRequest.newBuilder()
- .setKey(getKey(i))
- .setValue(getValue(i))
- .setHeader(HEADER)
- .setPrevKv(prevKv))
+ private CompletableFuture<PutResponse> writeKV(int i, boolean prevKv) {
+ return tableStore.put(PutRequest.newBuilder()
+ .setKey(getKey(i))
+ .setValue(getValue(i))
+ .setHeader(HEADER)
+ .setPrevKv(prevKv)
.build());
}
- StorageContainerResponse getKeyFromTableStore(int i) throws Exception {
+ RangeResponse getKeyFromTableStore(int i) throws Exception {
return result(
- tableStore.range(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvRangeReq(RangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(i))
- .build())
+ tableStore.range(RangeRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(i))
.build()));
}
KeyValue getKeyValue(int i) throws Exception {
- StorageContainerResponse resp = getKeyFromTableStore(i);
- assertEquals(StatusCode.SUCCESS, resp.getCode());
- assertEquals(ResponseCase.KV_RANGE_RESP, resp.getResponseCase());
- RangeResponse rr = resp.getKvRangeResp();
+ RangeResponse rr = getKeyFromTableStore(i);
+ assertEquals(StatusCode.SUCCESS, rr.getHeader().getCode());
assertEquals(HEADER, rr.getHeader().getRoutingHeader());
assertFalse(rr.getMore());
if (0 == rr.getCount()) {
@@ -146,53 +134,43 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
}
void putKeyToTableStore(int key, int value) throws Exception {
- StorageContainerResponse response = result(
- tableStore.put(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvPutReq(PutRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .setValue(getValue(value))
- .build())
+ PutResponse putResp = result(
+ tableStore.put(PutRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(key))
+ .setValue(getValue(value))
.build()));
- assertEquals(StatusCode.SUCCESS, response.getCode());
- assertEquals(ResponseCase.KV_PUT_RESP, response.getResponseCase());
- PutResponse putResp = response.getKvPutResp();
+ assertEquals(StatusCode.SUCCESS, putResp.getHeader().getCode());
assertEquals(HEADER, putResp.getHeader().getRoutingHeader());
assertFalse(putResp.hasPrevKv());
}
KeyValue putIfAbsentToTableStore(int key, int value, boolean expectedSuccess) throws Exception {
- StorageContainerResponse response = result(
- tableStore.txn(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvTxnReq(TxnRequest.newBuilder()
- .setHeader(HEADER)
- .addCompare(Compare.newBuilder()
- .setResult(CompareResult.EQUAL)
- .setTarget(CompareTarget.VALUE)
+ TxnResponse txnResp = result(
+ tableStore.txn(TxnRequest.newBuilder()
+ .setHeader(HEADER)
+ .addCompare(Compare.newBuilder()
+ .setResult(CompareResult.EQUAL)
+ .setTarget(CompareTarget.VALUE)
+ .setKey(getKey(key))
+ .setValue(ByteString.copyFrom(new byte[0])))
+ .addSuccess(RequestOp.newBuilder()
+ .setRequestPut(PutRequest.newBuilder()
+ .setHeader(HEADER)
.setKey(getKey(key))
- .setValue(ByteString.copyFrom(new byte[0])))
- .addSuccess(RequestOp.newBuilder()
- .setRequestPut(PutRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .setValue(getValue(value))
- .setPrevKv(true)
- .build()))
- .addFailure(RequestOp.newBuilder()
- .setRequestRange(RangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .build()))
- .build())
+ .setValue(getValue(value))
+ .setPrevKv(true)
+ .build()))
+ .addFailure(RequestOp.newBuilder()
+ .setRequestRange(RangeRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(key))
+ .build()))
.build()));
- assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
- TxnResponse txnResp = response.getKvTxnResp();
assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
- assertEquals(StatusCode.SUCCESS, response.getCode());
+ assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
ResponseOp respOp = txnResp.getResponses(0);
if (expectedSuccess) {
@@ -215,39 +193,34 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
}
}
- StorageContainerResponse vPutToTableStore(int key, int value, long version)
+ TxnResponse vPutToTableStore(int key, int value, long version)
throws Exception {
return result(
- tableStore.txn(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvTxnReq(TxnRequest.newBuilder()
- .setHeader(HEADER)
- .addCompare(Compare.newBuilder()
- .setResult(CompareResult.EQUAL)
- .setTarget(CompareTarget.VERSION)
+ tableStore.txn(TxnRequest.newBuilder()
+ .setHeader(HEADER)
+ .addCompare(Compare.newBuilder()
+ .setResult(CompareResult.EQUAL)
+ .setTarget(CompareTarget.VERSION)
+ .setKey(getKey(key))
+ .setVersion(version))
+ .addSuccess(RequestOp.newBuilder()
+ .setRequestPut(PutRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(key))
+ .setValue(getValue(value))
+ .setPrevKv(true)
+ .build()))
+ .addFailure(RequestOp.newBuilder()
+ .setRequestRange(RangeRequest.newBuilder()
+ .setHeader(HEADER)
.setKey(getKey(key))
- .setVersion(version))
- .addSuccess(RequestOp.newBuilder()
- .setRequestPut(PutRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .setValue(getValue(value))
- .setPrevKv(true)
- .build()))
- .addFailure(RequestOp.newBuilder()
- .setRequestRange(RangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .build()))
- .build())
+ .build()))
.build()));
}
- KeyValue verifyVPutResponse(StorageContainerResponse response, boolean expectedSuccess) throws Exception {
- assertEquals(ResponseCase.KV_TXN_RESP, response.getResponseCase());
- TxnResponse txnResp = response.getKvTxnResp();
+ KeyValue verifyVPutResponse(TxnResponse txnResp, boolean expectedSuccess) throws Exception {
assertEquals(HEADER, txnResp.getHeader().getRoutingHeader());
- assertEquals(StatusCode.SUCCESS, response.getCode());
+ assertEquals(StatusCode.SUCCESS, txnResp.getHeader().getCode());
ResponseOp respOp = txnResp.getResponses(0);
if (expectedSuccess) {
@@ -270,89 +243,71 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
}
}
- StorageContainerResponse rPutToTableStore(int key, int value, long revision)
+ TxnResponse rPutToTableStore(int key, int value, long revision)
throws Exception {
return result(
- tableStore.txn(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvTxnReq(TxnRequest.newBuilder()
- .setHeader(HEADER)
- .addCompare(Compare.newBuilder()
- .setResult(CompareResult.EQUAL)
- .setTarget(CompareTarget.MOD)
+ tableStore.txn(TxnRequest.newBuilder()
+ .setHeader(HEADER)
+ .addCompare(Compare.newBuilder()
+ .setResult(CompareResult.EQUAL)
+ .setTarget(CompareTarget.MOD)
+ .setKey(getKey(key))
+ .setModRevision(revision))
+ .addSuccess(RequestOp.newBuilder()
+ .setRequestPut(PutRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(key))
+ .setValue(getValue(value))
+ .setPrevKv(true)
+ .build()))
+ .addFailure(RequestOp.newBuilder()
+ .setRequestRange(RangeRequest.newBuilder()
+ .setHeader(HEADER)
.setKey(getKey(key))
- .setModRevision(revision))
- .addSuccess(RequestOp.newBuilder()
- .setRequestPut(PutRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .setValue(getValue(value))
- .setPrevKv(true)
- .build()))
- .addFailure(RequestOp.newBuilder()
- .setRequestRange(RangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .build()))
- .build())
+ .build()))
.build()));
}
KeyValue deleteKeyFromTableStore(int key) throws Exception {
- StorageContainerResponse response = result(
- tableStore.delete(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvDeleteReq(DeleteRangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(key))
- .setPrevKv(true)
- .build())
+ DeleteRangeResponse response = result(
+ tableStore.delete(DeleteRangeRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(key))
+ .setPrevKv(true)
.build()));
- assertEquals(StatusCode.SUCCESS, response.getCode());
- assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
- DeleteRangeResponse delResp = response.getKvDeleteResp();
- assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
- if (0 == delResp.getPrevKvsCount()) {
+ assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
+ assertEquals(HEADER, response.getHeader().getRoutingHeader());
+ if (0 == response.getPrevKvsCount()) {
return null;
} else {
- return delResp.getPrevKvs(0);
+ return response.getPrevKvs(0);
}
}
List<KeyValue> deleteRange(int startKey, int endKey) throws Exception {
- StorageContainerResponse response = result(
- tableStore.delete(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvDeleteReq(DeleteRangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(startKey))
- .setRangeEnd(getKey(endKey))
- .setPrevKv(true)
- .build())
+ DeleteRangeResponse delResp = result(
+ tableStore.delete(DeleteRangeRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(startKey))
+ .setRangeEnd(getKey(endKey))
+ .setPrevKv(true)
.build()));
- assertEquals(StatusCode.SUCCESS, response.getCode());
- assertEquals(ResponseCase.KV_DELETE_RESP, response.getResponseCase());
- DeleteRangeResponse delResp = response.getKvDeleteResp();
+ assertEquals(StatusCode.SUCCESS, delResp.getHeader().getCode());
assertEquals(HEADER, delResp.getHeader().getRoutingHeader());
return delResp.getPrevKvsList();
}
List<KeyValue> range(int startKey, int endKey) throws Exception {
- StorageContainerResponse response = result(
- tableStore.range(StorageContainerRequest.newBuilder()
- .setScId(SC_ID)
- .setKvRangeReq(RangeRequest.newBuilder()
- .setHeader(HEADER)
- .setKey(getKey(startKey))
- .setRangeEnd(getKey(endKey))
- .build())
+ RangeResponse rangeResp = result(
+ tableStore.range(RangeRequest.newBuilder()
+ .setHeader(HEADER)
+ .setKey(getKey(startKey))
+ .setRangeEnd(getKey(endKey))
.build()));
- assertEquals(StatusCode.SUCCESS, response.getCode());
- assertEquals(ResponseCase.KV_RANGE_RESP, response.getResponseCase());
- RangeResponse rangeResp = response.getKvRangeResp();
+ assertEquals(StatusCode.SUCCESS, rangeResp.getHeader().getCode());
assertEquals(HEADER, rangeResp.getHeader().getRoutingHeader());
return rangeResp.getKvsList();
}
@@ -398,18 +353,18 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
int key = 2;
int initialVal = 2;
int casVal = 99;
- StorageContainerResponse response = vPutToTableStore(key, initialVal, 100L);
- assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+ TxnResponse response = vPutToTableStore(key, initialVal, 100L);
+ assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
// vPut(k, v, -1L)
response = vPutToTableStore(key, initialVal, -1L);
- assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+ assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
// put(key2, v)
KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
assertNull(prevKV);
// vPut(key2, v, 0)
response = vPutToTableStore(key, casVal, 0);
- assertEquals(StatusCode.SUCCESS, response.getCode());
+ assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
prevKV = verifyVPutResponse(response, true);
assertNotNull(prevKV);
assertEquals(getKey(key), prevKV.getKey());
@@ -428,12 +383,12 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
int initialVal = 3;
int casVal = 99;
- StorageContainerResponse response = rPutToTableStore(key, initialVal, 100L);
- assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+ TxnResponse response = rPutToTableStore(key, initialVal, 100L);
+ assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
// rPut(k, v, -1L)
response = rPutToTableStore(key, initialVal, -1L);
- assertEquals(StatusCode.KEY_NOT_FOUND, response.getCode());
+ assertEquals(StatusCode.KEY_NOT_FOUND, response.getHeader().getCode());
// put(key2, v)
KeyValue prevKV = putIfAbsentToTableStore(key, initialVal, true);
@@ -445,7 +400,7 @@ public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
// rPut(key2, v, 0)
response = rPutToTableStore(key, casVal, revision);
- assertEquals(StatusCode.SUCCESS, response.getCode());
+ assertEquals(StatusCode.SUCCESS, response.getHeader().getCode());
kv = getKeyValue(key);
assertEquals(revision + 1, kv.getModRevision());
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
index d2ed9b6..d0156c5 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/metadata/MetaRangeStoreImplTest.java
@@ -36,8 +36,6 @@ import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.RelatedRanges;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.storage.impl.metadata.stream.MetaRangeImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
@@ -69,21 +67,18 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
protected void doTeardown() throws Exception {
}
- StorageContainerRequest createRequest(StreamProperties streamProperties) {
+ GetActiveRangesRequest createRequest(StreamProperties streamProperties) {
GetActiveRangesRequest.Builder reqBuilder = GetActiveRangesRequest.newBuilder()
.setStreamId(this.streamProps.getStreamId());
if (null != streamProperties) {
reqBuilder = reqBuilder.setStreamProps(streamProperties);
}
- return StorageContainerRequest.newBuilder()
- .setGetActiveRangesReq(reqBuilder)
- .setScId(1L)
- .build();
+ return reqBuilder.build();
}
@Test
public void testCreateIfMissingPropsNotSpecified() throws Exception {
- StorageContainerResponse resp = FutureUtils.result(
+ GetActiveRangesResponse resp = FutureUtils.result(
this.mrStoreImpl.getActiveRanges(createRequest(null)));
assertEquals(StatusCode.STREAM_NOT_FOUND, resp.getCode());
@@ -91,13 +86,11 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
@Test
public void testCreateIfMissing() throws Exception {
- StorageContainerResponse resp = FutureUtils.result(
+ GetActiveRangesResponse resp = FutureUtils.result(
this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
assertEquals(StatusCode.SUCCESS, resp.getCode());
- GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
-
- verifyGetResponse(getResp);
+ verifyGetResponse(resp);
}
private void verifyGetResponse(GetActiveRangesResponse getResp) throws Exception {
@@ -222,22 +215,19 @@ public class MetaRangeStoreImplTest extends MVCCAsyncStoreTestBase {
@Test
public void testGetTwice() throws Exception {
-
- StorageContainerResponse resp = FutureUtils.result(
+ GetActiveRangesResponse resp = FutureUtils.result(
this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
assertEquals(StatusCode.SUCCESS, resp.getCode());
- GetActiveRangesResponse getResp = resp.getGetActiveRangesResp();
- verifyGetResponse(getResp);
+ verifyGetResponse(resp);
- StorageContainerResponse secondResp = FutureUtils.result(
+ GetActiveRangesResponse secondResp = FutureUtils.result(
this.mrStoreImpl.getActiveRanges(createRequest(streamProps)));
assertEquals(StatusCode.SUCCESS, secondResp.getCode());
- GetActiveRangesResponse secondGetResp = resp.getGetActiveRangesResp();
- verifyGetResponse(secondGetResp);
+ verifyGetResponse(secondResp);
}
}
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
index a4d8710..2cbb27f 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImplTest.java
@@ -17,10 +17,6 @@
*/
package org.apache.bookkeeper.stream.storage.impl.service;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_DELETE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_PUT_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_RANGE_REQ;
-import static org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase.KV_TXN_REQ;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_RANGE_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.CONTAINER_META_STREAM_ID;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.ROOT_RANGE_ID;
@@ -40,11 +36,15 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
@@ -53,13 +53,12 @@ import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
+import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerRequest.RequestCase;
-import org.apache.bookkeeper.stream.proto.storage.StorageContainerResponse;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
import org.apache.bookkeeper.stream.storage.api.metadata.MetaRangeStore;
@@ -322,11 +321,11 @@ public class RangeStoreServiceImplTest {
public void testGetActiveRanges() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(mrStore.getActiveRanges(any(StorageContainerRequest.class)))
+ GetActiveRangesResponse expectedResp = GetActiveRangesResponse.getDefaultInstance();
+ when(mrStore.getActiveRanges(any(GetActiveRangesRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
- StorageContainerRequest expectedReq = StorageContainerRequest.getDefaultInstance();
+ GetActiveRangesRequest expectedReq = GetActiveRangesRequest.getDefaultInstance();
assertSame(
expectedResp,
FutureUtils.result(mrStore.getActiveRanges(expectedReq)));
@@ -338,50 +337,66 @@ public class RangeStoreServiceImplTest {
// Table API
//
- private StorageContainerRequest newStorageContainerRequest(RequestCase type) {
- StorageContainerRequest.Builder reqBuilder = StorageContainerRequest.newBuilder()
- .setScId(SCID);
+ private PutRequest newPutRequest() {
RoutingHeader header = RoutingHeader.newBuilder()
.setStreamId(STREAM_ID)
.setRangeId(RANGE_ID)
.build();
- switch (type) {
- case KV_PUT_REQ:
- reqBuilder = reqBuilder.setKvPutReq(
- PutRequest.newBuilder().setHeader(header));
- break;
- case KV_DELETE_REQ:
- reqBuilder = reqBuilder.setKvDeleteReq(
- DeleteRangeRequest.newBuilder().setHeader(header));
- break;
- case KV_RANGE_REQ:
- reqBuilder = reqBuilder.setKvRangeReq(
- RangeRequest.newBuilder().setHeader(header));
- break;
- case KV_TXN_REQ:
- reqBuilder = reqBuilder.setKvTxnReq(
- TxnRequest.newBuilder().setHeader(header));
- break;
- case KV_INCR_REQ:
- reqBuilder = reqBuilder.setKvIncrReq(
- IncrementRequest.newBuilder().setHeader(header));
- break;
- default:
- break;
- }
- return reqBuilder.build();
+ return PutRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ }
+
+ private DeleteRangeRequest newDeleteRequest() {
+ RoutingHeader header = RoutingHeader.newBuilder()
+ .setStreamId(STREAM_ID)
+ .setRangeId(RANGE_ID)
+ .build();
+ return DeleteRangeRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ }
+
+ private RangeRequest newRangeRequest() {
+ RoutingHeader header = RoutingHeader.newBuilder()
+ .setStreamId(STREAM_ID)
+ .setRangeId(RANGE_ID)
+ .build();
+ return RangeRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ }
+
+ private IncrementRequest newIncrRequest() {
+ RoutingHeader header = RoutingHeader.newBuilder()
+ .setStreamId(STREAM_ID)
+ .setRangeId(RANGE_ID)
+ .build();
+ return IncrementRequest.newBuilder()
+ .setHeader(header)
+ .build();
+ }
+
+ private TxnRequest newTxnRequest() {
+ RoutingHeader header = RoutingHeader.newBuilder()
+ .setStreamId(STREAM_ID)
+ .setRangeId(RANGE_ID)
+ .build();
+ return TxnRequest.newBuilder()
+ .setHeader(header)
+ .build();
}
@Test
public void testRangeWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.range(any(StorageContainerRequest.class)))
+ RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+ when(trStore.range(any(RangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
- StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
- StorageContainerResponse response = FutureUtils.result(container.range(request));
+ RangeRequest request = newRangeRequest();
+ RangeResponse response = FutureUtils.result(container.range(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@@ -390,13 +405,13 @@ public class RangeStoreServiceImplTest {
public void testRangeWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.range(any(StorageContainerRequest.class)))
+ RangeResponse expectedResp = RangeResponse.getDefaultInstance();
+ when(trStore.range(any(RangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
- StorageContainerRequest request = newStorageContainerRequest(KV_RANGE_REQ);
- StorageContainerResponse response = FutureUtils.result(container.range(request));
+ RangeRequest request = newRangeRequest();
+ RangeResponse response = FutureUtils.result(container.range(request));
assertSame(expectedResp, response);
}
@@ -404,12 +419,12 @@ public class RangeStoreServiceImplTest {
public void testPutWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.put(any(StorageContainerRequest.class)))
+ PutResponse expectedResp = PutResponse.getDefaultInstance();
+ when(trStore.put(any(PutRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
- StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
- StorageContainerResponse response = FutureUtils.result(container.put(request));
+ PutRequest request = newPutRequest();
+ PutResponse response = FutureUtils.result(container.put(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@@ -418,13 +433,13 @@ public class RangeStoreServiceImplTest {
public void testPutWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.put(any(StorageContainerRequest.class)))
+ PutResponse expectedResp = PutResponse.getDefaultInstance();
+ when(trStore.put(any(PutRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
- StorageContainerRequest request = newStorageContainerRequest(KV_PUT_REQ);
- StorageContainerResponse response = FutureUtils.result(container.put(request));
+ PutRequest request = newPutRequest();
+ PutResponse response = FutureUtils.result(container.put(request));
assertSame(expectedResp, response);
}
@@ -432,12 +447,12 @@ public class RangeStoreServiceImplTest {
public void testDeleteWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.delete(any(StorageContainerRequest.class)))
+ DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+ when(trStore.delete(any(DeleteRangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
- StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
- StorageContainerResponse response = FutureUtils.result(container.delete(request));
+ DeleteRangeRequest request = newDeleteRequest();
+ DeleteRangeResponse response = FutureUtils.result(container.delete(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@@ -446,13 +461,13 @@ public class RangeStoreServiceImplTest {
public void testDeleteWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.delete(any(StorageContainerRequest.class)))
+ DeleteRangeResponse expectedResp = DeleteRangeResponse.getDefaultInstance();
+ when(trStore.delete(any(DeleteRangeRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
- StorageContainerRequest request = newStorageContainerRequest(KV_DELETE_REQ);
- StorageContainerResponse response = FutureUtils.result(container.delete(request));
+ DeleteRangeRequest request = newDeleteRequest();
+ DeleteRangeResponse response = FutureUtils.result(container.delete(request));
assertSame(expectedResp, response);
}
@@ -460,12 +475,12 @@ public class RangeStoreServiceImplTest {
public void testTxnWhenTableStoreNotCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.txn(any(StorageContainerRequest.class)))
+ TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+ when(trStore.txn(any(TxnRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
- StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
- StorageContainerResponse response = FutureUtils.result(container.txn(request));
+ TxnRequest request = newTxnRequest();
+ TxnResponse response = FutureUtils.result(container.txn(request));
assertSame(expectedResp, response);
assertSame(trStore, container.getTableStoreCache().getTableStore(RID));
}
@@ -474,13 +489,13 @@ public class RangeStoreServiceImplTest {
public void testTxnWhenTableStoreCached() throws Exception {
mockStorageContainer(SCID);
- StorageContainerResponse expectedResp = StorageContainerResponse.getDefaultInstance();
- when(trStore.txn(any(StorageContainerRequest.class)))
+ TxnResponse expectedResp = TxnResponse.getDefaultInstance();
+ when(trStore.txn(any(TxnRequest.class)))
.thenReturn(FutureUtils.value(expectedResp));
container.getTableStoreCache().getTableStores().put(RID, trStore);
- StorageContainerRequest request = newStorageContainerRequest(KV_TXN_REQ);
- StorageContainerResponse response = FutureUtils.result(container.txn(request));
+ TxnRequest request = newTxnRequest();
+ TxnResponse response = FutureUtils.result(container.txn(request));
assertSame(expectedResp, response);
}
}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.