You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:39:59 UTC

[bookkeeper] 01/04: [TABLE SERVICE] Dlog based checkpoint store

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 7f0d8adcdd1b567dae722d14427e28231ed9c2e5
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Apr 25 02:13:28 2018 -0700

    [TABLE SERVICE] Dlog based checkpoint store
    
    *Motivation*
    
    Currently the table range stores use the local filesystem as their checkpoint store. That is fine in standalone mode,
    but it doesn't work in distributed mode. This change introduces a dlog-based checkpoint store so that all
    the SST files of table ranges are durably checkpointed into BookKeeper itself.
    
    *Solution*
    
    Introduced a dlog based checkpoint store.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Jia Zhai <None>
    
    This closes #1366 from sijie/dlog_checkpoint_manager
---
 .../impl/metadata/ZKLogStreamMetadataStore.java    |   2 +
 .../org/apache/distributedlog/fs/DLFileSystem.java |  15 +-
 .../bookkeeper/stream/server/StorageServer.java    |   2 +
 .../rocksdb/checkpoint/dlog/DLCheckpointStore.java | 154 +++++++++++++
 .../rocksdb/checkpoint/dlog/DLInputStream.java     | 232 +++++++++++++++++++
 .../rocksdb/checkpoint/dlog/DLOutputStream.java    | 142 ++++++++++++
 .../impl/rocksdb/checkpoint/dlog/package-info.java |  23 ++
 .../checkpoint/dlog/DLCheckpointStoreTest.java     | 256 +++++++++++++++++++++
 .../storage/impl/store/MVCCStoreFactoryImpl.java   |  17 +-
 .../impl/store/MVCCStoreFactoryImplTest.java       |   1 +
 10 files changed, 838 insertions(+), 6 deletions(-)

diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 1e36356..0e923c6 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -873,6 +873,8 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
                 } else if (Code.NOTEMPTY.intValue() == rc) {
                     future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
                         "Someone is holding a lock on log " + oldLogPath));
