You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/12 04:48:35 UTC

[bookkeeper] branch branch-4.7 updated: [TABLE SERVICE] replaying TxnRequest is not implemented

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 507d2f1  [TABLE SERVICE] replaying TxnRequest is not implemented
507d2f1 is described below

commit 507d2f11d6823dca131c7cd4d9225f4043221849
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 11 21:48:02 2018 -0700

    [TABLE SERVICE] replaying TxnRequest is not implemented
    
    *Motivation*
    
    when enabling table service for pulsar at apache/incubator-pulsar#1922, I noticed that replaying TxnRequest is missed somehow.
    
    *Solution*
    
    This PR implements the replaying TxnRequest logic in the command process.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1505 from sijie/replay_txn_request
    
    (cherry picked from commit b65fa438d6212eb4a89a848b4f2642ba2ad573da)
    Signed-off-by: Sijie Guo <si...@apache.org>
---
 .../journal/AbstractStateStoreWithJournal.java     |  12 +-
 .../statelib/impl/mvcc/MVCCCommandProcessor.java   |  26 +++-
 .../bookkeeper/statelib/impl/mvcc/MVCCUtils.java   |  47 +++++++
 .../impl/mvcc/op/proto/ProtoCompareImpl.java       | 141 +++++++++++++++++++++
 .../impl/mvcc/op/proto/ProtoPutOpImpl.java         |  10 ++
 .../impl/mvcc/op/proto/ProtoTxnOpImpl.java         | 126 ++++++++++++++++++
 .../impl/mvcc/TestMVCCAsyncBytesStoreImpl.java     |  57 ++++++++-
 .../impl/mvcc/op/proto/ProtoCompareImplTest.java   | 123 ++++++++++++++++++
 8 files changed, 534 insertions(+), 8 deletions(-)

diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index a64dbfe..1ab5da6 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -325,6 +325,10 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
                         record, record.getDlsn(), name());
                 }
                 try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Applying command transaction {} - record {} @ {} to mvcc store {}",
+                            record.getTransactionId(), record, record.getDlsn(), name());
+                    }
                     commandProcessor.applyCommand(record.getTransactionId(), record.getPayloadBuf(), localStore);
 
                     if (record.getDlsn().compareTo(endDLSN) >= 0) {
@@ -334,9 +338,15 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
                         return;
                     }
 
+                    if (log.isDebugEnabled()) {
+                        log.debug("Read next record after {} at mvcc store {}",
+                            record.getDlsn(), name());
+                    }
                     // read next record
                     replayJournal(reader, endDLSN, future);
-                } catch (StateStoreRuntimeException e) {
+                } catch (Exception e) {
+                    log.error("Exception is thrown when applying command record {} @ {} to mvcc store {}",
+                        record, record.getDlsn(), name());
                     FutureUtils.completeExceptionally(future, e);
                 }
             }
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
index d4bb484..dc3e805 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCCommandProcessor.java
@@ -25,15 +25,18 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.kv.op.DeleteOp;
 import org.apache.bookkeeper.api.kv.op.IncrementOp;
 import org.apache.bookkeeper.api.kv.op.PutOp;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
 import org.apache.bookkeeper.api.kv.result.Code;
 import org.apache.bookkeeper.api.kv.result.DeleteResult;
 import org.apache.bookkeeper.api.kv.result.IncrementResult;
 import org.apache.bookkeeper.api.kv.result.PutResult;
+import org.apache.bookkeeper.api.kv.result.TxnResult;
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
 import org.apache.bookkeeper.statelib.impl.journal.CommandProcessor;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoIncrementOpImpl;
 import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoTxnOpImpl;
 import org.apache.bookkeeper.stream.proto.kv.store.Command;
 
 @Slf4j
