You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/12 04:48:35 UTC
[bookkeeper] branch branch-4.7 updated: [TABLE SERVICE] replaying
TxnRequest is not implemented
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new 507d2f1 [TABLE SERVICE] replaying TxnRequest is not implemented
507d2f1 is described below
commit 507d2f11d6823dca131c7cd4d9225f4043221849
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 11 21:48:02 2018 -0700
[TABLE SERVICE] replaying TxnRequest is not implemented
*Motivation*
When enabling the table service for Pulsar in apache/incubator-pulsar#1922, I noticed that replaying TxnRequest was not implemented.
*Solution*
This PR implements the TxnRequest replay logic in the command processor.
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
This closes #1505 from sijie/replay_txn_request
(cherry picked from commit b65fa438d6212eb4a89a848b4f2642ba2ad573da)
Signed-off-by: Sijie Guo <si...@apache.org>
---
.../journal/AbstractStateStoreWithJournal.java | 12 +-
.../statelib/impl/mvcc/MVCCCommandProcessor.java | 26 +++-
.../bookkeeper/statelib/impl/mvcc/MVCCUtils.java | 47 +++++++
.../impl/mvcc/op/proto/ProtoCompareImpl.java | 141 +++++++++++++++++++++
.../impl/mvcc/op/proto/ProtoPutOpImpl.java | 10 ++
.../impl/mvcc/op/proto/ProtoTxnOpImpl.java | 126 ++++++++++++++++++
.../impl/mvcc/TestMVCCAsyncBytesStoreImpl.java | 57 ++++++++-
.../impl/mvcc/op/proto/ProtoCompareImplTest.java | 123 ++++++++++++++++++
8 files changed, 534 insertions(+), 8 deletions(-)
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index a64dbfe..1ab5da6 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -325,6 +325,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
record, record.getDlsn(), name());
}
try {
+ if (log.isDebugEnabled()) {
+ log.debug("Applying command transaction {} - record {} @ {} to mvcc store {}",
+ record.getTransactionId(), record, record.getDlsn(), name());
+ }
commandProcessor.applyCommand(record.getTransactionId(), record.getPayloadBuf(), localStore);
if (record.getDlsn().compareTo(endDLSN) >= 0) {
@@ -334,9 +338,15 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
return;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Read next record after {} at mvcc store {}",
+ record.getDlsn(), name());
+ }
// read next record
replayJournal(reader, endDLSN, future);
- } catch (StateStoreRuntimeException e) {
+ } catch (Exception e) {
+ log.error("Exception is thrown when applying command record {} @ {} to mvcc store {}",
+ record, record.getDlsn(), name());
FutureUtils.completeExceptionally(future, e);
}
}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
index d4bb484..dc3e805 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
@@ -25,15 +25,18 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.kv.op.DeleteOp;
import org.apache.bookkeeper.api.kv.op.IncrementOp;
import org.apache.bookkeeper.api.kv.op.PutOp;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
import org.apache.bookkeeper.api.kv.result.Code;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
import org.apache.bookkeeper.api.kv.result.IncrementResult;
import org.apache.bookkeeper.api.kv.result.PutResult;
+import org.apache.bookkeeper.api.kv.result.TxnResult;
import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
import org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;
import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoIncrementOpImpl;
import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoTxnOpImpl;
import org.apache.bookkeeper.stream.proto.kv.store.Command;
@Slf4j
@@ -89,7 +92,28 @@ class MVCCCommandProcessor implements CommandProcessor<MVCCStoreImpl<byte[], byt
+ /**
+ * Applies a transaction command by replaying the {@code TxnRequest} it carries.
+ *
+ * @param revision the revision handed on to {@code applyTxnOp}
+ * @param command the command whose {@code getTxnReq()} payload is replayed
+ * @param store the store the transaction is applied to
+ */
private void applyTxnCommand(long revision, Command command,
MVCCStoreImpl<byte[], byte[]> store) {
- throw new UnsupportedOperationException();
+ // try-with-resources closes the op once the transaction has been applied
+ try (ProtoTxnOpImpl op = ProtoTxnOpImpl.newTxnOp(command.getTxnReq())) {
+ // ignoreSmallerRevision = true: a SMALLER_REVISION result is not treated as a failure
+ applyTxnOp(revision, op, true, store);
+ }
+ }
+
+    /**
+     * Runs a transaction against the local store and checks the resulting code.
+     *
+     * <p>{@link Code#OK} is always accepted; {@link Code#SMALLER_REVISION} is accepted only
+     * when {@code ignoreSmallerRevision} is set. Every other code is raised as an
+     * {@link MVCCStoreException}. The transaction result is closed before returning.
+     *
+     * @param revision the revision to process the transaction at
+     * @param op the transaction to process
+     * @param ignoreSmallerRevision whether a {@code SMALLER_REVISION} result is tolerated
+     * @param localStore the store the transaction is processed by
+     */
+    private void applyTxnOp(long revision,
+                            TxnOp<byte[], byte[]> op,
+                            boolean ignoreSmallerRevision,
+                            MVCCStoreImpl<byte[], byte[]> localStore) {
+        try (TxnResult<byte[], byte[]> txnResult = localStore.processTxn(revision, op)) {
+            final Code code = txnResult.code();
+            if (log.isDebugEnabled()) {
+                log.debug("Result after applying transaction {} : {} - success = {}",
+                    revision, code, txnResult.isSuccess());
+            }
+            final boolean tolerated = ignoreSmallerRevision && Code.SMALLER_REVISION == code;
+            if (Code.OK != code && !tolerated) {
+                throw new MVCCStoreException(code,
+                    "Failed to apply command " + op + " at revision "
+                        + revision + " to the state store " + localStore.name());
+            }
+        }
     }
