You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/25 09:13:39 UTC

[GitHub] sijie closed pull request #1366: [Table Service] Dlog based checkpoint store

sijie closed pull request #1366: [Table Service] Dlog based checkpoint store
URL: https://github.com/apache/bookkeeper/pull/1366
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 1e36356b2..0e923c661 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -873,6 +873,8 @@ private static void existPath(ZooKeeper zk,
                 } else if (Code.NOTEMPTY.intValue() == rc) {
                     future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
                         "Someone is holding a lock on log " + oldLogPath));
+                } else if (Code.NONODE.intValue() == rc) {
+                    future.completeExceptionally(new LogNotFoundException("Log " + newLogPath + " is not found"));
                 } else {
                     future.completeExceptionally(new ZKException("Failed to rename log "
                         + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
index 1a056c3c9..78720529f 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -29,6 +29,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.distributedlog.DLSN;
@@ -39,6 +40,7 @@
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
 import org.apache.distributedlog.exceptions.LogEmptyException;
 import org.apache.distributedlog.exceptions.LogNotFoundException;
 import org.apache.distributedlog.util.Utils;
@@ -315,7 +317,18 @@ public FileStatus getFileStatus(Path path) throws IOException {
     public boolean rename(Path src, Path dst) throws IOException {
         String srcLog = getStreamName(src);
         String dstLog = getStreamName(dst);
-        namespace.renameLog(srcLog, dstLog);
+        try {
+            namespace.renameLog(srcLog, dstLog).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
         return true;
     }
 
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 69a8e848d..b48ea0d5c 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -27,6 +27,7 @@
 import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stream.proto.common.Endpoint;
 import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -213,6 +214,7 @@ public static LifecycleComponent startStorageServer(CompositeConfiguration conf,
             .withRangeStoreFactory(
                 new MVCCStoreFactoryImpl(
                     dlNamespaceProvider,
+                    () -> new DLCheckpointStore(dlNamespaceProvider.get()),
                     storageConf.getRangeStoreDirs(),
                     storageResources,
                     storageConf.getServeReadOnlyTables()));
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
new file mode 100644
index 000000000..8dc044721
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogExistsException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * Dlog based checkpoint store.
+ */
+@Slf4j
+public class DLCheckpointStore implements CheckpointStore {
+
+    private final Namespace namespace;
+
+    public DLCheckpointStore(Namespace namespace) {
+        this.namespace = namespace;
+    }
+
+    @Override
+    public List<String> listFiles(String filePath) throws IOException {
+        return Lists.newArrayList(namespace.getLogs(filePath));
+    }
+
+    @Override
+    public boolean fileExists(String filePath) throws IOException {
+        return namespace.logExists(filePath);
+    }
+
+    @Override
+    public long getFileLength(String filePath) throws IOException {
+        try (DistributedLogManager dlm = namespace.openLog(filePath)) {
+            return dlm.getLastTxId();
+        } catch (LogNotFoundException e) {
+            throw new FileNotFoundException(filePath);
+        } catch (LogEmptyException e) {
+            return 0;
+        }
+    }
+
+    /**
+     * Opens the log stored at {@code filePath} for reading from its first record.
+     *
+     * <p>The returned stream owns both the log manager and the reader and closes them
+     * when it is closed. If opening fails part-way, whatever was already opened is
+     * closed before the exception propagates, so a failed open does not leak handles.
+     *
+     * @param filePath name of the log to read
+     * @return a stream over the log's record payloads, buffered with a 128KB buffer
+     * @throws FileNotFoundException if the log is not found, or the reader reports it as empty
+     * @throws IOException on any other failure to open the log
+     */
+    @Override
+    public InputStream openInputStream(String filePath) throws IOException {
+        DistributedLogManager dlm = null;
+        LogReader reader = null;
+        try {
+            dlm = namespace.openLog(filePath);
+            try {
+                reader = dlm.openLogReader(DLSN.InitialDLSN);
+            } catch (LogNotFoundException | LogEmptyException e) {
+                throw new FileNotFoundException(filePath);
+            }
+            return new BufferedInputStream(
+                new DLInputStream(dlm, reader, 0L), 128 * 1024);
+        } catch (IOException | RuntimeException e) {
+            // Ownership was never handed to a stream: release what we opened.
+            if (null != reader) {
+                try {
+                    reader.close();
+                } catch (IOException ce) {
+                    log.warn("Failed to close reader of {} after a failed open", filePath, ce);
+                }
+            }
+            if (null != dlm) {
+                try {
+                    dlm.close();
+                } catch (IOException ce) {
+                    log.warn("Failed to close log {} after a failed open", filePath, ce);
+                }
+            }
+            if (e instanceof LogNotFoundException) {
+                throw new FileNotFoundException(filePath);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Opens the log stored at {@code filePath} for appending.
+     *
+     * <p>The returned stream owns the log manager and the writer and closes them when
+     * it is closed. If opening fails part-way, whatever was already opened is closed
+     * before the exception propagates, so a failed open does not leak handles.
+     *
+     * @param filePath name of the log to write
+     * @return a stream appending to the log, buffered with a 128KB buffer
+     * @throws FileNotFoundException if the log is reported as not found
+     * @throws IOException on any other failure to open the log or its writer
+     */
+    @Override
+    public OutputStream openOutputStream(String filePath) throws IOException {
+        DistributedLogManager dlm = null;
+        AsyncLogWriter writer = null;
+        try {
+            dlm = namespace.openLog(filePath);
+            writer = Utils.ioResult(dlm.openAsyncLogWriter());
+            return new BufferedOutputStream(
+                new DLOutputStream(dlm, writer), 128 * 1024);
+        } catch (IOException | RuntimeException e) {
+            // Ownership was never handed to a stream: release what we opened.
+            if (null != writer) {
+                try {
+                    Utils.ioResult(writer.asyncClose());
+                } catch (IOException ce) {
+                    log.warn("Failed to close writer of {} after a failed open", filePath, ce);
+                }
+            }
+            if (null != dlm) {
+                try {
+                    dlm.close();
+                } catch (IOException ce) {
+                    log.warn("Failed to close log {} after a failed open", filePath, ce);
+                }
+            }
+            if (e instanceof LogNotFoundException) {
+                throw new FileNotFoundException(filePath);
+            }
+            throw e;
+        }
+    }
+
+    @Override
+    public void rename(String srcLog, String dstLog) throws IOException {
+        log.info("Renaming {} to {}", srcLog, dstLog);
+        try {
+            namespace.renameLog(srcLog, dstLog).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof LogExistsException) {
+                throw new FileAlreadyExistsException("Dest file already exists : " + dstLog);
+            } else if (e.getCause() instanceof LogNotFoundException) {
+                throw new NoSuchFileException("Src file or dest directory is not found");
+            } else if (e.getCause() instanceof IOException) {
+                throw (IOException) e.getCause();
+            } else {
+                throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+            }
+        }
+    }
+
+    @Override
+    public void deleteRecursively(String srcPath) throws IOException {
+        Iterator<String> logs = namespace.getLogs(srcPath);
+        while (logs.hasNext()) {
+            String child = logs.next();
+            deleteRecursively(srcPath + "/" + child);
+        }
+        namespace.deleteLog(srcPath);
+    }
+
+    @Override
+    public void delete(String srcPath) throws IOException {
+        namespace.deleteLog(srcPath);
+    }
+
+    @Override
+    public void createDirectories(String srcPath) throws IOException {
+        namespace.createLog(srcPath);
+    }
+
+    @Override
+    public void close() {
+        namespace.close();
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
new file mode 100644
index 000000000..c9e5fa8d6
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+
+/**
+ * The input stream for a distributedlog stream.
+ */
+@Slf4j
+class DLInputStream extends InputStream {
+
+    private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+    private static class RecordStream {
+
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN record;
+
+        RecordStream(LogRecordWithDLSN record) {
+            checkNotNull(record);
+
+            this.record = record;
+            this.payloadStream = record.getPayLoadInputStream();
+        }
+
+    }
+
+    private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null != record) {
+            return new RecordStream(record);
+        }
+        return null;
+    }
+
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    private long pos;
+    private long lastPos;
+    private RecordStream currentRecord = null;
+
+    DLInputStream(DistributedLogManager dlm,
+                  LogReader reader,
+                  long startPos)
+            throws IOException {
+        this.dlm = dlm;
+        this.reader = reader;
+        this.pos = startPos;
+        this.lastPos = readEndPos();
+        seek(startPos);
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+        dlm.close();
+    }
+
+    private long readEndPos() throws IOException {
+        return dlm.getLastTxId();
+    }
+
+    //
+    // FSInputStream
+    //
+
+    public void seek(long pos) throws IOException {
+        if (this.pos == pos) {
+            return;
+        }
+
+        if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+            // close the previous reader
+            this.reader.close();
+            this.reader = dlm.openLogReader(pos);
+            this.currentRecord = null;
+        }
+
+        skipTo(pos);
+    }
+
+    private boolean skipTo(final long position) throws IOException {
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // the stream is empty now
+                return false;
+            }
+
+            long endPos = currentRecord.record.getTransactionId();
+            if (endPos < position) {
+                currentRecord = nextRecordStream(reader);
+                this.pos = endPos;
+                continue;
+            } else if (endPos == position){
+                // find the record, but we defer read next record when actual read happens
+                this.pos = position;
+                this.currentRecord = null;
+                return true;
+            } else {
+                this.currentRecord.payloadStream.skip(
+                    this.currentRecord.payloadStream.available() - (endPos - position));
+                this.pos = position;
+                return true;
+            }
+        }
+    }
+
+    //
+    // Input Stream
+    //
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off}, moving on
+     * to the next record whenever the current record's payload is exhausted.
+     *
+     * <p>Records are fetched with a non-blocking {@code readNext(false)}, so the call
+     * returns as soon as no further record is available.
+     *
+     * @param b   destination buffer
+     * @param off offset in {@code b} at which the first byte is stored
+     * @param len maximum number of bytes to read
+     * @return the number of bytes read, or {@code -1} if nothing could be read because
+     *         no record is available
+     * @throws IOException if reading from the log fails
+     */
+    @Override
+    public int read(byte[] b, final int off, final int len) throws IOException {
+        int remaining = len;
+        int numBytesRead = 0;
+        while (remaining > 0) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) {
+                if (numBytesRead == 0) {
+                    return -1;
+                }
+                break;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            if (bytesLeft <= 0) {
+                currentRecord.payloadStream.close();
+                currentRecord = null;
+                continue;
+            }
+
+            int numBytesToRead = Math.min(bytesLeft, remaining);
+            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+            if (numBytes < 0) {
+                // the payload reports end-of-stream: drop the record instead of
+                // retrying the same read forever
+                currentRecord.payloadStream.close();
+                currentRecord = null;
+                continue;
+            }
+            numBytesRead += numBytes;
+            remaining -= numBytes;
+            // keep the logical position in step with the bytes consumed; seek() and
+            // available() both work from it
+            pos += numBytes;
+        }
+        return numBytesRead;
+    }
+
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+
+        long remaining = n;
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // end of stream
+                return n - remaining;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            long endPos = currentRecord.record.getTransactionId();
+            if (remaining > bytesLeft) {
+                // skip the whole record
+                remaining -= bytesLeft;
+                this.pos = endPos;
+                this.currentRecord = nextRecordStream(reader);
+                continue;
+            } else if (remaining == bytesLeft) {
+                this.pos = endPos;
+                this.currentRecord = null;
+                return n;
+            } else {
+                currentRecord.payloadStream.skip(remaining);
+                this.pos = endPos - currentRecord.payloadStream.available();
+                return n;
+            }
+        }
+    }
+
+    @Override
+    public int available() throws IOException {
+        if (lastPos - pos == 0L) {
+            lastPos = readEndPos();
+        }
+        return (int) (lastPos - pos);
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Reads a single byte.
+     *
+     * @return the byte as an unsigned value in {@code [0, 255]}, or {@code -1} if no
+     *         byte could be read
+     * @throws IOException if reading from the log fails
+     */
+    @Override
+    public int read() throws IOException {
+        byte[] data = new byte[1];
+        int numBytes = read(data);
+        if (numBytes <= 0) {
+            return -1;
+        }
+        // mask to an unsigned value: returning the raw (signed) byte would make 0xFF
+        // indistinguishable from the -1 end-of-stream marker
+        return data[0] & 0xff;
+    }
+
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
new file mode 100644
index 000000000..8fe62007a
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * DistributedLog Output Stream.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+    private final DistributedLogManager dlm;
+    private final AsyncLogWriter writer;
+
+    // positions
+    private final long[] syncPos = new long[1];
+    private long writePos = 0L;
+
+    // state
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
+    private volatile Throwable exception = null;
+
+    DLOutputStream(DistributedLogManager dlm,
+                   AsyncLogWriter writer) {
+        this.dlm = dlm;
+        this.writer = writer;
+        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+        this.syncPos[0] = writePos;
+    }
+
+    /**
+     * Returns the transaction id of the most recent data record whose write was
+     * acknowledged.
+     *
+     * <p>The value is read under the {@code syncPos} monitor, which is the same lock
+     * the write-completion callback holds while updating it.
+     *
+     * @return the last acknowledged write position
+     */
+    public long position() {
+        synchronized (syncPos) {
+            return syncPos[0];
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        byte[] data = new byte[] { (byte) b };
+        write(data);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(Unpooled.wrappedBuffer(b));
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(Unpooled.wrappedBuffer(b, off, len));
+    }
+
+    private synchronized void write(ByteBuf buf) throws IOException {
+        Throwable cause = exceptionUpdater.get(this);
+        if (null != cause) {
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            } else {
+                throw new UnexpectedException("Encountered unknown issue", cause);
+            }
+        }
+
+        writePos += buf.readableBytes();
+        LogRecord record = new LogRecord(writePos, buf);
+        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
+            @Override
+            public void onSuccess(DLSN value) {
+                synchronized (syncPos) {
+                    syncPos[0] = record.getTransactionId();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
+            }
+        });
+    }
+
+    private CompletableFuture<DLSN> writeControlRecord() {
+        LogRecord record;
+        synchronized (this) {
+            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+            record.setControl();
+        }
+        return writer.write(record);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        try {
+            FutureUtils.result(writeControlRecord());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception e) {
+            log.error("Unexpected exception in DLOutputStream", e);
+            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        Utils.ioResult(
+            writeControlRecord()
+                .thenCompose(ignored -> writer.asyncClose())
+                .thenCompose(ignored -> dlm.asyncClose()));
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
new file mode 100644
index 000000000..697e13a40
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Dlog based checkpoint store.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
\ No newline at end of file
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
new file mode 100644
index 000000000..33f342dbf
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link DLCheckpointStore}, the dlog based counterpart of {@link FSCheckpointManager}.
+ */
+public class DLCheckpointStoreTest extends TestDistributedLogBase {
+
+    private static final byte[] TEST_BYTES = "dlog-checkpoint-manager".getBytes(UTF_8);
+
+    @Rule
+    public final TestName runtime = new TestName();
+
+    private URI uri;
+    private Namespace namespace;
+    private DLCheckpointStore store;
+
+    @BeforeClass
+    public static void setupDL() throws Exception {
+        setupCluster();
+    }
+
+    @AfterClass
+    public static void teardownDL() throws Exception {
+        teardownCluster();
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        this.uri = DLMTestUtil.createDLMURI(zkPort, "/" + runtime.getMethodName());
+        ensureURICreated(this.uri);
+        this.namespace = NamespaceBuilder.newBuilder()
+            .conf(new DistributedLogConfiguration())
+            .uri(uri)
+            .build();
+        this.store = new DLCheckpointStore(namespace);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (null != store) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void testListFilesEmpty() throws Exception {
+        // create a dummy log stream to ensure "dir" exists
+        namespace.createLog(runtime.getMethodName());
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFilesNotFound() throws Exception {
+        assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+    }
+
+    @Test
+    public void testListFiles() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        Collections.sort(files);
+
+        assertEquals(expectedFiles, files);
+    }
+
+    @Test
+    public void testFileExists() throws Exception {
+        namespace.createLog(runtime.getMethodName() + "/test");
+        assertTrue(store.fileExists(runtime.getMethodName() + "/test"));
+        assertFalse(store.fileExists(runtime.getMethodName() + "/test2"));
+    }
+
+    @Test
+    public void testFileRename() throws Exception {
+        namespace.createLog("src");
+        namespace.createLog("dest");
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        OutputStream os = store.openOutputStream(srcFilePath);
+        os.write(TEST_BYTES);
+        os.flush();
+        os.close();
+
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+        try (InputStream is = store.openInputStream(destFilePath)) {
+            byte[] readBytes = new byte[TEST_BYTES.length];
+            ByteStreams.readFully(is, readBytes);
+
+            assertArrayEquals(TEST_BYTES, readBytes);
+        }
+    }
+
+    @Test
+    public void testFileRenameDirNotExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        OutputStream os = store.openOutputStream(srcFilePath);
+        os.write(TEST_BYTES);
+        os.flush();
+        os.close();
+
+        // rename will automatically create stream path in dlog
+        store.rename(srcFilePath, destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+        assertFalse(store.fileExists(srcFilePath));
+
+        assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+        try (InputStream is = store.openInputStream(destFilePath)) {
+            byte[] readBytes = new byte[TEST_BYTES.length];
+            ByteStreams.readFully(is, readBytes);
+
+            assertArrayEquals(TEST_BYTES, readBytes);
+        }
+    }
+
+    /**
+     * Renaming onto an existing destination must fail with
+     * {@link FileAlreadyExistsException} and leave both source and destination untouched.
+     */
+    @Test
+    public void testFileRenameFileExists() throws Exception {
+        namespace.createLog("src");
+        assertFalse(store.fileExists("dest"));
+
+        String srcFilePath = "src/" + runtime.getMethodName();
+        String destFilePath = "dest/" + runtime.getMethodName();
+        namespace.createLog(destFilePath);
+        assertTrue(store.fileExists(destFilePath));
+
+        assertFalse(store.fileExists(srcFilePath));
+
+        OutputStream os = store.openOutputStream(srcFilePath);
+        os.write(TEST_BYTES);
+        os.flush();
+        os.close();
+
+        assertTrue(store.fileExists(srcFilePath));
+
+        try {
+            store.rename(srcFilePath, destFilePath);
+            fail("Should fail to rename if the dest file already exists");
+        } catch (FileAlreadyExistsException e) {
+            // expected
+        }
+        // neither side may have been modified by the rejected rename
+        assertTrue(store.fileExists(destFilePath));
+        assertTrue(store.fileExists(srcFilePath));
+        assertEquals(0, store.getFileLength(destFilePath));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        Collections.sort(files);
+
+        assertEquals(expectedFiles, files);
+
+        store.delete(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+    }
+
+    /**
+     * Deleting a "directory" recursively must remove the directory log and every log
+     * created underneath it.
+     */
+    @Test
+    public void testDeleteRecursively() throws Exception {
+        int numFiles = 3;
+        List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+        namespace.createLog(runtime.getMethodName());
+        for (int i = 0; i < numFiles; ++i) {
+            String filename = runtime.getMethodName() + "-" + i;
+            expectedFiles.add(filename);
+            namespace.createLog(runtime.getMethodName() + "/" + filename);
+        }
+        List<String> files = store.listFiles(runtime.getMethodName());
+        Collections.sort(files);
+
+        assertEquals(expectedFiles, files);
+
+        // exercise the recursive variant; plain delete() is covered by testDelete
+        store.deleteRecursively(runtime.getMethodName());
+
+        assertFalse(store.fileExists(runtime.getMethodName()));
+        for (String filename : expectedFiles) {
+            assertFalse(store.fileExists(runtime.getMethodName() + "/" + filename));
+        }
+    }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 2c80a2544..28f9955ba 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -41,7 +41,6 @@
 import org.apache.bookkeeper.statelib.api.StateStoreSpec;
 import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
 import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
-import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.storage.StorageResources;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -67,13 +66,15 @@
     // dirs
     private final File[] localStateDirs;
     // checkpoint manager
-    private final CheckpointStore checkpointStore;
+    private final Supplier<CheckpointStore> checkpointStoreSupplier;
+    private CheckpointStore checkpointStore;
     // stores
     private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores;
     private final boolean serveReadOnlyTable;
     private boolean closed = false;
 
     public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
+                                Supplier<CheckpointStore> checkpointStoreSupplier,
                                 File[] localStoreDirs,
                                 StorageResources storageResources,
                                 boolean serveReadOnlyTable) {
@@ -86,8 +87,7 @@ public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
         this.checkpointScheduler =
             SharedResourceManager.shared().get(storageResources.checkpointScheduler());
         this.localStateDirs = localStoreDirs;
-        // TODO: change this cto dlog based checkpoint manager
-        this.checkpointStore = new FSCheckpointManager(new File(localStoreDirs[0], "checkpoints"));
+        this.checkpointStoreSupplier = checkpointStoreSupplier;
         this.stores = Maps.newHashMap();
         this.serveReadOnlyTable = serveReadOnlyTable;
     }
@@ -185,6 +185,10 @@ private synchronized void addStore(long scId, long streamId, long rangeId,
             normalizedName(streamId),
             normalizedName(rangeId));
 
+        if (null == checkpointStore) {
+            checkpointStore = checkpointStoreSupplier.get();
+        }
+
         // build a spec
         StateStoreSpec spec = StateStoreSpec.builder()
             .name(storeName)
@@ -247,7 +251,10 @@ public void close() {
         } catch (Exception e) {
             log.info("Encountered issue on closing all the range stores opened by this range factory");
         }
-        checkpointStore.close();
+        if (null != checkpointStore) {
+            checkpointStore.close();
+            checkpointStore = null;
+        }
 
         SharedResourceManager.shared().release(
             storageResources.ioWriteScheduler(), writeIOScheduler);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
index e5c36ee45..9372adfb1 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
@@ -99,6 +99,7 @@ public void setup() throws IOException {
                 .build());
         this.factory = new MVCCStoreFactoryImpl(
             () -> namespace,
+            () -> new FSCheckpointManager(new File(storeDirs[0], "checkpoints")),
             storeDirs,
             resources,
             false);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services