@@ -89,7 +92,28 @@ class MVCCCommandProcessor implements CommandProcessor<MVCCStoreImpl<byte[], byt
 
     private void applyTxnCommand(long revision, Command command,
                                  MVCCStoreImpl<byte[], byte[]> store) {
-        throw new UnsupportedOperationException();
+        try (ProtoTxnOpImpl op = ProtoTxnOpImpl.newTxnOp(command.getTxnReq())) {
+            applyTxnOp(revision, op, true, store);
+        }
+    }
+
+    private void applyTxnOp(long revision,
+                            TxnOp<byte[], byte[]> op,
+                            boolean ignoreSmallerRevision,
+                            MVCCStoreImpl<byte[], byte[]> localStore) {
+        try (TxnResult<byte[], byte[]> result = localStore.processTxn(revision, op)) {
+            if (log.isDebugEnabled()) {
+                log.debug("Result after applying transaction {} : {} - success = {}",
+                    revision, result.code(), result.isSuccess());
+            }
+            if (Code.OK == result.code()
+                || (ignoreSmallerRevision && Code.SMALLER_REVISION == result.code())) {
+                return;
+            }
+            throw new MVCCStoreException(result.code(),
+                "Failed to apply command " + op + " at revision "
+                    + revision + " to the state store " + localStore.name());
+        }
     }
 
     private void applyIncrCommand(long revision, Command command,
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
index 8bc5965..ac96a92 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCUtils.java
@@ -45,6 +45,9 @@ import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
 import org.apache.bookkeeper.statelib.api.exceptions.StateStoreRuntimeException;
 import org.apache.bookkeeper.statelib.impl.Constants;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoDeleteOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoPutOpImpl;
+import org.apache.bookkeeper.statelib.impl.mvcc.op.proto.ProtoRangeOpImpl;
 import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
 import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.IncrementRequest;
@@ -144,6 +147,20 @@ public final class MVCCUtils {
         return requestOps;
     }
 
+    public static Op<byte[], byte[]> toApiOp(RequestOp protoOp) {
+        switch (protoOp.getRequestCase()) {
+            case REQUEST_PUT:
+                return ProtoPutOpImpl.newPutOp(protoOp.getRequestPut());
+            case REQUEST_RANGE:
+                return ProtoRangeOpImpl.newRangeOp(protoOp.getRequestRange());
+            case REQUEST_DELETE_RANGE:
+                return ProtoDeleteOpImpl.newDeleteOp(protoOp.getRequestDeleteRange());
+            default:
+                throw new IllegalArgumentException("Unknown request "
+                    + protoOp.getRequestCase() + " found in a txn request");
+        }
+    }
+
     private static List<Compare> toCompareList(List<CompareOp<byte[], byte[]>> ops) {
         List<Compare> compares = Lists.newArrayListWithExpectedSize(ops.size());
         for (CompareOp<byte[], byte[]> op : ops) {
@@ -193,6 +210,21 @@ public final class MVCCUtils {
         }
     }
 
+    public static CompareTarget toApiCompareTarget(Compare.CompareTarget target) {
+        switch (target) {
+            case MOD:
+                return CompareTarget.MOD;
+            case CREATE:
+                return CompareTarget.CREATE;
+            case VERSION:
+                return CompareTarget.VERSION;
+            case VALUE:
+                return CompareTarget.VALUE;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare target " + target);
+        }
+    }
+
     private static Compare.CompareResult toProtoCompareResult(CompareResult result) {
         switch (result) {
             case LESS:
@@ -208,6 +240,21 @@ public final class MVCCUtils {
         }
     }
 
+    public static CompareResult toApiCompareResult(Compare.CompareResult result) {
+        switch (result) {
+            case LESS:
+                return CompareResult.LESS;
+            case EQUAL:
+                return CompareResult.EQUAL;
+            case GREATER:
+                return CompareResult.GREATER;
+            case NOT_EQUAL:
+                return CompareResult.NOT_EQUAL;
+            default:
+                throw new IllegalArgumentException("Invalid proto compare result " + result);
+        }
+    }
+
     static TxnRequest toTxnRequest(TxnOp<byte[], byte[]> op) {
         return TxnRequest.newBuilder()
             .addAllSuccess(toRequestOpList(op.successOps()))
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
new file mode 100644
index 0000000..569f9d1
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImpl.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.Constants.INVALID_REVISION;
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareResult;
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiCompareTarget;
+
+import com.google.protobuf.ByteString;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.CompareResult;
+import org.apache.bookkeeper.api.kv.op.CompareTarget;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+
+/**
+ * A protobuf encoded compare operation.
+ */
+@RequiredArgsConstructor
+@Setter(AccessLevel.PRIVATE)
+@ToString(exclude = "recyclerHandle")
+public class ProtoCompareImpl implements CompareOp<byte[], byte[]> {
+
+    /**
+     * Create a protobuf encoded compare operation.
+     *
+     * @param protoCompare the protobuf representation of a compare operation.
+     * @return a protobuf encoded compare operation
+     */
+    public static ProtoCompareImpl newCompareOp(Compare protoCompare) {
+        ProtoCompareImpl op = RECYCLER.get();
+        op.setRequest(protoCompare);
+        op.setTarget(toApiCompareTarget(protoCompare.getTarget()));
+        op.setResult(toApiCompareResult(protoCompare.getResult()));
+        return op;
+    }
+
+    private static final Recycler<ProtoCompareImpl> RECYCLER = new Recycler<ProtoCompareImpl>() {
+        @Override
+        protected ProtoCompareImpl newObject(Handle<ProtoCompareImpl> handle) {
+            return new ProtoCompareImpl(handle);
+        }
+    };
+
+    private final Handle<ProtoCompareImpl> recyclerHandle;
+    private Compare request;
+    private CompareTarget target;
+    private CompareResult result;
+    private byte[] key;
+    private byte[] value;
+
+    private void reset() {
+        request = null;
+        key = null;
+        value = null;
+        target = null;
+        result = null;
+    }
+
+    @Override
+    public CompareTarget target() {
+        return target;
+    }
+
+    @Override
+    public CompareResult result() {
+        return result;
+    }
+
+    @Override
+    public byte[] key() {
+        if (null != key) {
+            return key;
+        }
+        if (ByteString.EMPTY == request.getKey()) {
+            key = null;
+        } else {
+            key = request.getKey().toByteArray();
+        }
+        return key;
+    }
+
+    @Override
+    public byte[] value() {
+        if (null != value) {
+            return value;
+        }
+        if (ByteString.EMPTY == request.getValue()) {
+            value = null;
+        } else {
+            value = request.getValue().toByteArray();
+        }
+        return value;
+    }
+
+    @Override
+    public long revision() {
+        Compare req = request;
+        if (null == req) {
+            return INVALID_REVISION;
+        } else {
+            switch (req.getTarget()) {
+                case MOD:
+                    return req.getModRevision();
+                case CREATE:
+                    return req.getCreateRevision();
+                case VERSION:
+                    return req.getVersion();
+                default:
+                    return INVALID_REVISION;
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
index 8ae2b88..cdde245 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoPutOpImpl.java
@@ -42,6 +42,12 @@ public class ProtoPutOpImpl implements PutOp<byte[], byte[]>, PutOption<byte[]>
         return op;
     }
 
+    public static ProtoPutOpImpl newPutOp(PutRequest req) {
+        ProtoPutOpImpl op = RECYCLER.get();
+        op.setPutRequest(req);
+        return op;
+    }
+
     private static final Recycler<ProtoPutOpImpl> RECYCLER = new Recycler<ProtoPutOpImpl>() {
         @Override
         protected ProtoPutOpImpl newObject(Handle<ProtoPutOpImpl> handle) {
@@ -73,6 +79,10 @@ public class ProtoPutOpImpl implements PutOp<byte[], byte[]>, PutOption<byte[]>
         this.req = command.getPutReq();
     }
 
+    public void setPutRequest(PutRequest request) {
+        this.req = request;
+    }
+
     @Override
     public boolean prevKv() {
         return req.getPrevKv();
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
new file mode 100644
index 0000000..37f675c
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoTxnOpImpl.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static org.apache.bookkeeper.statelib.impl.mvcc.MVCCUtils.toApiOp;
+
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.List;
+import lombok.AccessLevel;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.bookkeeper.api.kv.op.CompareOp;
+import org.apache.bookkeeper.api.kv.op.Op;
+import org.apache.bookkeeper.api.kv.op.OpType;
+import org.apache.bookkeeper.api.kv.op.TxnOp;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
+import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
+
+/**
+ * A protobuf encoded transaction operation.
+ */
+@RequiredArgsConstructor
+@ToString(exclude = "recyclerHandle")
+@Setter(AccessLevel.PRIVATE)
+public class ProtoTxnOpImpl implements TxnOp<byte[], byte[]> {
+
+    public static ProtoTxnOpImpl newTxnOp(TxnRequest request) {
+        ProtoTxnOpImpl op = RECYCLER.get();
+        op.setRequest(request);
+        RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps = COMPARE_OPS_RECYCLER.newInstance();
+        for (Compare compare : request.getCompareList()) {
+            compareOps.add(ProtoCompareImpl.newCompareOp(compare));
+        }
+        op.setCompareOps(compareOps);
+        RecyclableArrayList<Op<byte[], byte[]>> successOps = OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getSuccessList()) {
+            successOps.add(toApiOp(reqOp));
+        }
+        op.setSuccessOps(successOps);
+        RecyclableArrayList<Op<byte[], byte[]>> failureOps = OPS_RECYCLER.newInstance();
+        for (RequestOp reqOp : request.getFailureList()) {
+            failureOps.add(toApiOp(reqOp));
+        }
+        return op;
+    }
+
+    private static final Recycler<ProtoTxnOpImpl> RECYCLER = new Recycler<ProtoTxnOpImpl>() {
+        @Override
+        protected ProtoTxnOpImpl newObject(Handle<ProtoTxnOpImpl> handle) {
+            return new ProtoTxnOpImpl(handle);
+        }
+    };
+
+    private static final RecyclableArrayList.Recycler<CompareOp<byte[], byte[]>> COMPARE_OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+    private static final RecyclableArrayList.Recycler<Op<byte[], byte[]>> OPS_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+
+    private final Handle<ProtoTxnOpImpl> recyclerHandle;
+    private TxnRequest request;
+    private RecyclableArrayList<CompareOp<byte[], byte[]>> compareOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> successOps;
+    private RecyclableArrayList<Op<byte[], byte[]>> failureOps;
+
+    private void reset() {
+        request = null;
+        if (null != compareOps) {
+            compareOps.forEach(CompareOp::close);
+            compareOps.recycle();
+        }
+        if (null != successOps) {
+            successOps.forEach(Op::close);
+            successOps.recycle();
+        }
+        if (null != failureOps) {
+            failureOps.forEach(Op::close);
+            failureOps.recycle();
+        }
+    }
+
+    @Override
+    public List<CompareOp<byte[], byte[]>> compareOps() {
+        return compareOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> successOps() {
+        return successOps;
+    }
+
+    @Override
+    public List<Op<byte[], byte[]>> failureOps() {
+        return failureOps;
+    }
+
+    @Override
+    public OpType type() {
+        return OpType.TXN;
+    }
+
+    @Override
+    public void close() {
+        reset();
+        recyclerHandle.recycle(this);
+    }
+}
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
index 610d5d2..5314ed8 100644
--- a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/TestMVCCAsyncBytesStoreImpl.java
@@ -29,11 +29,11 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.api.kv.op.PutOp;
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.common.coder.ByteArrayCoder;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.exceptions.MVCCStoreException;
-import org.apache.commons.io.FileUtils;
 import org.apache.distributedlog.DLMTestUtil;
 import org.apache.distributedlog.TestDistributedLogBase;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -54,7 +53,9 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Unit test of {@link MVCCAsyncBytesStoreImpl}.
@@ -62,6 +63,9 @@ import org.junit.Test;
 @Slf4j
 public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
 
+    @Rule
+    public final TemporaryFolder testDir = new TemporaryFolder();
+
     private static URI uri;
     private static Namespace namespace;
 
@@ -96,7 +100,7 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
         super.setup();
         ensureURICreated(uri);
 
-        tempDir = Files.createTempDir();
+        tempDir = testDir.newFolder();
 
         store = new MVCCAsyncBytesStoreImpl(
             () -> new MVCCStoreImpl<>(),
@@ -123,9 +127,6 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
         if (null != store) {
             store.close();
         }
-        if (null != tempDir) {
-            FileUtils.deleteDirectory(tempDir);
-        }
         super.teardown();
     }
 
@@ -423,4 +424,48 @@ public class TestMVCCAsyncBytesStoreImpl extends TestDistributedLogBase {
         assertEquals(endKey + 1, idx);
     }
 
+    @Test
+    public void testReplayJournal() throws Exception {
+        this.streamName = "test-replay-journal";
+        StateStoreSpec spec = initSpec(streamName);
+        result(store.init(spec));
+
+        int numKvs = 10;
+
+        // putIfAbsent
+        IntStream.range(0, numKvs)
+            .forEach(i -> {
+                try {
+                    result(store.putIfAbsent(getKey(i), getValue(100 + i)));
+                } catch (Exception e) {
+                    log.error("Failed to put kv pair ({})", i, e);
+                }
+            });
+
+        log.info("Closing the store '{}' ...", streamName);
+        // close the store
+        store.close();
+        log.info("Closed the store '{}' ...", streamName);
+
+        // open the store again to replay the journal.
+        store = new MVCCAsyncBytesStoreImpl(
+            () -> new MVCCStoreImpl<>(),
+            () -> namespace);
+        spec = StateStoreSpec.builder()
+            .name(streamName)
+            .keyCoder(ByteArrayCoder.of())
+            .valCoder(ByteArrayCoder.of())
+            .stream(streamName)
+            .localStateStoreDir(testDir.newFolder())
+            .build();
+        result(store.init(spec));
+
+        // verify the key/value pairs
+        for (int i = 0; i < numKvs; i++) {
+            byte[] value = result(store.get(getKey(i)));
+            assertNotNull(value);
+            assertArrayEquals(getValue(100 + i), value);
+        }
+    }
+
 }
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
new file mode 100644
index 0000000..6e50baf
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/mvcc/op/proto/ProtoCompareImplTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.mvcc.op.proto;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.protobuf.ByteString;
+import lombok.Cleanup;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareResult;
+import org.apache.bookkeeper.stream.proto.kv.rpc.Compare.CompareTarget;
+import org.junit.Test;
+
+/**
+ * Unit test {@link ProtoCompareImpl}.
+ */
+public class ProtoCompareImplTest {
+
+    private static final ByteString KEY = ByteString.copyFromUtf8("test-key");
+    private static final ByteString VAL = ByteString.copyFromUtf8("test-value");
+    private static final long MOD_REV = System.currentTimeMillis();
+    private static final long CREATE_REV = MOD_REV + 1;
+    private static final long VERSION = CREATE_REV + 1;
+
+    @Test
+    public void testCompareEmptyValue() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VALUE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertNull(protoCompare.value());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE, protoCompare.target());
+    }
+
+    @Test
+    public void testCompareValue() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setValue(VAL)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VALUE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertArrayEquals("test-value".getBytes(UTF_8), protoCompare.value());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VALUE, protoCompare.target());
+    }
+
+    @Test
+    public void testCompareMod() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setModRevision(MOD_REV)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.MOD)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(MOD_REV, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.MOD, protoCompare.target());
+    }
+
+    @Test
+    public void testCompareCreate() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setCreateRevision(CREATE_REV)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.CREATE)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(CREATE_REV, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.CREATE, protoCompare.target());
+    }
+
+    @Test
+    public void testCompareVersion() {
+        Compare compare = Compare.newBuilder()
+            .setKey(KEY)
+            .setVersion(VERSION)
+            .setResult(CompareResult.EQUAL)
+            .setTarget(CompareTarget.VERSION)
+            .build();
+
+        @Cleanup ProtoCompareImpl protoCompare = ProtoCompareImpl.newCompareOp(compare);
+        assertArrayEquals("test-key".getBytes(UTF_8), protoCompare.key());
+        assertEquals(VERSION, protoCompare.revision());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareResult.EQUAL, protoCompare.result());
+        assertEquals(org.apache.bookkeeper.api.kv.op.CompareTarget.VERSION, protoCompare.target());
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.