+                } else if (Code.NONODE.intValue() == rc) {
+                    future.completeExceptionally(new LogNotFoundException("Log " + newLogPath + " is not found"));
                 } else {
                     future.completeExceptionally(new ZKException("Failed to rename log "
                         + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
index 1a056c3..7872052 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.distributedlog.DLSN;
@@ -39,6 +40,7 @@ import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.util.Utils;
@@ -315,7 +317,18 @@ public class DLFileSystem extends FileSystem {
     public boolean rename(Path src, Path dst) throws IOException {
         String srcLog = getStreamName(src);
         String dstLog = getStreamName(dst);
-        namespace.renameLog(srcLog, dstLog);
+        try {
+            namespace.renameLog(srcLog, dstLog).get(); // wait for the async rename to finish
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt flag before converting
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException) { // rethrow DL's IOExceptions unwrapped
+                throw (IOException) e.getCause();
+            } else { // wrap any other failure, keeping it as the cause
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
         return true;
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 69a8e84..b48ea0d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.common.component.ComponentStarter;
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -213,6 +214,7 @@ public class StorageServer {
             .withRangeStoreFactory(
                 new MVCCStoreFactoryImpl(
                     dlNamespaceProvider,
+                    () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                     storageConf.getRangeStoreDirs(),
                     storageResources,
                     storageConf.getServeReadOnlyTables()));
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
new file mode 100644
index 0000000..8dc0447
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogExistsException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * A {@link CheckpointStore} backed by a distributedlog {@link Namespace}.
+ *
+ * <p>Each checkpoint file or directory is stored as a log in the namespace, addressed by its
+ * path. The length of a file is the transaction id of its last record: {@link DLOutputStream}
+ * uses the cumulative number of bytes written as the transaction id of every record.
+ */
+@Slf4j
+public class DLCheckpointStore implements CheckpointStore {
+
+    private final Namespace namespace;
+
+    /**
+     * Create a checkpoint store on top of the given namespace.
+     *
+     * @param namespace the distributedlog namespace that holds the checkpoint files
+     */
+    public DLCheckpointStore(Namespace namespace) {
+        this.namespace = namespace;
+    }
+
+    @Override
+    public List<String> listFiles(String filePath) throws IOException {
+        return Lists.newArrayList(namespace.getLogs(filePath));
+    }
+
+    @Override
+    public boolean fileExists(String filePath) throws IOException {
+        return namespace.logExists(filePath);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Returns the last transaction id of the log, or {@code 0} if the log exists but is empty.
+     */
+    @Override
+    public long getFileLength(String filePath) throws IOException {
+        try (DistributedLogManager dlm = namespace.openLog(filePath)) {
+            return dlm.getLastTxId();
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(filePath);
+        } catch (LogEmptyException e) {
+            return 0;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The returned stream takes ownership of the log manager and reader opened here and
+     * closes them when it is closed. If the stream cannot be created they are closed before
+     * this method throws.
+     */
+    @Override
+    public InputStream openInputStream(String filePath) throws IOException {
+        DistributedLogManager dlm;
+        try {
+            dlm = namespace.openLog(filePath);
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(filePath);
+        }
+        LogReader reader = null;
+        try {
+            try {
+                reader = dlm.openLogReader(DLSN.InitialDLSN);
+            } catch (LogNotFoundException | LogEmptyException e) {
+                throw new FileNotFoundException(filePath);
+            }
+            return new BufferedInputStream(
+                new DLInputStream(dlm, reader, 0L), 128 * 1024);
+        } catch (LogNotFoundException e) {
+            FileNotFoundException fnfe = new FileNotFoundException(filePath);
+            closeOnFailure(reader, dlm, fnfe);
+            throw fnfe;
+        } catch (IOException | RuntimeException e) {
+            // without this the log manager (and reader) would leak on every failed open
+            closeOnFailure(reader, dlm, e);
+            throw e;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The returned stream writes to the log through an {@link AsyncLogWriter} and closes the
+     * writer and the log manager when it is closed. If the writer cannot be opened, the log
+     * manager is closed before this method throws.
+     */
+    @Override
+    public OutputStream openOutputStream(String filePath) throws IOException {
+        DistributedLogManager dlm;
+        try {
+            dlm = namespace.openLog(filePath);
+        } catch (LogNotFoundException le) {
+            throw new FileNotFoundException(filePath);
+        }
+        try {
+            AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+            return new BufferedOutputStream(
+                new DLOutputStream(dlm, writer), 128 * 1024);
+        } catch (LogNotFoundException le) {
+            FileNotFoundException fnfe = new FileNotFoundException(filePath);
+            closeOnFailure(null, dlm, fnfe);
+            throw fnfe;
+        } catch (IOException | RuntimeException e) {
+            closeOnFailure(null, dlm, e);
+            throw e;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Translates {@link LogExistsException} into {@link FileAlreadyExistsException} and
+     * {@link LogNotFoundException} into {@link NoSuchFileException}.
+     */
+    @Override
+    public void rename(String srcLog, String dstLog) throws IOException {
+        log.info("Renaming {} to {}", srcLog, dstLog);
+        try {
+            namespace.renameLog(srcLog, dstLog).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof LogExistsException) {
+                throw new FileAlreadyExistsException("Dest file already exists : " + dstLog);
+            } else if (e.getCause() instanceof LogNotFoundException) {
+                throw new NoSuchFileException("Src file or dest directory is not found");
+            } else if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Children listed by {@link Namespace#getLogs(String)} are deleted depth-first before
+     * {@code srcPath} itself.
+     */
+    @Override
+    public void deleteRecursively(String srcPath) throws IOException {
+        Iterator<String> logs = namespace.getLogs(srcPath);
+        while (logs.hasNext()) {
+            String child = logs.next();
+            deleteRecursively(srcPath + "/" + child);
+        }
+        namespace.deleteLog(srcPath);
+    }
+
+    @Override
+    public void delete(String srcPath) throws IOException {
+        namespace.deleteLog(srcPath);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Directories are represented as logs, so this creates a log at {@code srcPath}.
+     */
+    @Override
+    public void createDirectories(String srcPath) throws IOException {
+        namespace.createLog(srcPath);
+    }
+
+    /**
+     * Close the underlying namespace.
+     *
+     * <p>NOTE(review): the namespace is supplied by the caller (StorageServer passes
+     * {@code dlNamespaceProvider.get()}); confirm this store is meant to own and close it.
+     */
+    @Override
+    public void close() {
+        namespace.close();
+    }
+
+    /**
+     * Close the reader (if any) and the log manager opened by a failed open call, attaching
+     * any close failure to {@code cause} as a suppressed exception.
+     */
+    private static void closeOnFailure(LogReader reader,
+                                       DistributedLogManager dlm,
+                                       Throwable cause) {
+        if (null != reader) {
+            try {
+                reader.close();
+            } catch (IOException ioe) {
+                cause.addSuppressed(ioe);
+            }
+        }
+        try {
+            dlm.close();
+        } catch (IOException ioe) {
+            cause.addSuppressed(ioe);
+        }
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
new file mode 100644
index 0000000..c9e5fa8
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+
+/**
+ * An {@link InputStream} over the payloads of the records of a distributedlog stream.
+ *
+ * <p>Positions are byte offsets in the concatenated record payloads. The transaction id of each
+ * record is expected to be the position right after its last byte, which is how
+ * {@link DLOutputStream} writes records.
+ */
+@Slf4j
+class DLInputStream extends InputStream {
+
+    // seeking backwards, or forwards by at least this many bytes, reopens the reader
+    // instead of reading through the skipped records
+    private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+    /**
+     * A log record together with the stream over its (partially consumed) payload.
+     */
+    private static class RecordStream {
+
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN record;
+
+        RecordStream(LogRecordWithDLSN record) {
+            checkNotNull(record);
+
+            this.record = record;
+            this.payloadStream = record.getPayLoadInputStream();
+        }
+
+    }
+
+    /**
+     * Read the next record from the reader in blocking mode.
+     *
+     * @return the next record, or {@code null} if the reader returned no record
+     */
+    private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null != record) {
+            return new RecordStream(record);
+        }
+        return null;
+    }
+
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    // position of the next byte to be returned
+    private long pos;
+    // cached end position of the stream, refreshed by available() once pos catches up
+    private long lastPos;
+    private RecordStream currentRecord = null;
+
+    /**
+     * Create an input stream that owns {@code dlm} and {@code reader}; both are closed by
+     * {@link #close()}.
+     *
+     * <p>NOTE(review): {@code pos} is set to {@code startPos} before {@link #seek(long)} is
+     * called, so that seek is a no-op; this assumes the reader is already positioned at
+     * {@code startPos} (callers currently pass a reader at the start of the log and 0).
+     */
+    DLInputStream(DistributedLogManager dlm,
+                  LogReader reader,
+                  long startPos)
+            throws IOException {
+        this.dlm = dlm;
+        this.reader = reader;
+        this.pos = startPos;
+        this.lastPos = readEndPos();
+        seek(startPos);
+    }
+
+    /**
+     * Close the reader and the log manager. The log manager is closed even if closing the
+     * reader fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            reader.close();
+        } finally {
+            dlm.close();
+        }
+    }
+
+    private long readEndPos() throws IOException {
+        return dlm.getLastTxId();
+    }
+
+    //
+    // FSInputStream
+    //
+
+    /**
+     * Move the stream to the given byte position.
+     *
+     * <p>Seeking backwards, or forwards by at least {@link #REOPEN_READER_SKIP_BYTES}, reopens
+     * the reader at transaction id {@code pos}; otherwise the current reader reads through the
+     * records in between.
+     */
+    public void seek(long pos) throws IOException {
+        if (this.pos == pos) {
+            return;
+        }
+
+        if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+            // close the previous reader
+            this.reader.close();
+            this.reader = dlm.openLogReader(pos);
+            this.currentRecord = null;
+        }
+
+        skipTo(pos);
+    }
+
+    /**
+     * Advance through records until {@code position} is reached.
+     *
+     * @return {@code true} if the position was reached, {@code false} if the reader ran out of
+     *         records first
+     */
+    private boolean skipTo(final long position) throws IOException {
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // the stream is empty now
+                return false;
+            }
+
+            long endPos = currentRecord.record.getTransactionId();
+            if (endPos < position) {
+                currentRecord = nextRecordStream(reader);
+                this.pos = endPos;
+                continue;
+            } else if (endPos == position) {
+                // found the record; defer reading the next one until an actual read happens
+                this.pos = position;
+                this.currentRecord = null;
+                return true;
+            } else {
+                this.currentRecord.payloadStream.skip(
+                    this.currentRecord.payloadStream.available() - (endPos - position));
+                this.pos = position;
+                return true;
+            }
+        }
+    }
+
+    //
+    // Input Stream
+    //
+
+    @Override
+    public int read(byte[] b, final int off, final int len) throws IOException {
+        int remaining = len;
+        int numBytesRead = 0;
+        while (remaining > 0) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) {
+                if (numBytesRead == 0) {
+                    return -1;
+                }
+                break;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            if (bytesLeft <= 0) {
+                currentRecord.payloadStream.close();
+                currentRecord = null;
+                continue;
+            }
+
+            int numBytesToRead = Math.min(bytesLeft, remaining);
+            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+            if (numBytes < 0) {
+                // payload ended before available() said it would; don't spin on this record
+                currentRecord.payloadStream.close();
+                currentRecord = null;
+                continue;
+            }
+            numBytesRead += numBytes;
+            remaining -= numBytes;
+            // keep the position in sync so available() and seek() see the bytes consumed here
+            pos += numBytes;
+        }
+        return numBytesRead;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+
+        long remaining = n;
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // end of stream
+                return n - remaining;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            long endPos = currentRecord.record.getTransactionId();
+            if (remaining > bytesLeft) {
+                // skip the whole record
+                remaining -= bytesLeft;
+                this.pos = endPos;
+                this.currentRecord = nextRecordStream(reader);
+                continue;
+            } else if (remaining == bytesLeft) {
+                this.pos = endPos;
+                this.currentRecord = null;
+                return n;
+            } else {
+                currentRecord.payloadStream.skip(remaining);
+                this.pos = endPos - currentRecord.payloadStream.available();
+                return n;
+            }
+        }
+    }
+
+    /**
+     * Estimate the bytes left between the current position and the end of the stream. The
+     * cached end position is refreshed once the reader has caught up with it.
+     */
+    @Override
+    public int available() throws IOException {
+        if (pos >= lastPos) {
+            lastPos = readEndPos();
+        }
+        // clamp: available() must not be negative and a long difference may not fit in an int
+        return (int) Math.max(0L, Math.min(Integer.MAX_VALUE, lastPos - pos));
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] data = new byte[1];
+        int numBytes = read(data);
+        if (numBytes <= 0) {
+            return -1;
+        }
+        // mask to 0-255: a sign-extended 0xFF byte would otherwise look like EOF (-1)
+        return data[0] & 0xFF;
+    }
+
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
new file mode 100644
index 0000000..8fe6200
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * An {@link OutputStream} that appends each write as one record of a distributedlog stream.
+ *
+ * <p>The transaction id of every data record is the cumulative number of bytes written so far,
+ * so it is the byte position right after the record's payload. Writes are asynchronous: a
+ * failed write is reported by the next {@code write} call, and {@link #flush()} and
+ * {@link #close()} wait for a control record to be acknowledged.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+    private final DistributedLogManager dlm;
+    private final AsyncLogWriter writer;
+
+    // positions
+    // syncPos[0] is the end position of the last acknowledged data record; guarded by syncPos
+    private final long[] syncPos = new long[1];
+    // end position of the last data record handed to the writer; guarded by this
+    private long writePos = 0L;
+
+    // state
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
+    // first failure reported by an async write; once set, every later write() rethrows it
+    private volatile Throwable exception = null;
+
+    /**
+     * Create an output stream that owns {@code dlm} and {@code writer}; both are closed by
+     * {@link #close()}. Writing continues from the writer's last transaction id (or 0 if it
+     * reports a negative one).
+     */
+    DLOutputStream(DistributedLogManager dlm,
+                   AsyncLogWriter writer) {
+        this.dlm = dlm;
+        this.writer = writer;
+        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+        this.syncPos[0] = writePos;
+    }
+
+    /**
+     * Return the position of the last acknowledged data record (initially the start position).
+     */
+    public long position() {
+        // read under the same lock the write callbacks use to update syncPos
+        synchronized (syncPos) {
+            return syncPos[0];
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        // a fresh array that nobody else references, so it can be wrapped without copying
+        write(Unpooled.wrappedBuffer(new byte[] { (byte) b }));
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * Append {@code len} bytes of {@code b} as a single record.
+     *
+     * <p>The bytes are copied because the record is written asynchronously, while callers such
+     * as {@link java.io.BufferedOutputStream} reuse their array as soon as this method returns.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(Unpooled.copiedBuffer(b, off, len));
+    }
+
+    /**
+     * Fail fast with the first recorded async failure, otherwise hand {@code buf} to the writer.
+     */
+    private synchronized void write(ByteBuf buf) throws IOException {
+        Throwable cause = exceptionUpdater.get(this);
+        if (null != cause) {
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            } else {
+                throw new UnexpectedException("Encountered unknown issue", cause);
+            }
+        }
+
+        writePos += buf.readableBytes();
+        LogRecord record = new LogRecord(writePos, buf);
+        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
+            @Override
+            public void onSuccess(DLSN value) {
+                synchronized (syncPos) {
+                    syncPos[0] = record.getTransactionId();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
+            }
+        });
+    }
+
+    /**
+     * Write a control record tagged with the current write position, without advancing it.
+     */
+    private CompletableFuture<DLSN> writeControlRecord() {
+        LogRecord record;
+        synchronized (this) {
+            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+            record.setControl();
+        }
+        return writer.write(record);
+    }
+
+    /**
+     * Block until a control record, written after all the data so far, is acknowledged.
+     */
+    @Override
+    public void flush() throws IOException {
+        try {
+            FutureUtils.result(writeControlRecord());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception e) {
+            log.error("Unexpected exception in DLOutputStream", e);
+            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+        }
+    }
+
+    /**
+     * Write a final control record, then close the writer and the log manager.
+     *
+     * <p>The writer and the log manager are closed even if an earlier step fails; the first
+     * failure is thrown with later ones attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException failure = null;
+        try {
+            Utils.ioResult(writeControlRecord());
+        } catch (IOException ioe) {
+            failure = ioe;
+        }
+        failure = awaitClose(writer.asyncClose(), failure);
+        failure = awaitClose(dlm.asyncClose(), failure);
+        if (null != failure) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Wait for {@code closeFuture} and merge its failure, if any, into {@code failure}.
+     *
+     * @return {@code failure}, or the new failure if {@code failure} was {@code null}
+     */
+    private static IOException awaitClose(CompletableFuture<?> closeFuture,
+                                          IOException failure) {
+        try {
+            Utils.ioResult(closeFuture);
+        } catch (IOException ioe) {
+            if (null == failure) {
+                return ioe;
+            }
+            failure.addSuppressed(ioe);
+        }
+        return failure;
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
new file mode 100644
index 0000000..697e13a
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Dlog based checkpoint manager.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
\ No newline at end of file
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
new file mode 100644
index 0000000..33f342d
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link DLCheckpointStore}, the dlog based counterpart of {@link FSCheckpointManager}.
+ */
+public class DLCheckpointStoreTest extends TestDistributedLogBase {
+
+    private static final byte[] TEST_BYTES = "dlog-checkpoint-manager".getBytes(UTF_8);
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private URI uri;
+    private Namespace namespace;
+    private DLCheckpointStore store;
+
+    @BeforeClass
+    public static void setupDL() throws Exception {
+        setupCluster();
+    }
+
+    @AfterClass
+    public static void teardownDL() throws Exception {
+        teardownCluster();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.uri = DLMTestUtil.createDLMURI(zkPort, "/" + runtime.getMethodName());
+        ensureURICreated(this.uri);
+        this.namespace = NamespaceBuilder.newBuilder()
+            .conf(new DistributedLogConfiguration())
+            .uri(uri)
+            .build();
+        this.store = new DLCheckpointStore(namespace);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (null != store) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testListFilesEmpty() throws Exception {
+        // create a dummy log stream to ensure "dir" exists
+        namespace.createLog(runtime.getMethodName());
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFilesNotFound() throws Exception {
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFiles() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        // sort so the assertion does not depend on the listing order
+        Collections.sort(files);
+        assertEquals(expectedFiles, files);
+    }
+
+    @Test
+    public void testFileExists() throws Exception {
+        namespace.createLog(runtime.getMethodName() + "/test");
+        assertTrue(store.fileExists(runtime.getMethodName() + "/test"));
+        assertFalse(store.fileExists(runtime.getMethodName() + "/test2"));
+    }
+
+    @Test
+    public void testFileRename() throws Exception {
+        namespace.createLog("src");
+        namespace.createLog("dest");
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        try (OutputStream os = store.openOutputStream(srcFilePath)) {
+            os.write(TEST_BYTES);
+            os.flush();
+        }
+
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+        try (InputStream is = store.openInputStream(destFilePath)) {
+            byte[] readBytes = new byte[TEST_BYTES.length];
+            ByteStreams.readFully(is, readBytes);
+
+            assertArrayEquals(TEST_BYTES, readBytes);
+        }
+    }
+
+    @Test
+    public void testFileRenameDirNotExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        try (OutputStream os = store.openOutputStream(srcFilePath)) {
+            os.write(TEST_BYTES);
+            os.flush();
+        }
+
+        // rename creates the missing "dest" parent path in dlog automatically
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+        try (InputStream is = store.openInputStream(destFilePath)) {
+            byte[] readBytes = new byte[TEST_BYTES.length];
+            ByteStreams.readFully(is, readBytes);
+
+            assertArrayEquals(TEST_BYTES, readBytes);
+        }
+    }
+
+    @Test
+    public void testFileRenameFileExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        namespace.createLog(destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        try (OutputStream os = store.openOutputStream(srcFilePath)) {
+            os.write(TEST_BYTES);
+            os.flush();
+        }
+
+        assertTrue(store.fileExists(srcFilePath));
+        // renaming onto an existing file must fail and leave both files untouched
+        try {
+            store.rename(srcFilePath, destFilePath);
+            fail("Should fail to rename if the dest file already exists");
+        } catch (FileAlreadyExistsException e) {
+            // expected
+        }
+        assertTrue(store.fileExists(destFilePath));
+        assertTrue(store.fileExists(srcFilePath));
+        assertEquals(0, store.getFileLength(destFilePath));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        Collections.sort(files);
+
+        assertEquals(expectedFiles, files);
+        // deleting the parent path must make it disappear from the store
+        store.delete(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+    }
+
+    @Test
+    public void testDeleteRecursively() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(numFiles);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        Collections.sort(files);
+
+        assertEquals(expectedFiles, files);
+        // NOTE(review): identical to testDelete and does not verify the children are gone
+        store.delete(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 2c80a25..28f9955 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -41,7 +41,6 @@ import org.apache.bookkeeper.statelib.StateStores;
 import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
-import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -67,13 +66,15 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
     // dirs
     private final File[] localStateDirs;
     // checkpoint manager
-    private final CheckpointStore checkpointStore;
+    private final Supplier<CheckpointStore> checkpointStoreSupplier;
+    private CheckpointStore checkpointStore;
     // stores
     private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores;
     private final boolean serveReadOnlyTable;
     private boolean closed = false;
 
     public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
+                                Supplier<CheckpointStore> checkpointStoreSupplier,
                                 File[] localStoreDirs,
                                 StorageResources storageResources,
                                 boolean serveReadOnlyTable) {
@@ -86,8 +87,7 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
         this.checkpointScheduler =
             SharedResourceManager.shared().get(storageResources.checkpointScheduler());
         this.localStateDirs = localStoreDirs;
-        // TODO: change this cto dlog based checkpoint manager
-        this.checkpointStore = new FSCheckpointManager(new File(localStoreDirs[0], "checkpoints"));
+        this.checkpointStoreSupplier = checkpointStoreSupplier;
         this.stores = Maps.newHashMap();
         this.serveReadOnlyTable = serveReadOnlyTable;
     }
@@ -185,6 +185,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
             normalizedName(streamId),
             normalizedName(rangeId));
 
+        if (null == checkpointStore) {
+            checkpointStore = checkpointStoreSupplier.get();
+        }
+
         // build a spec
         StateStoreSpec spec = StateStoreSpec.builder()
             .name(storeName)
@@ -247,7 +251,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
         } catch (Exception e) {
             log.info("Encountered issue on closing all the range stores opened by this range factory");
         }
-        checkpointStore.close();
+        if (null != checkpointStore) {
+            checkpointStore.close();
+            checkpointStore = null;
+        }
 
         SharedResourceManager.shared().release(
             storageResources.ioWriteScheduler(), writeIOScheduler);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
index e5c36ee..9372adf 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
@@ -99,6 +99,7 @@ public class MVCCStoreFactoryImplTest {
                 .build());
         this.factory = new MVCCStoreFactoryImpl(
             () -> namespace,
+            () -> new FSCheckpointManager(new File(storeDirs[0], "checkpoints")),
             storeDirs,
             resources,
             false);

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.