private void applyIncrCommand(long revision, Command command,
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
index 8bc5965..ac96a92 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
@@ -45,6 +45,9 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
import org.apache.bookkeeper.statelib.impl.Constants;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoRangeOpImpl;
import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
@@ -144,6 +147,20 @@ public final class MVCCUtils {
return requestOps;
}
+    /**
+     * Convert a protobuf {@link RequestOp} found in a txn request into its API {@link Op}.
+     *
+     * @param protoOp the protobuf request op
+     * @return the API op wrapping the put, range or delete-range request
+     * @throws IllegalArgumentException if the request case is none of those three
+     */
+    public static Op<byte[], byte[]> toApiOp(RequestOp protoOp) {
+        final Op<byte[], byte[]> apiOp;
+        switch (protoOp.getRequestCase()) {
+            case REQUEST_PUT:
+                apiOp = ProtoPutOpImpl.newPutOp(protoOp.getRequestPut());
+                break;
+            case REQUEST_RANGE:
+                apiOp = ProtoRangeOpImpl.newRangeOp(protoOp.getRequestRange());
+                break;
+            case REQUEST_DELETE_RANGE:
+                apiOp = ProtoDeleteOpImpl.newDeleteOp(protoOp.getRequestDeleteRange());
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown request "
+                    + protoOp.getRequestCase() + " found in a txn request");
+        }
+        return apiOp;
+    }
+
private static List<Compare> toCompareList(List<CompareOp<byte[], byte[]>> ops) {
List<Compare> compares = Lists.newArrayListWithExpectedSize(ops.size());
for (CompareOp<byte[], byte[]> op : ops) {
@@ -193,6 +210,21 @@ public final class MVCCUtils {
}
}
+    /**
+     * Convert a protobuf compare target into the API compare target of the same name.
+     *
+     * @param target the protobuf compare target
+     * @return the matching API compare target
+     * @throws IllegalArgumentException if the target is not MOD, CREATE, VERSION or VALUE
+     */
+    public static CompareTarget toApiCompareTarget(Compare.CompareTarget target) {
+        final CompareTarget apiTarget;
+        switch (target) {
+            case MOD:
+                apiTarget = CompareTarget.MOD;
+                break;
+            case CREATE:
+                apiTarget = CompareTarget.CREATE;
+                break;
+            case VERSION:
+                apiTarget = CompareTarget.VERSION;
+                break;
+            case VALUE:
+                apiTarget = CompareTarget.VALUE;
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare target " + target);
+        }
+        return apiTarget;
+    }
+
private static Compare.CompareResult toProtoCompareResult(CompareResult result) {
switch (result) {
case LESS:
@@ -208,6 +240,21 @@ public final class MVCCUtils {
}
}
+    /**
+     * Convert a protobuf compare result into the API compare result of the same name.
+     *
+     * @param result the protobuf compare result
+     * @return the matching API compare result
+     * @throws IllegalArgumentException if the result is not LESS, EQUAL, GREATER or NOT_EQUAL
+     */
+    public static CompareResult toApiCompareResult(Compare.CompareResult result) {
+        final CompareResult apiResult;
+        switch (result) {
+            case LESS:
+                apiResult = CompareResult.LESS;
+                break;
+            case EQUAL:
+                apiResult = CompareResult.EQUAL;
+                break;
+            case GREATER:
+                apiResult = CompareResult.GREATER;
+                break;
+            case NOT_EQUAL:
+                apiResult = CompareResult.NOT_EQUAL;
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare result " + result);
+        }
+        return apiResult;
+    }
+
static TxnRequest toTxnRequest(TxnOp<byte[], byte[]> op) {
return TxnRequest.newBuilder()
.addAllSuccess(toRequestOpList(op.successOps()))
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
new file mode 100644
index 0000000..569f9d1
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.Constants.INVALID_REVISION;
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareResult;
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareTarget;
+
+import com.google.protobuf.ByteString;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.CompareResult;
+import org.apache.bookkeeper.api.kv.op.CompareTarget;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+
+/**
+ * A {@link CompareOp} backed by a protobuf encoded {@link Compare}.
+ *
+ * <p>Instances are obtained from a Netty {@link Recycler} through
+ * {@link #newCompareOp(Compare)} and handed back to it by {@link #close()}.
+ */
+@RequiredArgsConstructor
+@Setter(AccessLevel.PRIVATE)
+@ToString(exclude = "recyclerHandle")
+public class ProtoCompareImpl implements CompareOp<byte[], byte[]> {
+
+    /**
+     * Create a protobuf encoded compare operation.
+     *
+     * @param protoCompare the protobuf representation of a compare operation.
+     * @return a protobuf encoded compare operation
+     * @throws IllegalArgumentException if the target or result of {@code protoCompare}
+     *         has no API counterpart
+     */
+    public static ProtoCompareImpl newCompareOp(Compare protoCompare) {
+        ProtoCompareImpl op = RECYCLER.get();
+        op.setRequest(protoCompare);
+        op.setTarget(toApiCompareTarget(protoCompare.getTarget()));
+        op.setResult(toApiCompareResult(protoCompare.getResult()));
+        return op;
+    }
+
+    private static final Recycler<ProtoCompareImpl> RECYCLER = new Recycler<ProtoCompareImpl>() {
+        @Override
+        protected ProtoCompareImpl newObject(Handle<ProtoCompareImpl> handle) {
+            return new ProtoCompareImpl(handle);
+        }
+    };
+
+    private final Handle<ProtoCompareImpl> recyclerHandle;
+    private Compare request;
+    private CompareTarget target;
+    private CompareResult result;
+    // lazily materialized copies of the request's key and value bytes
+    private byte[] key;
+    private byte[] value;
+
+    /**
+     * Drop every reference held by this op so that it can be reused.
+     */
+    private void reset() {
+        request = null;
+        key = null;
+        value = null;
+        target = null;
+        result = null;
+    }
+
+    @Override
+    public CompareTarget target() {
+        return target;
+    }
+
+    @Override
+    public CompareResult result() {
+        return result;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The bytes are copied out of the protobuf message on first access and the
+     * non-null copy is kept for later calls.
+     *
+     * @return the key bytes, or {@code null} when the request carries an empty key
+     */
+    @Override
+    public byte[] key() {
+        if (null == key) {
+            // Use isEmpty() rather than identity against ByteString.EMPTY: an empty
+            // ByteString is not guaranteed to be the shared EMPTY instance.
+            ByteString keyBytes = request.getKey();
+            key = keyBytes.isEmpty() ? null : keyBytes.toByteArray();
+        }
+        return key;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The bytes are copied out of the protobuf message on first access and the
+     * non-null copy is kept for later calls.
+     *
+     * @return the value bytes, or {@code null} when the request carries an empty value
+     */
+    @Override
+    public byte[] value() {
+        if (null == value) {
+            // Same emptiness check as key(): content based, not identity based.
+            ByteString valueBytes = request.getValue();
+            value = valueBytes.isEmpty() ? null : valueBytes.toByteArray();
+        }
+        return value;
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * @return the mod revision, create revision or version of the request, picked by the
+     *         request's target; {@code INVALID_REVISION} for any other target or when no
+     *         request is attached
+     */
+    @Override
+    public long revision() {
+        Compare req = request;
+        if (null == req) {
+            return INVALID_REVISION;
+        } else {
+            switch (req.getTarget()) {
+                case MOD:
+                    return req.getModRevision();
+                case CREATE:
+                    return req.getCreateRevision();
+                case VERSION:
+                    return req.getVersion();
+                default:
+                    return INVALID_REVISION;
+            }
+        }
+    }
+
+    /**
+     * Clear this op and return it to the recycler.
+     */
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
index 8ae2b88..cdde245 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
@@ -42,6 +42,12 @@ public class ProtoPutOpImpl implements PutOp<byte[], byte[]>, PutOption<byte[]>
return op;
}
+    /**
+     * Create a put operation backed by the given protobuf put request.
+     *
+     * @param req the protobuf put request
+     * @return a put operation taken from the recycler and pointed at {@code req}
+     */
+    public static ProtoPutOpImpl newPutOp(PutRequest req) {
+        final ProtoPutOpImpl putOp = RECYCLER.get();
+        putOp.setPutRequest(req);
+        return putOp;
+    }
+
private static final Recycler<ProtoPutOpImpl> RECYCLER = new Recycler<ProtoPutOpImpl>() {
@Override
protected ProtoPutOpImpl newObject(Handle<ProtoPutOpImpl> handle) {
@@ -73,6 +79,10 @@ public class ProtoPutOpImpl implements PutOp<byte[], byte[]>, PutOption<byte[]>
this.req = command.getPutReq();
}
+    /**
+     * Point this operation at the given protobuf put request.
+     *
+     * @param request the protobuf put request to read from
+     */
+    public void setPutRequest(PutRequest request) {
+        req = request;
+    }
+
@Override
public boolean prevKv() {
return req.getPrevKv();
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
new file mode 100644
index 0000000..37f675c
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiOp;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.Op;
+import org.apache.bookkeeper.api.kv.op.OpType;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A {@link TxnOp} backed by a protobuf encoded {@link TxnRequest}.
+ *
+ * <p>Instances and the lists they hold are taken from recyclers by
+ * {@link #newTxnOp(TxnRequest)} and handed back by {@link #close()}, which also closes
+ * every compare, success and failure op owned by the transaction.
+ */
+@RequiredArgsConstructor
+@ToString(exclude = "recyclerHandle")
+@Setter(AccessLevel.PRIVATE)
+public class ProtoTxnOpImpl implements TxnOp<byte[], byte[]> {
+
+    /**
+     * Create a transaction operation from its protobuf representation.
+     *
+     * @param request the protobuf representation of the transaction
+     * @return a transaction operation exposing the compare, success and failure ops of
+     *         {@code request}
+     * @throws IllegalArgumentException if {@code request} contains a compare or request op
+     *         that cannot be converted
+     */
+    public static ProtoTxnOpImpl newTxnOp(TxnRequest request) {
+        ProtoTxnOpImpl op = RECYCLER.get();
+        op.setRequest(request);
+        RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps = COMPARE_OPS_RECYCLER.newInstance();
+        for (Compare compare : request.getCompareList()) {
+            compareOps.add(ProtoCompareImpl.newCompareOp(compare));
+        }
+        op.setCompareOps(compareOps);
+        RecyclableArrayList<Op<byte[], byte[]>> successOps = OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getSuccessList()) {
+            successOps.add(toApiOp(reqOp));
+        }
+        op.setSuccessOps(successOps);
+        RecyclableArrayList<Op<byte[], byte[]>> failureOps = OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getFailureList()) {
+            failureOps.add(toApiOp(reqOp));
+        }
+        // Attach the failure branch as well; without this failureOps() returns null and the
+        // list built above (and the ops in it) is never closed or recycled.
+        op.setFailureOps(failureOps);
+        return op;
+    }
+
+    private static final Recycler<ProtoTxnOpImpl> RECYCLER = new Recycler<ProtoTxnOpImpl>() {
+        @Override
+        protected ProtoTxnOpImpl newObject(Handle<ProtoTxnOpImpl> handle) {
+            return new ProtoTxnOpImpl(handle);
+        }
+    };
+
+    private static final RecyclableArrayList.Recycler<CompareOp<byte[], byte[]>> COMPARE_OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+    private static final RecyclableArrayList.Recycler<Op<byte[], byte[]>> OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+
+    private final Handle<ProtoTxnOpImpl> recyclerHandle;
+    private TxnRequest request;
+    private RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> successOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> failureOps;
+
+    /**
+     * Close the owned ops, recycle the lists holding them and drop every reference.
+     */
+    private void reset() {
+        request = null;
+        if (null != compareOps) {
+            compareOps.forEach(CompareOp::close);
+            compareOps.recycle();
+            // do not keep a reference to a list that now belongs to the recycler
+            compareOps = null;
+        }
+        if (null != successOps) {
+            successOps.forEach(Op::close);
+            successOps.recycle();
+            successOps = null;
+        }
+        if (null != failureOps) {
+            failureOps.forEach(Op::close);
+            failureOps.recycle();
+            failureOps = null;
+        }
+    }
+
+    @Override
+    public List<CompareOp<byte[], byte[]>> compareOps() {
+        return compareOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> successOps() {
+        return successOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> failureOps() {
+        return failureOps;
+    }
+
+    @Override
+    public OpType type() {
+        return OpType.TXN;
+    }
+
+    /**
+     * Release everything owned by this transaction and return it to the recycler.
+     */
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
index 610d5d2..5314ed8 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
@@ -29,11 +29,11 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
-import com.google.common.io.Files;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.kv.op.PutOp;
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.common.coder.ByteArrayCoder;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
-import org.apache.commons.io.FileUtils;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -54,7 +53,9 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
/**
* Unit test of {@link MVCCAsyncBytesStoreImpl}.
@@ -62,6 +63,9 @@ import org.junit.Test;
@Slf4j
public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
+ @Rule
+ public final TemporaryFolder testDir = new TemporaryFolder();
+
private static URI uri;
private static Namespace namespace;
@@ -96,7 +100,7 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
super.setup();
ensureURICreated(uri);
- tempDir = Files.createTempDir();
+ tempDir = testDir.newFolder();
store = new MVCCAsyncBytesStoreImpl(
() -> new MVCCStoreImpl<>(),
@@ -123,9 +127,6 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
if (null != store) {
store.close();
}
- if (null != tempDir) {
- FileUtils.deleteDirectory(tempDir);
- }
super.teardown();
}
@@ -423,4 +424,48 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
assertEquals(endKey + 1, idx);
}
+    /**
+     * Writes key/value pairs with {@code putIfAbsent}, closes the store, opens a new store
+     * on the same stream and verifies that every pair can still be read.
+     */
+    @Test
+    public void testReplayJournal() throws Exception {
+        this.streamName = "test-replay-journal";
+        StateStoreSpec spec = initSpec(streamName);
+        result(store.init(spec));
+
+        int numKvs = 10;
+
+        // putIfAbsent
+        IntStream.range(0, numKvs)
+            .forEach(i -> {
+                try {
+                    result(store.putIfAbsent(getKey(i), getValue(100 + i)));
+                } catch (Exception e) {
+                    // Fail right here with the real cause instead of logging and letting
+                    // the verification loop below fail on a missing value.
+                    throw new AssertionError("Failed to put kv pair (" + i + ")", e);
+                }
+            });
+
+        log.info("Closing the store '{}' ...", streamName);
+        // close the store
+        store.close();
+        log.info("Closed the store '{}' ...", streamName);
+
+        // open the store again to replay the journal.
+        store = new MVCCAsyncBytesStoreImpl(
+            () -> new MVCCStoreImpl<>(),
+            () -> namespace);
+        // NOTE(review): a fresh local state dir is used here, presumably so the state has
+        // to be rebuilt from the journal - confirm.
+        spec = StateStoreSpec.builder()
+            .name(streamName)
+            .keyCoder(ByteArrayCoder.of())
+            .valCoder(ByteArrayCoder.of())
+            .stream(streamName)
+            .localStateStoreDir(testDir.newFolder())
+            .build();
+        result(store.init(spec));
+
+        // verify the key/value pairs
+        for (int i = 0; i < numKvs; i++) {
+            byte[] value = result(store.get(getKey(i)));
+            assertNotNull(value);
+            assertArrayEquals(getValue(100 + i), value);
+        }
+    }
+
}
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
new file mode 100644
index 0000000..6e50baf
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.protobuf.ByteString;
+import lombok.Cleanup;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareResult;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareTarget;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ProtoCompareImpl}.
+ */
+public class ProtoCompareImplTest {
+
+    private static final ByteString KEY = ByteString.copyFromUtf8("test-key");
+    private static final ByteString VAL = ByteString.copyFromUtf8("test-value");
+    private static final long MOD_REV = System.currentTimeMillis();
+    private static final long CREATE_REV = MOD_REV + 1;
+    private static final long VERSION = CREATE_REV + 1;
+
+    /**
+     * Start a compare on {@link #KEY} expecting {@code EQUAL} for the given target.
+     */
+    private static Compare.Builder equalCompareOn(CompareTarget target) {
+        return Compare.newBuilder()
+            .setKey(KEY)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(target);
+    }
+
+    /**
+     * Check that the op exposes the test key.
+     */
+    private static void assertTestKey(ProtoCompareImpl op) {
+        assertArrayEquals("test-key".getBytes(UTF_8), op.key());
+    }
+
+    /**
+     * Check that the op reports an {@code EQUAL} result and the expected API target.
+     */
+    private static void assertEqualResultAndTarget(ProtoCompareImpl op,
+                                                   org.apache.bookkeeper.api.kv.op.CompareTarget expected) {
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, op.result());
+        assertEquals(expected, op.target());
+    }
+
+    @Test
+    public void testCompareEmptyValue() {
+        // no value is set on the request, so the op must report a null value
+        Compare compare = equalCompareOn(CompareTarget.VALUE).build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertTestKey(protoCompare);
+        assertNull(protoCompare.value());
+        assertEqualResultAndTarget(protoCompare, org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE);
+    }
+
+    @Test
+    public void testCompareValue() {
+        Compare compare = equalCompareOn(CompareTarget.VALUE)
+            .setValue(VAL)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertTestKey(protoCompare);
+        assertArrayEquals("test-value".getBytes(UTF_8), protoCompare.value());
+        assertEqualResultAndTarget(protoCompare, org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE);
+    }
+
+    @Test
+    public void testCompareMod() {
+        Compare compare = equalCompareOn(CompareTarget.MOD)
+            .setModRevision(MOD_REV)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertTestKey(protoCompare);
+        assertEquals(MOD_REV, protoCompare.revision());
+        assertEqualResultAndTarget(protoCompare, org.apache.bookkeeper.api.kv.op.CompareTarget.MOD);
+    }
+
+    @Test
+    public void testCompareCreate() {
+        Compare compare = equalCompareOn(CompareTarget.CREATE)
+            .setCreateRevision(CREATE_REV)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertTestKey(protoCompare);
+        assertEquals(CREATE_REV, protoCompare.revision());
+        assertEqualResultAndTarget(protoCompare, org.apache.bookkeeper.api.kv.op.CompareTarget.CREATE);
+    }
+
+    @Test
+    public void testCompareVersion() {
+        Compare compare = equalCompareOn(CompareTarget.VERSION)
+            .setVersion(VERSION)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertTestKey(protoCompare);
+        assertEquals(VERSION, protoCompare.revision());
+        assertEqualResultAndTarget(protoCompare, org.apache.bookkeeper.api.kv.op.CompareTarget.VERSION);
+    }
+
+}
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.