You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:39:59 UTC
[bookkeeper] 01/04: [TABLE SERVICE] Dlog based checkpoint store
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 7f0d8adcdd1b567dae722d14427e28231ed9c2e5
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Apr 25 02:13:28 2018 -0700
[TABLE SERVICE] Dlog based checkpoint store
*Motivation*
Currently the table range stores use the local filesystem as their checkpoint store. That is fine when running in standalone mode,
but it doesn't work in distributed mode. This change introduces a dlog-based checkpoint store to make sure that all
the SST files of table ranges are durably checkpointed into BookKeeper itself.
*Solution*
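Introduce a dlog-based checkpoint store, `DLCheckpointStore`, and use it as the checkpoint store of the range stores created by `StorageServer`.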
Author: Sijie Guo <si...@apache.org>
Reviewers: Jia Zhai <None>
This closes #1366 from sijie/dlog_checkpoint_manager
---
.../impl/metadata/ZKLogStreamMetadataStore.java | 2 +
.../org/apache/distributedlog/fs/DLFileSystem.java | 15 +-
.../bookkeeper/stream/server/StorageServer.java | 2 +
.../rocksdb/checkpoint/dlog/DLCheckpointStore.java | 154 +++++++++++++
.../rocksdb/checkpoint/dlog/DLInputStream.java | 232 +++++++++++++++++++
.../rocksdb/checkpoint/dlog/DLOutputStream.java | 142 ++++++++++++
.../impl/rocksdb/checkpoint/dlog/package-info.java | 23 ++
.../checkpoint/dlog/DLCheckpointStoreTest.java | 256 +++++++++++++++++++++
.../storage/impl/store/MVCCStoreFactoryImpl.java | 17 +-
.../impl/store/MVCCStoreFactoryImplTest.java | 1 +
10 files changed, 838 insertions(+), 6 deletions(-)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 1e36356..0e923c6 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -873,6 +873,8 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
} else if (Code.NOTEMPTY.intValue() == rc) {
future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
"Someone is holding a lock on log " + oldLogPath));
+ } else if (Code.NONODE.intValue() == rc) {
+ future.completeExceptionally(new LogNotFoundException("Log " + newLogPath + " is not found"));
} else {
future.completeExceptionally(new ZKException("Failed to rename log "
+ oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
index 1a056c3..7872052 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.distributedlog.DLSN;
@@ -39,6 +40,7 @@ import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.util.Utils;
@@ -315,7 +317,18 @@ public class DLFileSystem extends FileSystem {
public boolean rename(Path src, Path dst) throws IOException {
String srcLog = getStreamName(src);
String dstLog = getStreamName(dst);
- namespace.renameLog(srcLog, dstLog);
+ try {
+ namespace.renameLog(srcLog, dstLog).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+ }
+ }
return true;
}
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
index 69a8e84..b48ea0d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/StorageServer.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
@@ -213,6 +214,7 @@ public class StorageServer {
.withRangeStoreFactory(
new MVCCStoreFactoryImpl(
dlNamespaceProvider,
+ () -> new DLCheckpointStore(dlNamespaceProvider.get()),
storageConf.getRangeStoreDirs(),
storageResources,
storageConf.getServeReadOnlyTables()));
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
new file mode 100644
index 0000000..8dc0447
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStore.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import com.google.common.collect.Lists;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.LogEmptyException;
+import org.apache.distributedlog.exceptions.LogExistsException;
+import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * Dlog based checkpoint store.
+ */
+@Slf4j
+public class DLCheckpointStore implements CheckpointStore {
+
+ private final Namespace namespace;
+
+ public DLCheckpointStore(Namespace namespace) {
+ this.namespace = namespace;
+ }
+
+ @Override
+ public List<String> listFiles(String filePath) throws IOException {
+ return Lists.newArrayList(namespace.getLogs(filePath));
+ }
+
+ @Override
+ public boolean fileExists(String filePath) throws IOException {
+ return namespace.logExists(filePath);
+ }
+
+ @Override
+ public long getFileLength(String filePath) throws IOException {
+ try (DistributedLogManager dlm = namespace.openLog(filePath)) {
+ return dlm.getLastTxId();
+ } catch (LogNotFoundException e) {
+ throw new FileNotFoundException(filePath);
+ } catch (LogEmptyException e) {
+ return 0;
+ }
+ }
+
+ @Override
+ public InputStream openInputStream(String filePath) throws IOException {
+ try {
+ DistributedLogManager dlm = namespace.openLog(filePath);
+ LogReader reader;
+ try {
+ reader = dlm.openLogReader(DLSN.InitialDLSN);
+ } catch (LogNotFoundException | LogEmptyException e) {
+ throw new FileNotFoundException(filePath);
+ }
+ return new BufferedInputStream(
+ new DLInputStream(dlm, reader, 0L), 128 * 1024);
+ } catch (LogNotFoundException e) {
+ throw new FileNotFoundException(filePath);
+ }
+ }
+
+ @Override
+ public OutputStream openOutputStream(String filePath) throws IOException {
+ try {
+ DistributedLogManager dlm = namespace.openLog(
+ filePath);
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ return new BufferedOutputStream(
+ new DLOutputStream(dlm, writer), 128 * 1024);
+ } catch (LogNotFoundException le) {
+ throw new FileNotFoundException(filePath);
+ }
+ }
+
+ @Override
+ public void rename(String srcLog, String dstLog) throws IOException {
+ log.info("Renaming {} to {}", srcLog, dstLog);
+ try {
+ namespace.renameLog(srcLog, dstLog).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new DLInterruptedException("Interrupted at renaming " + srcLog + " to " + dstLog, e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof LogExistsException) {
+ throw new FileAlreadyExistsException("Dest file already exists : " + dstLog);
+ } else if (e.getCause() instanceof LogNotFoundException) {
+ throw new NoSuchFileException("Src file or dest directory is not found");
+ } else if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException("Failed to rename " + srcLog + " to " + dstLog, e.getCause());
+ }
+ }
+ }
+
+ @Override
+ public void deleteRecursively(String srcPath) throws IOException {
+ Iterator<String> logs = namespace.getLogs(srcPath);
+ while (logs.hasNext()) {
+ String child = logs.next();
+ deleteRecursively(srcPath + "/" + child);
+ }
+ namespace.deleteLog(srcPath);
+ }
+
+ @Override
+ public void delete(String srcPath) throws IOException {
+ namespace.deleteLog(srcPath);
+ }
+
+ @Override
+ public void createDirectories(String srcPath) throws IOException {
+ namespace.createLog(srcPath);
+ }
+
+ @Override
+ public void close() {
+ namespace.close();
+ }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
new file mode 100644
index 0000000..c9e5fa8
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLInputStream.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.distributedlog.LogRecordWithDLSN;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+
+/**
+ * An {@link InputStream} over the payloads of the records of a distributedlog stream.
+ *
+ * <p>Byte positions are mapped to records through transaction ids: a record's transaction
+ * id is the byte position at the end of its payload, as written by {@code DLOutputStream}.
+ */
+@Slf4j
+class DLInputStream extends InputStream {
+
+    // seeking forward by at least this many bytes (or seeking backward) reopens the reader
+    // at the target position instead of reading through the records in between
+    private static final long REOPEN_READER_SKIP_BYTES = 4 * 1024 * 1024; // 4MB
+
+    /**
+     * A log record together with the stream over its (possibly partially consumed) payload.
+     */
+    private static class RecordStream {
+
+        private final InputStream payloadStream;
+        private final LogRecordWithDLSN record;
+
+        RecordStream(LogRecordWithDLSN record) {
+            checkNotNull(record);
+
+            this.record = record;
+            this.payloadStream = record.getPayLoadInputStream();
+        }
+
+    }
+
+    /**
+     * Read the next record from {@code reader} (via {@code readNext(false)}).
+     *
+     * @return the next record, or {@code null} if the reader returned none
+     */
+    private static RecordStream nextRecordStream(LogReader reader) throws IOException {
+        LogRecordWithDLSN record = reader.readNext(false);
+        if (null != record) {
+            return new RecordStream(record);
+        }
+        return null;
+    }
+
+    private final DistributedLogManager dlm;
+    private LogReader reader;
+    // byte position of the next byte returned by read()
+    private long pos;
+    // cached end position of the log (its last transaction id), refreshed by available()
+    private long lastPos;
+    // the record currently being consumed, or null if the next one must be read
+    private RecordStream currentRecord = null;
+
+    /**
+     * Create an input stream over the log managed by {@code dlm}.
+     *
+     * <p>NOTE(review): {@code pos} is initialized to {@code startPos}, so the initial
+     * {@link #seek(long)} is a no-op; {@code reader} is assumed to be positioned at
+     * {@code startPos} already.
+     *
+     * @param dlm the log manager; closed when this stream is closed
+     * @param reader the reader to read records from; closed when this stream is closed
+     * @param startPos the byte position the reader is positioned at
+     */
+    DLInputStream(DistributedLogManager dlm,
+                  LogReader reader,
+                  long startPos)
+            throws IOException {
+        this.dlm = dlm;
+        this.reader = reader;
+        this.pos = startPos;
+        this.lastPos = readEndPos();
+        seek(startPos);
+    }
+
+    /**
+     * Release the current record, the reader and the log manager. The log manager is
+     * closed even if releasing the record or closing the reader fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            try {
+                releaseCurrentRecord();
+            } finally {
+                reader.close();
+            }
+        } finally {
+            dlm.close();
+        }
+    }
+
+    private long readEndPos() throws IOException {
+        return dlm.getLastTxId();
+    }
+
+    /**
+     * Close the payload stream of the current record (if any) and forget the record, so
+     * that the next read fetches a new record from the reader.
+     */
+    private void releaseCurrentRecord() throws IOException {
+        RecordStream released = currentRecord;
+        currentRecord = null;
+        if (null != released) {
+            released.payloadStream.close();
+        }
+    }
+
+    //
+    // Positioning
+    //
+
+    /**
+     * Move the stream to byte position {@code pos}.
+     *
+     * <p>Seeking backward, or forward by at least {@code REOPEN_READER_SKIP_BYTES}, reopens
+     * the reader at transaction id {@code pos}; otherwise records are read through.
+     */
+    public void seek(long pos) throws IOException {
+        if (this.pos == pos) {
+            return;
+        }
+
+        if (this.pos > pos || (pos - this.pos) >= REOPEN_READER_SKIP_BYTES) {
+            // close the previous reader
+            releaseCurrentRecord();
+            this.reader.close();
+            this.reader = dlm.openLogReader(pos);
+        }
+
+        skipTo(pos);
+    }
+
+    /**
+     * Advance the reader to byte position {@code position}.
+     *
+     * @return true if the position was reached, false if the records ran out first
+     */
+    private boolean skipTo(final long position) throws IOException {
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // the stream is empty now
+                return false;
+            }
+
+            long endPos = currentRecord.record.getTransactionId();
+            if (endPos < position) {
+                // the target is beyond this record: drop it and move on
+                this.pos = endPos;
+                releaseCurrentRecord();
+            } else if (endPos == position) {
+                // the target is the end of this record; defer reading the next record
+                // until the actual read happens
+                this.pos = position;
+                releaseCurrentRecord();
+                return true;
+            } else {
+                // the target is inside this record: skip the payload bytes before it
+                this.currentRecord.payloadStream.skip(
+                    this.currentRecord.payloadStream.available() - (endPos - position));
+                this.pos = position;
+                return true;
+            }
+        }
+    }
+
+    //
+    // Input Stream
+    //
+
+    /**
+     * Read up to {@code len} bytes, crossing record boundaries as needed.
+     *
+     * @return the number of bytes read, or -1 if {@code len > 0} and no record is left
+     */
+    @Override
+    public int read(byte[] b, final int off, final int len) throws IOException {
+        checkNotNull(b);
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException(
+                "off = " + off + ", len = " + len + ", length = " + b.length);
+        }
+        int numBytesRead = 0;
+        while (numBytesRead < len) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+                if (null == currentRecord) {
+                    break;
+                }
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            if (bytesLeft <= 0) {
+                releaseCurrentRecord();
+                continue;
+            }
+
+            int numBytesToRead = Math.min(bytesLeft, len - numBytesRead);
+            int numBytes = currentRecord.payloadStream.read(b, off + numBytesRead, numBytesToRead);
+            if (numBytes < 0) {
+                // the payload is exhausted although available() said otherwise: move on
+                // rather than spinning on the same record
+                releaseCurrentRecord();
+                continue;
+            }
+            numBytesRead += numBytes;
+            pos += numBytes;
+        }
+        if (numBytesRead == 0 && len > 0) {
+            return -1;
+        }
+        return numBytesRead;
+    }
+
+    /**
+     * Skip up to {@code n} bytes, crossing record boundaries as needed.
+     *
+     * @return the number of bytes skipped, which is less than {@code n} only if the
+     *         records ran out
+     */
+    @Override
+    public long skip(final long n) throws IOException {
+        if (n <= 0L) {
+            return 0L;
+        }
+
+        long remaining = n;
+        while (true) {
+            if (null == currentRecord) {
+                currentRecord = nextRecordStream(reader);
+            }
+
+            if (null == currentRecord) { // end of stream
+                return n - remaining;
+            }
+
+            int bytesLeft = currentRecord.payloadStream.available();
+            long endPos = currentRecord.record.getTransactionId();
+            if (remaining > bytesLeft) {
+                // skip the whole record
+                remaining -= bytesLeft;
+                this.pos = endPos;
+                releaseCurrentRecord();
+            } else if (remaining == bytesLeft) {
+                this.pos = endPos;
+                releaseCurrentRecord();
+                return n;
+            } else {
+                currentRecord.payloadStream.skip(remaining);
+                this.pos = endPos - currentRecord.payloadStream.available();
+                return n;
+            }
+        }
+    }
+
+    /**
+     * Estimate the bytes left before the end of the log, refreshing the cached end
+     * position once it has been reached. The result is clamped to [0, Integer.MAX_VALUE].
+     */
+    @Override
+    public int available() throws IOException {
+        if (lastPos <= pos) {
+            lastPos = readEndPos();
+        }
+        return (int) Math.max(0L, Math.min(Integer.MAX_VALUE, lastPos - pos));
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Read a single byte.
+     *
+     * @return the byte as an unsigned value in [0, 255], or -1 at the end of the stream
+     */
+    @Override
+    public int read() throws IOException {
+        byte[] data = new byte[1];
+        int numBytes = read(data, 0, 1);
+        if (numBytes <= 0) {
+            return -1;
+        }
+        // mask off the sign extension: byte 0xFF must not be reported as -1 (end of stream)
+        return data[0] & 0xFF;
+    }
+
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
new file mode 100644
index 0000000..8fe6200
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLOutputStream.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecord;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.util.Utils;
+
+/**
+ * An {@link OutputStream} that appends the written bytes as records to a distributedlog stream.
+ *
+ * <p>Each write becomes one record whose transaction id is the total number of bytes written
+ * so far, so the last transaction id of the log is the length of the written data.
+ * Writes are asynchronous: the first write failure is remembered and rethrown by the next
+ * {@code write}, {@code flush} or {@code close}. {@link #flush()} and {@link #close()} write
+ * a control record and wait for it to complete.
+ */
+@Slf4j
+class DLOutputStream extends OutputStream {
+
+    private static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+
+    private final DistributedLogManager dlm;
+    private final AsyncLogWriter writer;
+
+    // positions: syncPos[0] is the end position of the last acknowledged write (guarded by
+    // syncPos itself); writePos is the end position of the last issued write (guarded by this)
+    private final long[] syncPos = new long[1];
+    private long writePos = 0L;
+
+    // state
+    private static final AtomicReferenceFieldUpdater<DLOutputStream, Throwable> exceptionUpdater =
+        AtomicReferenceFieldUpdater.newUpdater(DLOutputStream.class, Throwable.class, "exception");
+    // the first failure of an asynchronous write, if any
+    private volatile Throwable exception = null;
+    // guarded by this
+    private boolean closed = false;
+
+    /**
+     * Create an output stream appending to {@code writer}.
+     *
+     * @param dlm the log manager; closed when this stream is closed
+     * @param writer the writer to append records with; closed when this stream is closed
+     */
+    DLOutputStream(DistributedLogManager dlm,
+                   AsyncLogWriter writer) {
+        this.dlm = dlm;
+        this.writer = writer;
+        // continue from the end of the existing data (a negative last txid means no data)
+        this.writePos = writer.getLastTxId() < 0L ? 0L : writer.getLastTxId();
+        this.syncPos[0] = writePos;
+    }
+
+    /**
+     * Return the end position of the last write acknowledged by the writer.
+     */
+    public long position() {
+        // read under the same lock that the write callbacks update it with
+        synchronized (syncPos) {
+            return syncPos[0];
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        // a fresh array nobody else references, so it can be wrapped without copying
+        write(Unpooled.wrappedBuffer(new byte[] { (byte) b }));
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>The bytes are copied: the record is written asynchronously, while callers such as
+     * {@link java.io.BufferedOutputStream} reuse {@code b} as soon as this method returns.
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        write(Unpooled.copiedBuffer(b, off, len));
+    }
+
+    private synchronized void write(ByteBuf buf) throws IOException {
+        if (closed) {
+            throw new IOException("DLOutputStream is already closed");
+        }
+        checkError();
+
+        writePos += buf.readableBytes();
+        LogRecord record = new LogRecord(writePos, buf);
+        writer.write(record).whenComplete(new FutureEventListener<DLSN>() {
+            @Override
+            public void onSuccess(DLSN value) {
+                synchronized (syncPos) {
+                    syncPos[0] = record.getTransactionId();
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable cause) {
+                // keep only the first failure
+                exceptionUpdater.compareAndSet(DLOutputStream.this, null, cause);
+            }
+        });
+    }
+
+    /**
+     * Rethrow the first asynchronous write failure, if any.
+     */
+    private void checkError() throws IOException {
+        Throwable cause = exceptionUpdater.get(this);
+        if (null == cause) {
+            return;
+        }
+        if (cause instanceof IOException) {
+            throw (IOException) cause;
+        }
+        throw new UnexpectedException("Encountered unknown issue", cause);
+    }
+
+    /**
+     * Write a control record at the current write position.
+     */
+    private CompletableFuture<DLSN> writeControlRecord() {
+        LogRecord record;
+        synchronized (this) {
+            record = new LogRecord(writePos, Unpooled.wrappedBuffer(CONTROL_RECORD_CONTENT));
+            record.setControl();
+        }
+        return writer.write(record);
+    }
+
+    /**
+     * Write a control record, wait for it, and rethrow any earlier write failure.
+     * Does nothing once the stream is closed.
+     */
+    @Override
+    public void flush() throws IOException {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+        }
+        try {
+            FutureUtils.result(writeControlRecord());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (Exception e) {
+            log.error("Unexpected exception in DLOutputStream", e);
+            throw new UnexpectedException("unexpected exception in DLOutputStream#flush()", e);
+        }
+        checkError();
+    }
+
+    /**
+     * Write a final control record, then close the writer and the log manager. The writer
+     * and the log manager are closed even if the final write fails; the first failure is
+     * rethrown. Closing an already closed stream has no effect.
+     */
+    @Override
+    public void close() throws IOException {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+        IOException failure = null;
+        try {
+            Utils.ioResult(writeControlRecord());
+            checkError();
+        } catch (IOException ioe) {
+            failure = ioe;
+        }
+        // release the writer and the log manager even if the final write failed
+        failure = awaitClose(writer.asyncClose(), failure);
+        failure = awaitClose(dlm.asyncClose(), failure);
+        if (null != failure) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Wait for {@code closeFuture}, returning the first failure seen so far: {@code failure}
+     * if it is non-null (a new failure, if any, is added to it as suppressed), otherwise
+     * the failure of {@code closeFuture}, or null.
+     */
+    private static IOException awaitClose(CompletableFuture<?> closeFuture, IOException failure) {
+        try {
+            Utils.ioResult(closeFuture);
+        } catch (IOException ioe) {
+            if (null == failure) {
+                return ioe;
+            }
+            failure.addSuppressed(ioe);
+        }
+        return failure;
+    }
+}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
new file mode 100644
index 0000000..697e13a
--- /dev/null
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * DistributedLog (dlog) based checkpoint store: each checkpoint file is stored as a log.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
\ No newline at end of file
diff --git a/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
new file mode 100644
index 0000000..33f342d
--- /dev/null
+++ b/stream/statelib/src/test/java/org/apache/bookkeeper/statelib/impl/rocksdb/checkpoint/dlog/DLCheckpointStoreTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
+import org.apache.distributedlog.DLMTestUtil;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Unit test of {@link FSCheckpointManager}.
+ */
+public class DLCheckpointStoreTest extends TestDistributedLogBase {
+
+ private static final byte[] TEST_BYTES = "dlog-checkpoint-manager".getBytes(UTF_8);
+
+ @Rule
+ public final TestName runtime = new TestName();
+
+ private URI uri;
+ private Namespace namespace;
+ private DLCheckpointStore store;
+
+ @BeforeClass
+ public static void setupDL() throws Exception {
+ setupCluster();
+ }
+
+ @AfterClass
+ public static void teardownDL() throws Exception {
+ teardownCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.uri = DLMTestUtil.createDLMURI(zkPort, "/" + runtime.getMethodName());
+ ensureURICreated(this.uri);
+ this.namespace = NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(uri)
+ .build();
+ this.store = new DLCheckpointStore(namespace);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (null != store) {
+ store.close();
+ }
+ }
+
+ @Test
+ public void testListFilesEmpty() throws Exception {
+ // create a dummy log stream to ensure "dir" exists
+ namespace.createLog(runtime.getMethodName());
+ assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+ }
+
+ @Test
+ public void testListFilesNotFound() throws Exception {
+ assertTrue(store.listFiles(runtime.getMethodName()).isEmpty());
+ }
+
+ @Test
+ public void testListFiles() throws Exception {
+ int numFiles = 3;
+ List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+ namespace.createLog(runtime.getMethodName());
+ for (int i = 0; i < numFiles; ++i) {
+ String filename = runtime.getMethodName() + "-" + i;
+ expectedFiles.add(filename);
+ namespace.createLog(runtime.getMethodName() + "/" + filename);
+ }
+ List<String> files = store.listFiles(runtime.getMethodName());
+ Collections.sort(files);
+
+ assertEquals(expectedFiles, files);
+ }
+
+ @Test
+ public void testFileExists() throws Exception {
+ namespace.createLog(runtime.getMethodName() + "/test");
+ assertTrue(store.fileExists(runtime.getMethodName() + "/test"));
+ assertFalse(store.fileExists(runtime.getMethodName() + "/test2"));
+ }
+
+ @Test
+ public void testFileRename() throws Exception {
+ namespace.createLog("src");
+ namespace.createLog("dest");
+
+ String srcFilePath = "src/" + runtime.getMethodName();
+ String destFilePath = "dest/" + runtime.getMethodName();
+ OutputStream os = store.openOutputStream(srcFilePath);
+ os.write(TEST_BYTES);
+ os.flush();
+ os.close();
+
+ store.rename(srcFilePath, destFilePath);
+ assertTrue(store.fileExists(destFilePath));
+ assertFalse(store.fileExists(srcFilePath));
+
+ assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+ try (InputStream is = store.openInputStream(destFilePath)) {
+ byte[] readBytes = new byte[TEST_BYTES.length];
+ ByteStreams.readFully(is, readBytes);
+
+ assertArrayEquals(TEST_BYTES, readBytes);
+ }
+ }
+
+ @Test
+ public void testFileRenameDirNotExists() throws Exception {
+ namespace.createLog("src");
+ assertFalse(store.fileExists("dest"));
+
+ String srcFilePath = "src/" + runtime.getMethodName();
+ String destFilePath = "dest/" + runtime.getMethodName();
+
+ assertFalse(store.fileExists(srcFilePath));
+
+ OutputStream os = store.openOutputStream(srcFilePath);
+ os.write(TEST_BYTES);
+ os.flush();
+ os.close();
+
+ // rename will automatically create stream path in dlog
+ store.rename(srcFilePath, destFilePath);
+ assertTrue(store.fileExists(destFilePath));
+ assertFalse(store.fileExists(srcFilePath));
+
+ assertEquals(TEST_BYTES.length, store.getFileLength(destFilePath));
+
+ try (InputStream is = store.openInputStream(destFilePath)) {
+ byte[] readBytes = new byte[TEST_BYTES.length];
+ ByteStreams.readFully(is, readBytes);
+
+ assertArrayEquals(TEST_BYTES, readBytes);
+ }
+ }
+
+ @Test
+ public void testFileRenameFileExists() throws Exception {
+ namespace.createLog("src");
+ assertFalse(store.fileExists("dest"));
+
+ String srcFilePath = "src/" + runtime.getMethodName();
+ String destFilePath = "dest/" + runtime.getMethodName();
+ namespace.createLog(destFilePath);
+ assertTrue(store.fileExists(destFilePath));
+
+ assertFalse(store.fileExists(srcFilePath));
+
+ OutputStream os = store.openOutputStream(srcFilePath);
+ os.write(TEST_BYTES);
+ os.flush();
+ os.close();
+
+ assertTrue(store.fileExists(srcFilePath));
+
+ try {
+ store.rename(srcFilePath, destFilePath);
+ fail("Should fail to rename if the dest dir doesn't exist");
+ } catch (FileAlreadyExistsException e) {
+ // expected
+ }
+ assertTrue(store.fileExists(destFilePath));
+ assertTrue(store.fileExists(srcFilePath));
+ assertEquals(0, store.getFileLength(destFilePath));
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ int numFiles = 3;
+ List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+ namespace.createLog(runtime.getMethodName());
+ for (int i = 0; i < numFiles; ++i) {
+ String filename = runtime.getMethodName() + "-" + i;
+ expectedFiles.add(filename);
+ namespace.createLog(runtime.getMethodName() + "/" + filename);
+ }
+ List<String> files = store.listFiles(runtime.getMethodName());
+ Collections.sort(files);
+
+ assertEquals(expectedFiles, files);
+
+ store.delete(runtime.getMethodName());
+
+ assertFalse(store.fileExists(runtime.getMethodName()));
+ }
+
+ @Test
+ public void testDeleteRecursively() throws Exception {
+ int numFiles = 3;
+ List<String> expectedFiles = Lists.newArrayListWithExpectedSize(3);
+
+ namespace.createLog(runtime.getMethodName());
+ for (int i = 0; i < numFiles; ++i) {
+ String filename = runtime.getMethodName() + "-" + i;
+ expectedFiles.add(filename);
+ namespace.createLog(runtime.getMethodName() + "/" + filename);
+ }
+ List<String> files = store.listFiles(runtime.getMethodName());
+ Collections.sort(files);
+
+ assertEquals(expectedFiles, files);
+
+ store.delete(runtime.getMethodName());
+
+ assertFalse(store.fileExists(runtime.getMethodName()));
+ }
+
+}
diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
index 2c80a25..28f9955 100644
--- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
+++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImpl.java
@@ -41,7 +41,6 @@ import org.apache.bookkeeper.statelib.StateStores;
import org.apache.bookkeeper.statelib.api.StateStoreSpec;
import org.apache.bookkeeper.statelib.api.checkpoint.CheckpointStore;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
-import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.fs.FSCheckpointManager;
import org.apache.bookkeeper.stream.protocol.RangeId;
import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -67,13 +66,15 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
// dirs
private final File[] localStateDirs;
// checkpoint manager
- private final CheckpointStore checkpointStore;
+ private final Supplier<CheckpointStore> checkpointStoreSupplier;
+ private CheckpointStore checkpointStore;
// stores
private final Map<Long, Map<RangeId, MVCCAsyncStore<byte[], byte[]>>> stores;
private final boolean serveReadOnlyTable;
private boolean closed = false;
public MVCCStoreFactoryImpl(Supplier<Namespace> namespaceSupplier,
+ Supplier<CheckpointStore> checkpointStoreSupplier,
File[] localStoreDirs,
StorageResources storageResources,
boolean serveReadOnlyTable) {
@@ -86,8 +87,7 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
this.checkpointScheduler =
SharedResourceManager.shared().get(storageResources.checkpointScheduler());
this.localStateDirs = localStoreDirs;
- // TODO: change this cto dlog based checkpoint manager
- this.checkpointStore = new FSCheckpointManager(new File(localStoreDirs[0], "checkpoints"));
+ this.checkpointStoreSupplier = checkpointStoreSupplier;
this.stores = Maps.newHashMap();
this.serveReadOnlyTable = serveReadOnlyTable;
}
@@ -185,6 +185,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
normalizedName(streamId),
normalizedName(rangeId));
+ if (null == checkpointStore) {
+ checkpointStore = checkpointStoreSupplier.get();
+ }
+
// build a spec
StateStoreSpec spec = StateStoreSpec.builder()
.name(storeName)
@@ -247,7 +251,10 @@ public class MVCCStoreFactoryImpl implements MVCCStoreFactory {
} catch (Exception e) {
log.info("Encountered issue on closing all the range stores opened by this range factory");
}
- checkpointStore.close();
+ if (null != checkpointStore) {
+ checkpointStore.close();
+ checkpointStore = null;
+ }
SharedResourceManager.shared().release(
storageResources.ioWriteScheduler(), writeIOScheduler);
diff --git a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
index e5c36ee..9372adf 100644
--- a/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
+++ b/stream/storage/impl/src/test/java/org/apache/bookkeeper/stream/storage/impl/store/MVCCStoreFactoryImplTest.java
@@ -99,6 +99,7 @@ public class MVCCStoreFactoryImplTest {
.build());
this.factory = new MVCCStoreFactoryImpl(
() -> namespace,
+ () -> new FSCheckpointManager(new File(storeDirs[0], "checkpoints")),
storeDirs,
resources,
false);
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.