You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:21 UTC
[36/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preparation for Apache Incubation - Moved all Java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l[…] (subject truncated in the archive)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
deleted file mode 100644
index 0dc8029..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.PureJavaCrc32C;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.zip.Checksum;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-
-public class LogOutputStream implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(LogOutputStream.class);
-
- private static final ByteBuffer fill;
- private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB
- static {
- fill = ByteBuffer.allocateDirect(BUFFER_SIZE);
- fill.position(0);
- for (int i = 0; i < fill.capacity(); i++) {
- fill.put(RaftServerConstants.LOG_TERMINATE_BYTE);
- }
- }
-
- private File file;
- private FileChannel fc; // channel of the file stream for sync
- private BufferedWriteChannel out; // buffered FileChannel for writing
- private final Checksum checksum;
-
- private final long segmentMaxSize;
- private final long preallocatedSize;
- private long preallocatedPos;
-
- public LogOutputStream(File file, boolean append, RaftProperties properties)
- throws IOException {
- this.file = file;
- this.checksum = new PureJavaCrc32C();
- this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
- RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
- this.preallocatedSize = properties.getLong(
- RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
- RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
- RandomAccessFile rp = new RandomAccessFile(file, "rw");
- fc = rp.getChannel();
- fc.position(fc.size());
- preallocatedPos = fc.size();
-
- int bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY,
- RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT);
- out = new BufferedWriteChannel(fc, bufferSize);
-
- if (!append) {
- create();
- }
- }
-
- /**
- * Format:
- * LogEntryProto's protobuf
- * 4-byte checksum of the above protobuf
- */
- public void write(LogEntryProto entry) throws IOException {
- final int serialized = entry.getSerializedSize();
- final int bufferSize = CodedOutputStream.computeUInt32SizeNoTag(serialized)
- + serialized;
-
- preallocateIfNecessary(bufferSize + 4);
-
- byte[] buf = new byte[bufferSize];
- CodedOutputStream cout = CodedOutputStream.newInstance(buf);
- cout.writeUInt32NoTag(serialized);
- entry.writeTo(cout);
-
- checksum.reset();
- checksum.update(buf, 0, buf.length);
- final int sum = (int) checksum.getValue();
-
- out.write(buf);
- writeInt(sum);
- }
-
- private void writeInt(int v) throws IOException {
- out.write((v >>> 24) & 0xFF);
- out.write((v >>> 16) & 0xFF);
- out.write((v >>> 8) & 0xFF);
- out.write((v) & 0xFF);
- }
-
- private void create() throws IOException {
- fc.truncate(0);
- fc.position(0);
- preallocatedPos = 0;
- preallocate(); // preallocate file
-
- out.write(SegmentedRaftLog.HEADER_BYTES);
- flush();
- }
-
- @Override
- public void close() throws IOException {
- try {
- out.flush(false);
- if (fc != null && fc.isOpen()) {
- fc.truncate(fc.position());
- }
- } finally {
- RaftUtils.cleanup(LOG, fc, out);
- fc = null;
- out = null;
- }
- }
-
- /**
- * Flush data to persistent store.
- * Collect sync metrics.
- */
- public void flush() throws IOException {
- if (out == null) {
- throw new IOException("Trying to use aborted output stream");
- }
- out.flush(true);
- }
-
- private void preallocate() throws IOException {
- fill.position(0);
- long targetSize = Math.min(segmentMaxSize - fc.size(), preallocatedSize);
- int allocated = 0;
- while (allocated < targetSize) {
- int size = (int) Math.min(BUFFER_SIZE, targetSize - allocated);
- ByteBuffer buffer = fill.slice();
- buffer.limit(size);
- RaftUtils.writeFully(fc, buffer, preallocatedPos);
- preallocatedPos += size;
- allocated += size;
- }
- LOG.debug("Pre-allocated {} bytes for the log segment", allocated);
- }
-
- private void preallocateIfNecessary(int size) throws IOException {
- if (out.position() + size > preallocatedPos) {
- preallocate();
- }
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + "(" + file + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
deleted file mode 100644
index 9523cac..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.raft.protocol.ChecksumException;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.PureJavaCrc32C;
-import org.apache.raft.util.RaftUtils;
-
-import java.io.*;
-import java.util.zip.Checksum;
-
-public class LogReader implements Closeable {
- /**
- * InputStream wrapper that keeps track of the current stream position.
- *
- * This stream also allows us to set a limit on how many bytes we can read
- * without getting an exception.
- */
- public static class LimitedInputStream extends FilterInputStream {
- private long curPos = 0;
- private long markPos = -1;
- private long limitPos = Long.MAX_VALUE;
-
- public LimitedInputStream(InputStream is) {
- super(is);
- }
-
- private void checkLimit(long amt) throws IOException {
- long extra = (curPos + amt) - limitPos;
- if (extra > 0) {
- throw new IOException("Tried to read " + amt + " byte(s) past " +
- "the limit at offset " + limitPos);
- }
- }
-
- @Override
- public int read() throws IOException {
- checkLimit(1);
- int ret = super.read();
- if (ret != -1) curPos++;
- return ret;
- }
-
- @Override
- public int read(byte[] data) throws IOException {
- checkLimit(data.length);
- int ret = super.read(data);
- if (ret > 0) curPos += ret;
- return ret;
- }
-
- @Override
- public int read(byte[] data, int offset, int length) throws IOException {
- checkLimit(length);
- int ret = super.read(data, offset, length);
- if (ret > 0) curPos += ret;
- return ret;
- }
-
- public void setLimit(long limit) {
- limitPos = curPos + limit;
- }
-
- public void clearLimit() {
- limitPos = Long.MAX_VALUE;
- }
-
- @Override
- public void mark(int limit) {
- super.mark(limit);
- markPos = curPos;
- }
-
- @Override
- public void reset() throws IOException {
- if (markPos == -1) {
- throw new IOException("Not marked!");
- }
- super.reset();
- curPos = markPos;
- markPos = -1;
- }
-
- public long getPos() {
- return curPos;
- }
-
- @Override
- public long skip(long amt) throws IOException {
- long extra = (curPos + amt) - limitPos;
- if (extra > 0) {
- throw new IOException("Tried to skip " + extra + " bytes past " +
- "the limit at offset " + limitPos);
- }
- long ret = super.skip(amt);
- curPos += ret;
- return ret;
- }
- }
-
- private static final int maxOpSize = 32 * 1024 * 1024;
-
- private final LimitedInputStream limiter;
- private final DataInputStream in;
- private byte[] temp = new byte[4096];
- private final Checksum checksum;
-
- LogReader(File file) throws FileNotFoundException {
- this.limiter = new LimitedInputStream(
- new BufferedInputStream(new FileInputStream(file)));
- in = new DataInputStream(limiter);
- checksum = new PureJavaCrc32C();
- }
-
- String readLogHeader() throws IOException {
- byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length];
- int num = in.read(header);
- if (num < header.length) {
- throw new EOFException("EOF before reading a complete log header");
- }
- return new String(header, Charsets.UTF_8);
- }
-
- /**
- * Read a log entry from the input stream.
- *
- * @return the operation read from the stream, or null at the end of the
- * file
- * @throws IOException on error. This function should only throw an
- * exception when skipBrokenEdits is false.
- */
- LogEntryProto readEntry() throws IOException {
- try {
- return decodeEntry();
- } catch (IOException e) {
- in.reset();
-
- throw e;
- } catch (Throwable e) {
- // raft log requires no gap between any two entries. thus if an entry is
- // broken, throw the exception instead of skipping broken entries
- in.reset();
- throw new IOException("got unexpected exception " + e.getMessage(), e);
- }
- }
-
- /**
- * Scan and validate a log entry.
- * @return the index of the log entry
- */
- long scanEntry() throws IOException {
- LogEntryProto entry = decodeEntry();
- return entry != null ? entry.getIndex() : RaftServerConstants.INVALID_LOG_INDEX;
- }
-
- void verifyTerminator() throws IOException {
- // The end of the log should contain 0x00 bytes.
- // If it contains other bytes, the log itself may be corrupt.
- limiter.clearLimit();
- int numRead = -1, idx = 0;
- while (true) {
- try {
- numRead = -1;
- numRead = in.read(temp);
- if (numRead == -1) {
- return;
- }
- for (idx = 0; idx < numRead; idx++) {
- if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) {
- throw new IOException("Read extra bytes after the terminator!");
- }
- }
- } finally {
- // After reading each group of bytes, we reposition the mark one
- // byte before the next group. Similarly, if there is an error, we
- // want to reposition the mark one byte before the error
- if (numRead != -1) {
- in.reset();
- RaftUtils.skipFully(in, idx);
- in.mark(temp.length + 1);
- RaftUtils.skipFully(in, 1);
- }
- }
- }
- }
-
- /**
- * Decode the log entry "frame". This includes reading the log entry, and
- * validating the checksum.
- *
- * The input stream will be advanced to the end of the op at the end of this
- * function.
- *
- * @return The log entry, or null if we hit EOF.
- */
- private LogEntryProto decodeEntry() throws IOException {
- limiter.setLimit(maxOpSize);
- in.mark(maxOpSize);
-
- byte nextByte;
- try {
- nextByte = in.readByte();
- } catch (EOFException eof) {
- // EOF at an opcode boundary is expected.
- return null;
- }
- // Each log entry starts with a var-int. Thus a valid entry's first byte
- // should not be 0. So if the terminate byte is 0, we should hit the end
- // of the segment.
- if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) {
- verifyTerminator();
- return null;
- }
-
- // Here, we verify that the Op size makes sense and that the
- // data matches its checksum before attempting to construct an Op.
- int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
- if (entryLength > maxOpSize) {
- throw new IOException("Entry has size " + entryLength
- + ", but maxOpSize = " + maxOpSize);
- }
-
- final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
- entryLength);
- final int totalLength = varintLength + entryLength;
- checkBufferSize(totalLength);
- in.reset();
- in.mark(maxOpSize);
- RaftUtils.readFully(in, temp, 0, totalLength);
-
- // verify checksum
- checksum.reset();
- checksum.update(temp, 0, totalLength);
- int expectedChecksum = in.readInt();
- int calculatedChecksum = (int) checksum.getValue();
- if (expectedChecksum != calculatedChecksum) {
- throw new ChecksumException("LogEntry is corrupt. Calculated checksum is "
- + calculatedChecksum + " but read checksum " + expectedChecksum,
- limiter.markPos);
- }
-
- // parse the buffer
- return LogEntryProto.parseFrom(
- CodedInputStream.newInstance(temp, varintLength, entryLength));
- }
-
- private void checkBufferSize(int entryLength) {
- Preconditions.checkArgument(entryLength <= maxOpSize);
- int length = temp.length;
- if (length < entryLength) {
- while (length < entryLength) {
- length = Math.min(length * 2, maxOpSize);
- }
- temp = new byte[length];
- }
- }
-
- long getPos() {
- return limiter.getPos();
- }
-
- void skipFully(long length) throws IOException {
- limiter.clearLimit();
- RaftUtils.skipFully(limiter, length);
- }
-
- @Override
- public void close() throws IOException {
- RaftUtils.cleanup(null, in);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
deleted file mode 100644
index 987cc6c..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-
-/**
- * In-memory cache for a log segment file. All the updates will be first written
- * into LogSegment then into corresponding files in the same order.
- *
- * This class will be protected by the RaftServer's lock.
- */
-class LogSegment implements Comparable<Long> {
- static class LogRecord {
- /** starting offset in the file */
- final long offset;
- final LogEntryProto entry;
-
- LogRecord(long offset, LogEntryProto entry) {
- this.offset = offset;
- this.entry = entry;
- }
- }
-
- static class SegmentFileInfo {
- final long startIndex; // start index of the
- final long endIndex; // original end index
- final boolean isOpen;
- final long targetLength; // position for truncation
- final long newEndIndex; // new end index after the truncation
-
- SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
- long newEndIndex) {
- this.startIndex = start;
- this.endIndex = end;
- this.isOpen = isOpen;
- this.targetLength = targetLength;
- this.newEndIndex = newEndIndex;
- }
- }
-
- static long getEntrySize(LogEntryProto entry) {
- final int serialized = entry.getSerializedSize();
- return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
- }
-
- private boolean isOpen;
- private final List<LogRecord> records = new ArrayList<>();
- private long totalSize;
- private final long startIndex;
- private long endIndex;
-
- private LogSegment(boolean isOpen, long start, long end) {
- this.isOpen = isOpen;
- this.startIndex = start;
- this.endIndex = end;
- totalSize = SegmentedRaftLog.HEADER_BYTES.length;
- }
-
- static LogSegment newOpenSegment(long start) {
- Preconditions.checkArgument(start >= 0);
- return new LogSegment(true, start, start - 1);
- }
-
- private static LogSegment newCloseSegment(long start, long end) {
- Preconditions.checkArgument(start >= 0 && end >= start);
- return new LogSegment(false, start, end);
- }
-
- static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
- ConfigurationManager confManager) throws IOException {
- final LogSegment segment;
- try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
- segment = isOpen ? LogSegment.newOpenSegment(start) :
- LogSegment.newCloseSegment(start, end);
- LogEntryProto next;
- LogEntryProto prev = null;
- while ((next = in.nextEntry()) != null) {
- if (prev != null) {
- Preconditions.checkState(next.getIndex() == prev.getIndex() + 1,
- "gap between entry %s and entry %s", prev, next);
- }
- segment.append(next);
- if (confManager != null &&
- next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
- confManager.addConfiguration(next.getIndex(),
- ServerProtoUtils.toRaftConfiguration(next.getIndex(),
- next.getConfigurationEntry()));
- }
- prev = next;
- }
- }
-
- // truncate padding if necessary
- if (file.length() > segment.getTotalSize()) {
- FileUtils.truncateFile(file, segment.getTotalSize());
- }
-
- Preconditions.checkState(start == segment.records.get(0).entry.getIndex());
- if (!isOpen) {
- Preconditions.checkState(segment.getEndIndex() == end);
- }
- return segment;
- }
-
- long getStartIndex() {
- return startIndex;
- }
-
- long getEndIndex() {
- return endIndex;
- }
-
- boolean isOpen() {
- return isOpen;
- }
-
- int numOfEntries() {
- return (int) (endIndex - startIndex + 1);
- }
-
- void appendToOpenSegment(LogEntryProto... entries) {
- Preconditions.checkState(isOpen(),
- "The log segment %s is not open for append", this.toString());
- append(entries);
- }
-
- private void append(LogEntryProto... entries) {
- Preconditions.checkArgument(entries != null && entries.length > 0);
- final long term = entries[0].getTerm();
- if (records.isEmpty()) {
- Preconditions.checkArgument(entries[0].getIndex() == startIndex,
- "gap between start index %s and first entry to append %s",
- startIndex, entries[0].getIndex());
- }
- for (LogEntryProto entry : entries) {
- // all these entries should be of the same term
- Preconditions.checkArgument(entry.getTerm() == term,
- "expected term:%s, term of the entry:%s", term, entry.getTerm());
- final LogRecord currentLast = getLastRecord();
- if (currentLast != null) {
- Preconditions.checkArgument(
- entry.getIndex() == currentLast.entry.getIndex() + 1,
- "gap between entries %s and %s", entry.getIndex(),
- currentLast.entry.getIndex());
- }
-
- final LogRecord record = new LogRecord(totalSize, entry);
- records.add(record);
- totalSize += getEntrySize(entry);
- endIndex = entry.getIndex();
- }
- }
-
- LogRecord getLogRecord(long index) {
- if (index >= startIndex && index <= endIndex) {
- return records.get((int) (index - startIndex));
- }
- return null;
- }
-
- LogRecord getLastRecord() {
- return records.isEmpty() ? null : records.get(records.size() - 1);
- }
-
- long getTotalSize() {
- return totalSize;
- }
-
- /**
- * Remove records from the given index (inclusive)
- */
- void truncate(long fromIndex) {
- Preconditions.checkArgument(fromIndex >= startIndex && fromIndex <= endIndex);
- LogRecord record = records.get((int) (fromIndex - startIndex));
- for (long index = endIndex; index >= fromIndex; index--) {
- records.remove((int)(index - startIndex));
- }
- totalSize = record.offset;
- isOpen = false;
- this.endIndex = fromIndex - 1;
- }
-
- void close() {
- Preconditions.checkState(isOpen());
- isOpen = false;
- }
-
- @Override
- public String toString() {
- return isOpen() ? "log-" + startIndex + "-inprogress" :
- "log-" + startIndex + "-" + endIndex;
- }
-
- @Override
- public int compareTo(Long l) {
- return (l >= getStartIndex() && l <= getEndIndex()) ? 0 :
- (this.getEndIndex() < l ? -1 : 1);
- }
-
- void clear() {
- records.clear();
- endIndex = startIndex - 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
deleted file mode 100644
index c12e1aa..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.CodeInjectionForTesting;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A simple RaftLog implementation in memory. Used only for testing.
- */
-public class MemoryRaftLog extends RaftLog {
- private final List<LogEntryProto> entries = new ArrayList<>();
-
- public MemoryRaftLog(String selfId) {
- super(selfId);
- }
-
- @Override
- public LogEntryProto get(long index) {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- final int i = (int) index;
- return i >= 0 && i < entries.size() ? entries.get(i) : null;
- }
- }
-
- @Override
- public LogEntryProto[] getEntries(long startIndex, long endIndex) {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- final int i = (int) startIndex;
- if (startIndex >= entries.size()) {
- return null;
- }
- final int toIndex = (int) Math.min(entries.size(), endIndex);
- return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY);
- }
- }
-
- @Override
- void truncate(long index) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- Preconditions.checkArgument(index >= 0);
- final int truncateIndex = (int) index;
- for (int i = entries.size() - 1; i >= truncateIndex; i--) {
- entries.remove(i);
- }
- }
- }
-
- @Override
- public LogEntryProto getLastEntry() {
- checkLogState();
- try(AutoCloseableLock readLock = readLock()) {
- final int size = entries.size();
- return size == 0 ? null : entries.get(size - 1);
- }
- }
-
- @Override
- void appendEntry(LogEntryProto entry) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- entries.add(entry);
- }
- }
-
- @Override
- public long append(long term, RaftConfiguration newConf) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- final long nextIndex = getNextIndex();
- final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
- nextIndex);
- entries.add(e);
- return nextIndex;
- }
- }
-
- @Override
- public long getStartIndex() {
- return entries.isEmpty() ? RaftServerConstants.INVALID_LOG_INDEX :
- entries.get(0).getIndex();
- }
-
- @Override
- public void append(LogEntryProto... entries) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- if (entries == null || entries.length == 0) {
- return;
- }
- // Before truncating the entries, we first need to check if some
- // entries are duplicated. If the leader sends entry 6, entry 7, then
- // entry 6 again, without this check the follower may truncate entry 7
- // when receiving entry 6 again. Then before the leader detects this
- // truncation in the next appendEntries RPC, leader may think entry 7 has
- // been committed but in the system the entry has not been committed to
- // the quorum of peers' disks.
- // TODO add a unit test for this
- boolean toTruncate = false;
- int truncateIndex = (int) entries[0].getIndex();
- int index = 0;
- for (; truncateIndex < getNextIndex() && index < entries.length;
- index++, truncateIndex++) {
- if (this.entries.get(truncateIndex).getTerm() !=
- entries[index].getTerm()) {
- toTruncate = true;
- break;
- }
- }
- if (toTruncate) {
- truncate(truncateIndex);
- }
- // Collections.addAll(this.entries, entries);
- for (int i = index; i < entries.length; i++) {
- this.entries.add(entries[i]);
- }
- }
- }
-
- @Override
- public String toString() {
- return "last=" + ServerProtoUtils.toString(getLastEntry())
- + ", committed="
- + ServerProtoUtils.toString(get(getLastCommittedIndex()));
- }
-
- public String getEntryString() {
- return "entries=" + entries;
- }
-
- @Override
- public void logSync() {
- CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
- // do nothing
- }
-
- @Override
- public long getLatestFlushedIndex() {
- return getNextIndex() - 1;
- }
-
- @Override
- public void writeMetadata(long term, String votedFor) {
- // do nothing
- }
-
- @Override
- public Metadata loadMetadata() {
- return new Metadata(null, 0);
- }
-
- @Override
- public void syncWithSnapshot(long lastSnapshotIndex) {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java b/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
deleted file mode 100644
index b2b6f04..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Charsets;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Properties;
-
-/**
- * Class that represents a file on disk which persistently stores
- * a single <code>long</code> value. The file is updated atomically
- * and durably (i.e fsynced).
- */
-class MetaFile {
- private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
- private static final String TERM_KEY = "term";
- private static final String VOTEDFOR_KEY = "votedFor";
- static final long DEFAULT_TERM = 0;
- static final String EMPTY_VOTEFOR = "";
-
- private final File file;
- private boolean loaded = false;
- private long term;
- private String votedFor;
-
- MetaFile(File file) {
- this.file = file;
- term = DEFAULT_TERM;
- votedFor = EMPTY_VOTEFOR;
- }
-
- boolean exists() {
- return this.file.exists();
- }
-
- long getTerm() throws IOException {
- if (!loaded) {
- readFile();
- loaded = true;
- }
- return term;
- }
-
- String getVotedFor() throws IOException {
- if (!loaded) {
- readFile();
- loaded = true;
- }
- return votedFor;
- }
-
- void set(long newTerm, String newVotedFor) throws IOException {
- newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
- if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
- writeFile(newTerm, newVotedFor);
- }
- term = newTerm;
- votedFor = newVotedFor;
- loaded = true;
- }
-
- /**
- * Atomically write the given term and votedFor information to the given file,
- * including fsyncing.
- *
- * @throws IOException if the file cannot be written
- */
- void writeFile(long term, String votedFor) throws IOException {
- AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
- Properties properties = new Properties();
- properties.setProperty(TERM_KEY, Long.toString(term));
- properties.setProperty(VOTEDFOR_KEY, votedFor);
- try {
- properties.store(
- new BufferedWriter(new OutputStreamWriter(fos, Charsets.UTF_8)), "");
- fos.close();
- fos = null;
- } finally {
- if (fos != null) {
- fos.abort();
- }
- }
- }
-
- void readFile() throws IOException {
- term = DEFAULT_TERM;
- votedFor = EMPTY_VOTEFOR;
- if (file.exists()) {
- BufferedReader br = new BufferedReader(
- new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
- try {
- Properties properties = new Properties();
- properties.load(br);
- if (properties.containsKey(TERM_KEY) &&
- properties.containsKey(VOTEDFOR_KEY)) {
- term = Long.parseLong((String) properties.get(TERM_KEY));
- votedFor = (String) properties.get(VOTEDFOR_KEY);
- } else {
- throw new IOException("Corrupted term/votedFor properties: "
- + properties);
- }
- } catch(IOException e) {
- LOG.warn("Cannot load term/votedFor properties from {}", file, e);
- throw e;
- } finally {
- RaftUtils.cleanup(LOG, br);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
deleted file mode 100644
index de79911..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.ProtoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Base class of RaftLog. Currently we provide two types of RaftLog
- * implementation:
- * 1. MemoryRaftLog: all the log entries are stored in memory. This is only used
- * for testing.
- * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
- * in segments.
- */
-public abstract class RaftLog implements Closeable {
- public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
- public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0];
- public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync";
-
- /**
- * The largest committed index. Note the last committed log may be included
- * in the latest snapshot file.
- */
- protected final AtomicLong lastCommitted =
- new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
- private final String selfId;
-
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
- private volatile boolean isOpen = false;
-
- public RaftLog(String selfId) {
- this.selfId = selfId;
- }
-
- public long getLastCommittedIndex() {
- return lastCommitted.get();
- }
-
- public void checkLogState() {
- Preconditions.checkState(isOpen,
- "The RaftLog has not been opened or has been closed");
- }
-
- /**
- * Update the last committed index.
- * @param majorityIndex the index that has achieved majority.
- * @param currentTerm the current term.
- */
- public void updateLastCommitted(long majorityIndex, long currentTerm) {
- try(AutoCloseableLock writeLock = writeLock()) {
- if (lastCommitted.get() < majorityIndex) {
- // Only update last committed index for current term. See §5.4.2 in
- // paper for details.
- final LogEntryProto entry = get(majorityIndex);
- if (entry != null && entry.getTerm() == currentTerm) {
- LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
- lastCommitted.set(majorityIndex);
- }
- }
- }
- }
-
- /**
- * Does the log contains the given term and index? Used to check the
- * consistency between the local log of a follower and the log entries sent
- * by the leader.
- */
- public boolean contains(TermIndex ti) {
- if (ti == null) {
- return false;
- }
- LogEntryProto entry = get(ti.getIndex());
- TermIndex local = ServerProtoUtils.toTermIndex(entry);
- return ti.equals(local);
- }
-
- /**
- * @return the index of the next log entry to append.
- */
- public long getNextIndex() {
- final LogEntryProto last = getLastEntry();
- if (last == null) {
- // if the log is empty, the last committed index should be consistent with
- // the last index included in the latest snapshot.
- return getLastCommittedIndex() + 1;
- }
- return last.getIndex() + 1;
- }
-
- /**
- * Generate a log entry for the given term and message, and append the entry.
- * Used by the leader.
- * @return the index of the new log entry.
- */
- public long append(long term, TransactionContext operation) throws IOException {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- final long nextIndex = getNextIndex();
-
- // This is called here to guarantee strict serialization of callback executions in case
- // the SM wants to attach a logic depending on ordered execution in the log commit order.
- operation = operation.preAppendTransaction();
-
- // build the log entry after calling the StateMachine
- final LogEntryProto e = ProtoUtils.toLogEntryProto(
- operation.getSMLogEntry().get(), term, nextIndex);
-
- appendEntry(e);
- operation.setLogEntry(e);
- return nextIndex;
- }
- }
-
- /**
- * Generate a log entry for the given term and configurations,
- * and append the entry. Used by the leader.
- * @return the index of the new log entry.
- */
- public long append(long term, RaftConfiguration newConf) {
- checkLogState();
- try(AutoCloseableLock writeLock = writeLock()) {
- final long nextIndex = getNextIndex();
- final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
- nextIndex);
- appendEntry(e);
- return nextIndex;
- }
- }
-
- public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
- throws IOException {
- isOpen = true;
- }
-
- public abstract long getStartIndex();
-
- /**
- * Get the log entry of the given index.
- *
- * @param index The given index.
- * @return The log entry associated with the given index.
- * Null if there is no log entry with the index.
- */
- public abstract LogEntryProto get(long index);
-
- /**
- * @param startIndex the starting log index (inclusive)
- * @param endIndex the ending log index (exclusive)
- * @return all log entries within the given index range. Null if startIndex
- * is greater than the smallest available index.
- */
- public abstract LogEntryProto[] getEntries(long startIndex, long endIndex);
-
- /**
- * @return the last log entry.
- */
- public abstract LogEntryProto getLastEntry();
-
- /**
- * Truncate the log entries till the given index. The log with the given index
- * will also be truncated (i.e., inclusive).
- */
- abstract void truncate(long index);
-
- /**
- * Used by the leader when appending a new entry based on client's request
- * or configuration change.
- */
- abstract void appendEntry(LogEntryProto entry);
-
- /**
- * Append all the given log entries. Used by the followers.
- *
- * If an existing entry conflicts with a new one (same index but different
- * terms), delete the existing entry and all entries that follow it (§5.3).
- *
- * This method, {@link #append(long, TransactionContext)},
- * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
- * do not guarantee the changes are persisted.
- * Need to call {@link #logSync()} to persist the changes.
- */
- public abstract void append(LogEntryProto... entries);
-
- /**
- * Flush and sync the log.
- * It is triggered by AppendEntries RPC request from the leader.
- */
- public abstract void logSync() throws InterruptedException;
-
- /**
- * @return the index of the latest entry that has been flushed to the local
- * storage.
- */
- public abstract long getLatestFlushedIndex();
-
- /**
- * Write and flush the metadata (votedFor and term) into the meta file.
- *
- * We need to guarantee that the order of writeMetadata calls is the same with
- * that when we change the in-memory term/votedFor. Otherwise we may persist
- * stale term/votedFor in file.
- *
- * Since the leader change is not frequent, currently we simply put this call
- * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
- * order.
- */
- public abstract void writeMetadata(long term, String votedFor)
- throws IOException;
-
- public abstract Metadata loadMetadata() throws IOException;
-
- public abstract void syncWithSnapshot(long lastSnapshotIndex);
-
- @Override
- public String toString() {
- return ServerProtoUtils.toString(getLastEntry());
- }
-
- public static class Metadata {
- private final String votedFor;
- private final long term;
-
- public Metadata(String votedFor, long term) {
- this.votedFor = votedFor;
- this.term = term;
- }
-
- public String getVotedFor() {
- return votedFor;
- }
-
- public long getTerm() {
- return term;
- }
- }
-
- public AutoCloseableLock readLock() {
- return AutoCloseableLock.acquire(lock.readLock());
- }
-
- public AutoCloseableLock writeLock() {
- return AutoCloseableLock.acquire(lock.writeLock());
- }
-
- public boolean hasWriteLock() {
- return this.lock.isWriteLockedByCurrentThread();
- }
-
- public boolean hasReadLock() {
- return this.lock.getReadHoldCount() > 0 || hasWriteLock();
- }
-
- @Override
- public void close() throws IOException {
- isOpen = false;
- }
-
- public String getSelfId() {
- return selfId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
deleted file mode 100644
index d022a91..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.LogSegment.LogRecord;
-import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-
-import java.util.*;
-
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * In-memory RaftLog Cache. Currently we provide a simple implementation that
- * caches all the segments in the memory. The cache is not thread-safe and
- * requires external lock protection.
- */
-class RaftLogCache {
- private LogSegment openSegment;
- private final List<LogSegment> closedSegments;
-
- RaftLogCache() {
- closedSegments = new ArrayList<>();
- }
-
- private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) {
- return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex();
- }
-
- private LogSegment getLastClosedSegment() {
- return closedSegments.isEmpty() ?
- null : closedSegments.get(closedSegments.size() - 1);
- }
-
- private void validateAdding(LogSegment segment) {
- final LogSegment lastClosed = getLastClosedSegment();
- if (!segment.isOpen()) {
- Preconditions.checkState(lastClosed == null ||
- areConsecutiveSegments(lastClosed, segment));
- } else {
- Preconditions.checkState(openSegment == null &&
- (lastClosed == null || areConsecutiveSegments(lastClosed, segment)));
- }
- }
-
- void addSegment(LogSegment segment) {
- validateAdding(segment);
- if (segment.isOpen()) {
- openSegment = segment;
- } else {
- closedSegments.add(segment);
- }
- }
-
- LogEntryProto getEntry(long index) {
- if (openSegment != null && index >= openSegment.getStartIndex()) {
- final LogRecord record = openSegment.getLogRecord(index);
- return record == null ? null : record.entry;
- } else {
- int segmentIndex = Collections.binarySearch(closedSegments, index);
- if (segmentIndex < 0) {
- return null;
- } else {
- return closedSegments.get(segmentIndex).getLogRecord(index).entry;
- }
- }
- }
-
- /**
- * @param startIndex inclusive
- * @param endIndex exclusive
- */
- LogEntryProto[] getEntries(final long startIndex, final long endIndex) {
- if (startIndex < 0 || startIndex < getStartIndex()) {
- throw new IndexOutOfBoundsException("startIndex = " + startIndex
- + ", log cache starts from index " + getStartIndex());
- }
- if (startIndex > endIndex) {
- throw new IndexOutOfBoundsException("startIndex(" + startIndex
- + ") > endIndex(" + endIndex + ")");
- }
- final long realEnd = Math.min(getEndIndex() + 1, endIndex);
- if (startIndex >= realEnd) {
- return RaftLog.EMPTY_LOGENTRY_ARRAY;
- }
-
- LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)];
- int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
- if (segmentIndex < 0) {
- getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length);
- } else {
- long index = startIndex;
- for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) {
- LogSegment s = closedSegments.get(i);
- int numberFromSegment = (int) Math.min(realEnd - index,
- s.getEndIndex() - index + 1);
- getEntriesFromSegment(s, index, entries, (int) (index - startIndex),
- numberFromSegment);
- index += numberFromSegment;
- }
- if (index < realEnd) {
- getEntriesFromSegment(openSegment, index, entries,
- (int) (index - startIndex), (int) (realEnd - index));
- }
- }
- return entries;
- }
-
- private void getEntriesFromSegment(LogSegment segment, long startIndex,
- LogEntryProto[] entries, int offset, int size) {
- long endIndex = segment.getEndIndex();
- endIndex = Math.min(endIndex, startIndex + size - 1);
- int index = offset;
- for (long i = startIndex; i <= endIndex; i++) {
- entries[index++] = segment.getLogRecord(i).entry;
- }
- }
-
- long getStartIndex() {
- if (closedSegments.isEmpty()) {
- return openSegment != null ? openSegment.getStartIndex() :
- RaftServerConstants.INVALID_LOG_INDEX;
- } else {
- return closedSegments.get(0).getStartIndex();
- }
- }
-
- @VisibleForTesting
- long getEndIndex() {
- return openSegment != null ? openSegment.getEndIndex() :
- (closedSegments.isEmpty() ?
- INVALID_LOG_INDEX :
- closedSegments.get(closedSegments.size() - 1).getEndIndex());
- }
-
- LogEntryProto getLastEntry() {
- return (openSegment != null && openSegment.numOfEntries() > 0) ?
- openSegment.getLastRecord().entry :
- (closedSegments.isEmpty() ? null :
- closedSegments.get(closedSegments.size() - 1).getLastRecord().entry);
- }
-
- LogSegment getOpenSegment() {
- return openSegment;
- }
-
- void appendEntry(LogEntryProto entry) {
- // SegmentedRaftLog does the segment creation/rolling work. Here we just
- // simply append the entry into the open segment.
- Preconditions.checkState(openSegment != null);
- openSegment.appendToOpenSegment(entry);
- }
-
- /**
- * finalize the current open segment, and start a new open segment
- */
- void rollOpenSegment(boolean createNewOpen) {
- Preconditions.checkState(openSegment != null
- && openSegment.numOfEntries() > 0);
- final long nextIndex = openSegment.getEndIndex() + 1;
- openSegment.close();
- closedSegments.add(openSegment);
- if (createNewOpen) {
- openSegment = LogSegment.newOpenSegment(nextIndex);
- } else {
- openSegment = null;
- }
- }
-
- private SegmentFileInfo deleteOpenSegment() {
- final long oldEnd = openSegment.getEndIndex();
- openSegment.clear();
- SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
- oldEnd, true, 0, openSegment.getEndIndex());
- openSegment = null;
- return info;
- }
-
- /**
- * truncate log entries starting from the given index (inclusive)
- */
- TruncationSegments truncate(long index) {
- int segmentIndex = Collections.binarySearch(closedSegments, index);
- if (segmentIndex == -closedSegments.size() - 1) {
- if (openSegment != null && openSegment.getEndIndex() >= index) {
- final long oldEnd = openSegment.getEndIndex();
- if (index == openSegment.getStartIndex()) {
- // the open segment should be deleted
- return new TruncationSegments(null,
- Collections.singletonList(deleteOpenSegment()));
- } else {
- openSegment.truncate(index);
- Preconditions.checkState(!openSegment.isOpen());
- SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
- oldEnd, true, openSegment.getTotalSize(),
- openSegment.getEndIndex());
- closedSegments.add(openSegment);
- openSegment = null;
- return new TruncationSegments(info, Collections.emptyList());
- }
- }
- } else if (segmentIndex >= 0) {
- LogSegment ts = closedSegments.get(segmentIndex);
- final long oldEnd = ts.getEndIndex();
- List<SegmentFileInfo> list = new ArrayList<>();
- ts.truncate(index);
- final int size = closedSegments.size();
- for (int i = size - 1;
- i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1);
- i-- ) {
- LogSegment s = closedSegments.remove(i);
- final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex();
- s.clear();
- list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0,
- s.getEndIndex()));
- }
- if (openSegment != null) {
- list.add(deleteOpenSegment());
- }
- SegmentFileInfo t = ts.numOfEntries() == 0 ? null :
- new SegmentFileInfo(ts.getStartIndex(), oldEnd, false,
- ts.getTotalSize(), ts.getEndIndex());
- return new TruncationSegments(t, list);
- }
- return null;
- }
-
- static class TruncationSegments {
- final SegmentFileInfo toTruncate; // name of the file to be truncated
- final SegmentFileInfo[] toDelete; // names of the files to be deleted
-
- TruncationSegments(SegmentFileInfo toTruncate,
- List<SegmentFileInfo> toDelete) {
- this.toDelete = toDelete == null ? null :
- toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
- this.toTruncate = toTruncate;
- }
- }
-
- Iterator<LogEntryProto> iterator(long startIndex) {
- return new EntryIterator(startIndex);
- }
-
- private class EntryIterator implements Iterator<LogEntryProto> {
- private long nextIndex;
- private LogSegment currentSegment;
- private int segmentIndex;
-
- EntryIterator(long start) {
- this.nextIndex = start;
- segmentIndex = Collections.binarySearch(closedSegments, nextIndex);
- if (segmentIndex >= 0) {
- currentSegment = closedSegments.get(segmentIndex);
- } else {
- segmentIndex = -segmentIndex - 1;
- if (segmentIndex == closedSegments.size()) {
- currentSegment = openSegment;
- } else {
- // the start index is smaller than the first closed segment's start
- // index. We no longer keep the log entry (because of the snapshot) or
- // the start index is invalid.
- Preconditions.checkState(segmentIndex == 0);
- throw new IndexOutOfBoundsException();
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return currentSegment != null &&
- currentSegment.getLogRecord(nextIndex) != null;
- }
-
- @Override
- public LogEntryProto next() {
- LogRecord record;
- if (currentSegment == null ||
- (record = currentSegment.getLogRecord(nextIndex)) == null) {
- throw new NoSuchElementException();
- }
- if (++nextIndex > currentSegment.getEndIndex()) {
- if (currentSegment != openSegment) {
- segmentIndex++;
- currentSegment = segmentIndex == closedSegments.size() ?
- openSegment : closedSegments.get(segmentIndex);
- }
- }
- return record.entry;
- }
- }
-
- @VisibleForTesting
- int getNumOfSegments() {
- return closedSegments.size() + (openSegment == null ? 0 : 1);
- }
-
- boolean isEmpty() {
- return closedSegments.isEmpty() && openSegment == null;
- }
-
- void clear() {
- openSegment = null;
- closedSegments.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
deleted file mode 100644
index 6cef212..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.io.nativeio.NativeIO;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
-import org.apache.raft.server.storage.RaftLogCache.TruncationSegments;
-import org.apache.raft.server.storage.SegmentedRaftLog.Task;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY;
-
-/**
- * This class takes the responsibility of all the raft log related I/O ops for a
- * raft peer.
- */
-class RaftLogWorker implements Runnable {
- static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
- /**
- * The task queue accessed by rpc handler threads and the io worker thread.
- */
- private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096);
- private volatile boolean running = true;
- private final Thread workerThread;
-
- private final RaftStorage storage;
- private LogOutputStream out;
- private final RaftServerImpl raftServer;
-
- /**
- * The number of entries that have been written into the LogOutputStream but
- * has not been flushed.
- */
- private int pendingFlushNum = 0;
- /** the index of the last entry that has been written */
- private long lastWrittenIndex;
- /** the largest index of the entry that has been flushed */
- private volatile long flushedIndex;
-
- private final int forceSyncNum;
-
- private final RaftProperties properties;
-
- RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage,
- RaftProperties properties) {
- this.raftServer = raftServer;
- this.storage = storage;
- this.properties = properties;
- this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY,
- RAFT_LOG_FORCE_SYNC_NUM_DEFAULT);
- workerThread = new Thread(this,
- getClass().getSimpleName() + " for " + storage);
- }
-
- void start(long latestIndex, File openSegmentFile) throws IOException {
- lastWrittenIndex = latestIndex;
- flushedIndex = latestIndex;
- if (openSegmentFile != null) {
- Preconditions.checkArgument(openSegmentFile.exists());
- out = new LogOutputStream(openSegmentFile, true, properties);
- }
- workerThread.start();
- }
-
- void close() {
- this.running = false;
- workerThread.interrupt();
- try {
- workerThread.join();
- } catch (InterruptedException ignored) {
- }
- }
-
- /**
- * A snapshot has just been installed on the follower. Need to update the IO
- * worker's state accordingly.
- */
- void syncWithSnapshot(long lastSnapshotIndex) {
- queue.clear();
- lastWrittenIndex = lastSnapshotIndex;
- flushedIndex = lastSnapshotIndex;
- pendingFlushNum = 0;
- }
-
- @Override
- public String toString() {
- return this.getClass().getSimpleName() + "-"
- + (raftServer != null ? raftServer.getId() : "");
- }
-
- /**
- * This is protected by the RaftServer and RaftLog's lock.
- */
- private Task addIOTask(Task task) {
- LOG.debug("add task {}", task);
- try {
- if (!queue.offer(task, 1, TimeUnit.SECONDS)) {
- Preconditions.checkState(isAlive(),
- "the worker thread is not alive");
- queue.put(task);
- }
- } catch (Throwable t) {
- if (t instanceof InterruptedException && !running) {
- LOG.info("Got InterruptedException when adding task " + task
- + ". The RaftLogWorker already stopped.");
- } else {
- ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
- }
- }
- return task;
- }
-
- boolean isAlive() {
- return running && workerThread.isAlive();
- }
-
- @Override
- public void run() {
- while (running) {
- try {
- Task task = queue.poll(1, TimeUnit.SECONDS);
- if (task != null) {
- try {
- task.execute();
- } catch (IOException e) {
- if (task.getEndIndex() < lastWrittenIndex) {
- LOG.info("Ignore IOException when handling task " + task
- + " which is smaller than the lastWrittenIndex."
- + " There should be a snapshot installed.", e);
- } else {
- throw e;
- }
- }
- task.done();
- }
- } catch (InterruptedException e) {
- LOG.info(Thread.currentThread().getName()
- + " was interrupted, exiting. There are " + queue.size()
- + " tasks remaining in the queue.");
- } catch (Throwable t) {
- // TODO avoid terminating the jvm by supporting multiple log directories
- ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", t, LOG);
- }
- }
- }
-
- private boolean shouldFlush() {
- return pendingFlushNum >= forceSyncNum ||
- (pendingFlushNum > 0 && queue.isEmpty());
- }
-
- private void flushWrites() throws IOException {
- if (out != null) {
- LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
- out.flush();
- updateFlushedIndex();
- }
- }
-
- private void updateFlushedIndex() {
- flushedIndex = lastWrittenIndex;
- pendingFlushNum = 0;
- if (raftServer != null) {
- raftServer.submitLocalSyncEvent();
- }
- }
-
- /**
- * The following several methods (startLogSegment, rollLogSegment,
- * writeLogEntry, and truncate) are only called by SegmentedRaftLog which is
- * protected by RaftServer's lock.
- *
- * Thus all the tasks are created and added sequentially.
- */
- Task startLogSegment(long startIndex) {
- return addIOTask(new StartLogSegment(startIndex));
- }
-
- Task rollLogSegment(LogSegment segmentToClose) {
- addIOTask(new FinalizeLogSegment(segmentToClose));
- return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
- }
-
- Task writeLogEntry(LogEntryProto entry) {
- return addIOTask(new WriteLog(entry));
- }
-
- Task truncate(TruncationSegments ts) {
- return addIOTask(new TruncateLog(ts));
- }
-
- // TODO we can add another level of buffer for writing here
- private class WriteLog extends Task {
- private final LogEntryProto entry;
-
- WriteLog(LogEntryProto entry) {
- this.entry = entry;
- }
-
- @Override
- public void execute() throws IOException {
- Preconditions.checkState(out != null);
- Preconditions.checkState(lastWrittenIndex + 1 == entry.getIndex(),
- "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);
- out.write(entry);
- lastWrittenIndex = entry.getIndex();
- pendingFlushNum++;
- if (shouldFlush()) {
- flushWrites();
- }
- }
-
- @Override
- long getEndIndex() {
- return entry.getIndex();
- }
- }
-
- private class FinalizeLogSegment extends Task {
- private final LogSegment segmentToClose;
-
- FinalizeLogSegment(LogSegment segmentToClose) {
- this.segmentToClose = segmentToClose;
- }
-
- @Override
- public void execute() throws IOException {
- RaftUtils.cleanup(null, out);
- out = null;
- Preconditions.checkState(segmentToClose != null);
-
- File openFile = storage.getStorageDir()
- .getOpenLogFile(segmentToClose.getStartIndex());
- Preconditions.checkState(openFile.exists(),
- "File %s does not exist.", openFile);
- if (segmentToClose.numOfEntries() > 0) {
- // finalize the current open segment
- File dstFile = storage.getStorageDir().getClosedLogFile(
- segmentToClose.getStartIndex(), segmentToClose.getEndIndex());
- Preconditions.checkState(!dstFile.exists());
-
- NativeIO.renameTo(openFile, dstFile);
- } else { // delete the file of the empty segment
- FileUtils.deleteFile(openFile);
- }
- updateFlushedIndex();
- }
-
- @Override
- long getEndIndex() {
- return segmentToClose.getEndIndex();
- }
- }
-
- private class StartLogSegment extends Task {
- private final long newStartIndex;
-
- StartLogSegment(long newStartIndex) {
- this.newStartIndex = newStartIndex;
- }
-
- @Override
- void execute() throws IOException {
- File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
- Preconditions.checkState(!openFile.exists(), "open file %s exists for %s",
- openFile.getAbsolutePath(), RaftLogWorker.this.toString());
- Preconditions.checkState(out == null && pendingFlushNum == 0);
- out = new LogOutputStream(openFile, false, properties);
- }
-
- @Override
- long getEndIndex() {
- return newStartIndex;
- }
- }
-
- private class TruncateLog extends Task {
- private final TruncationSegments segments;
-
- TruncateLog(TruncationSegments ts) {
- this.segments = ts;
- }
-
- @Override
- void execute() throws IOException {
- RaftUtils.cleanup(null, out);
- out = null;
- if (segments.toTruncate != null) {
- File fileToTruncate = segments.toTruncate.isOpen ?
- storage.getStorageDir().getOpenLogFile(
- segments.toTruncate.startIndex) :
- storage.getStorageDir().getClosedLogFile(
- segments.toTruncate.startIndex,
- segments.toTruncate.endIndex);
- FileUtils.truncateFile(fileToTruncate, segments.toTruncate.targetLength);
-
- // rename the file
- File dstFile = storage.getStorageDir().getClosedLogFile(
- segments.toTruncate.startIndex, segments.toTruncate.newEndIndex);
- Preconditions.checkState(!dstFile.exists());
- NativeIO.renameTo(fileToTruncate, dstFile);
-
- // update lastWrittenIndex
- lastWrittenIndex = segments.toTruncate.newEndIndex;
- }
- if (segments.toDelete != null && segments.toDelete.length > 0) {
- long minStart = segments.toDelete[0].startIndex;
- for (SegmentFileInfo del : segments.toDelete) {
- final File delFile;
- if (del.isOpen) {
- delFile = storage.getStorageDir().getOpenLogFile(del.startIndex);
- } else {
- delFile = storage.getStorageDir()
- .getClosedLogFile(del.startIndex, del.endIndex);
- }
- FileUtils.deleteFile(delFile);
- minStart = Math.min(minStart, del.startIndex);
- }
- if (segments.toTruncate == null) {
- lastWrittenIndex = minStart - 1;
- }
- }
- updateFlushedIndex();
- }
-
- @Override
- long getEndIndex() {
- if (segments.toTruncate != null) {
- return segments.toTruncate.newEndIndex;
- } else if (segments.toDelete.length > 0) {
- return segments.toDelete[segments.toDelete.length - 1].endIndex;
- }
- return RaftServerConstants.INVALID_LOG_INDEX;
- }
- }
-
- long getFlushedIndex() {
- return flushedIndex;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
deleted file mode 100644
index 434f505..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorageDirectory.StorageState;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachineStorage;
-import org.apache.raft.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY;
-
-public class RaftStorage implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
-
- // TODO support multiple storage directories
- private final RaftStorageDirectory storageDir;
- private final StorageState state;
- private volatile MetaFile metaFile;
- private StateMachineStorage stateMachineStorage;
-
- public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option)
- throws IOException {
- final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY,
- RAFT_SERVER_STORAGE_DIR_DEFAULT);
- storageDir = new RaftStorageDirectory(
- new File(FileUtils.stringAsURI(dir).getPath()));
- if (option == RaftServerConstants.StartupOption.FORMAT) {
- if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
- throw new IOException("Cannot format " + storageDir);
- }
- storageDir.lock();
- format();
- state = storageDir.analyzeStorage(false);
- Preconditions.checkState(state == StorageState.NORMAL);
- } else {
- state = analyzeAndRecoverStorage(true); // metaFile is initialized here
- if (state != StorageState.NORMAL) {
- storageDir.unlock();
- throw new IOException("Cannot load " + storageDir
- + ". Its state: " + state);
- }
- }
- }
-
- StorageState getState() {
- return state;
- }
-
- private void format() throws IOException {
- storageDir.clearDirectory();
- metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
- LOG.info("Storage directory " + storageDir.getRoot()
- + " has been successfully formatted.");
- }
-
- private MetaFile writeMetaFile(long term, String votedFor) throws IOException {
- MetaFile metaFile = new MetaFile(storageDir.getMetaFile());
- metaFile.set(term, votedFor);
- return metaFile;
- }
-
- private void cleanMetaTmpFile() throws IOException {
- Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
- }
-
- private StorageState analyzeAndRecoverStorage(boolean toLock)
- throws IOException {
- StorageState storageState = storageDir.analyzeStorage(toLock);
- if (storageState == StorageState.NORMAL) {
- metaFile = new MetaFile(storageDir.getMetaFile());
- assert metaFile.exists();
- metaFile.readFile();
- // Existence of raft-meta.tmp means the change of votedFor/term has not
- // been committed. Thus we should delete the tmp file.
- cleanMetaTmpFile();
- return StorageState.NORMAL;
- } else if (storageState == StorageState.NOT_FORMATTED &&
- storageDir.isCurrentEmpty()) {
- format();
- return StorageState.NORMAL;
- } else {
- return storageState;
- }
- }
-
- public RaftStorageDirectory getStorageDir() {
- return storageDir;
- }
-
- @Override
- public void close() throws IOException {
- storageDir.unlock();
- }
-
- MetaFile getMetaFile() {
- return metaFile;
- }
-
- public SnapshotInfo getLastestSnapshot() throws IOException {
- return getStateMachineStorage().getLatestSnapshot();
- }
-
- /**
- * Called by the state machine after it has initialized the StateMachineStorage.
- */
- public void setStateMachineStorage(StateMachineStorage smStorage) {
- this.stateMachineStorage = smStorage;
- }
-
- public StateMachineStorage getStateMachineStorage() {
- return stateMachineStorage;
- }
-
- @Override
- public String toString() {
- return getStorageDir() + "";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
deleted file mode 100644
index 662e4ec..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.lang.management.ManagementFactory;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static java.nio.file.Files.newDirectoryStream;
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-public class RaftStorageDirectory {
- static final Logger LOG = LoggerFactory.getLogger(RaftStorageDirectory.class);
-
- static final String STORAGE_DIR_CURRENT = "current";
- static final String STORAGE_FILE_LOCK = "in_use.lock";
- static final String META_FILE_NAME = "raft-meta";
- static final String LOG_FILE_INPROGRESS = "inprogress";
- static final String LOG_FILE_PREFIX = "log";
- static final String STATE_MACHINE = "sm"; // directory containing state machine snapshots
- static final String TEMP = "tmp";
- static final Pattern CLOSED_SEGMENT_REGEX = Pattern.compile("log_(\\d+)-(\\d+)");
- static final Pattern OPEN_SEGMENT_REGEX = Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?");
-
- private static final List<Pattern> LOGSEGMENTS_REGEXES =
- ImmutableList.of(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX);
-
- enum StorageState {
- NON_EXISTENT,
- NOT_FORMATTED,
- NORMAL
- }
-
- public static class LogPathAndIndex {
- public final Path path;
- public final long startIndex;
- public final long endIndex;
-
- LogPathAndIndex(Path path, long startIndex, long endIndex) {
- this.path = path;
- this.startIndex = startIndex;
- this.endIndex = endIndex;
- }
-
- @Override
- public String toString() {
- return path + "-" + startIndex + "-" + endIndex;
- }
- }
-
- private final File root; // root directory
- private FileLock lock; // storage lock
-
- /**
- * Constructor
- * @param dir directory corresponding to the storage
- */
- RaftStorageDirectory(File dir) {
- this.root = dir;
- this.lock = null;
- }
-
- /**
- * Get root directory of this storage
- */
- //TODO
- public File getRoot() {
- return root;
- }
-
- /**
- * Clear and re-create storage directory.
- * <p>
- * Removes contents of the current directory and creates an empty directory.
- *
- * This does not fully format storage directory.
- * It cannot write the version file since it should be written last after
- * all other storage type dependent files are written.
- * Derived storage is responsible for setting specific storage values and
- * writing the version file to disk.
- */
- void clearDirectory() throws IOException {
- File curDir = this.getCurrentDir();
- clearDirectory(curDir);
- clearDirectory(getStateMachineDir());
- }
-
- void clearDirectory(File dir) throws IOException {
- if (dir.exists()) {
- File[] files = FileUtils.listFiles(dir);
- LOG.info("Will remove files: " + Arrays.toString(files));
- if (!(FileUtils.fullyDelete(dir)))
- throw new IOException("Cannot remove directory: " + dir);
- }
- if (!dir.mkdirs())
- throw new IOException("Cannot create directory " + dir);
- }
-
- /**
- * Directory {@code current} contains latest files defining
- * the file system meta-data.
- *
- * @return the directory path
- */
- File getCurrentDir() {
- return new File(root, STORAGE_DIR_CURRENT);
- }
-
- File getMetaFile() {
- return new File(getCurrentDir(), META_FILE_NAME);
- }
-
- File getMetaTmpFile() {
- return new File(getCurrentDir(), META_FILE_NAME
- + AtomicFileOutputStream.TMP_EXTENSION);
- }
-
- File getOpenLogFile(long startIndex) {
- return new File(getCurrentDir(), getOpenLogFileName(startIndex));
- }
-
- static String getOpenLogFileName(long startIndex) {
- return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex;
- }
-
- File getClosedLogFile(long startIndex, long endIndex) {
- return new File(getCurrentDir(), getClosedLogFileName(startIndex, endIndex));
- }
-
- static String getClosedLogFileName(long startIndex, long endIndex) {
- return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex;
- }
-
- public File getStateMachineDir() {
- return new File(getRoot(), STATE_MACHINE);
- }
-
- /** Returns a uniquely named temporary directory under $rootdir/tmp/ */
- public File getNewTempDir() {
- return new File(new File(getRoot(), TEMP), UUID.randomUUID().toString());
- }
-
- public Path relativizeToRoot(Path p) {
- if (p.isAbsolute()) {
- return getRoot().toPath().relativize(p);
- }
- return p;
- }
-
- /**
- * @return log segment files sorted based on their index.
- */
- @VisibleForTesting
- public List<LogPathAndIndex> getLogSegmentFiles() throws IOException {
- List<LogPathAndIndex> list = new ArrayList<>();
- try (DirectoryStream<Path> stream =
- Files.newDirectoryStream(getCurrentDir().toPath())) {
- for (Path path : stream) {
- for (Pattern pattern : LOGSEGMENTS_REGEXES) {
- Matcher matcher = pattern.matcher(path.getFileName().toString());
- if (matcher.matches()) {
- final long startIndex = Long.parseLong(matcher.group(1));
- final long endIndex = matcher.groupCount() == 2 ?
- Long.parseLong(matcher.group(2)) : INVALID_LOG_INDEX;
- list.add(new LogPathAndIndex(path, startIndex, endIndex));
- }
- }
- }
- }
- Collections.sort(list,
- (o1, o2) -> o1.startIndex == o2.startIndex ?
- 0 : (o1.startIndex < o2.startIndex ? -1 : 1));
- return list;
- }
-
- /**
- * Check to see if current/ directory is empty.
- */
- boolean isCurrentEmpty() throws IOException {
- File currentDir = getCurrentDir();
- if(!currentDir.exists()) {
- // if current/ does not exist, it's safe to format it.
- return true;
- }
- try(DirectoryStream<Path> dirStream =
- newDirectoryStream(currentDir.toPath())) {
- if (dirStream.iterator().hasNext()) {
- return false;
- }
- }
- return true;
- }
-
- /**
- * Check consistency of the storage directory.
- *
- * @return state {@link StorageState} of the storage directory
- */
- StorageState analyzeStorage(boolean toLock) throws IOException {
- Preconditions.checkState(root != null, "root directory is null");
-
- String rootPath = root.getCanonicalPath();
- try { // check that storage exists
- if (!root.exists()) {
- LOG.info(rootPath + " does not exist. Creating ...");
- if (!root.mkdirs()) {
- throw new IOException("Cannot create directory " + rootPath);
- }
- }
- // or is inaccessible
- if (!root.isDirectory()) {
- LOG.warn(rootPath + "is not a directory");
- return StorageState.NON_EXISTENT;
- }
- if (!FileUtils.canWrite(root)) {
- LOG.warn("Cannot access storage directory " + rootPath);
- return StorageState.NON_EXISTENT;
- }
- } catch(SecurityException ex) {
- LOG.warn("Cannot access storage directory " + rootPath, ex);
- return StorageState.NON_EXISTENT;
- }
-
- if (toLock) {
- this.lock(); // lock storage if it exists
- }
-
- // check whether current directory is valid
- if (hasMetaFile()) {
- return StorageState.NORMAL;
- } else {
- return StorageState.NOT_FORMATTED;
- }
- }
-
- boolean hasMetaFile() throws IOException {
- return getMetaFile().exists();
- }
-
- /**
- * Lock storage to provide exclusive access.
- *
- * <p> Locking is not supported by all file systems.
- * E.g., NFS does not consistently support exclusive locks.
- *
- * <p> If locking is supported we guarantee exclusive access to the
- * storage directory. Otherwise, no guarantee is given.
- *
- * @throws IOException if locking fails
- */
- public void lock() throws IOException {
- FileLock newLock = tryLock();
- if (newLock == null) {
- String msg = "Cannot lock storage " + this.root
- + ". The directory is already locked";
- LOG.info(msg);
- throw new IOException(msg);
- }
- // Don't overwrite lock until success - this way if we accidentally
- // call lock twice, the internal state won't be cleared by the second
- // (failed) lock attempt
- lock = newLock;
- }
-
- /**
- * Attempts to acquire an exclusive lock on the storage.
- *
- * @return A lock object representing the newly-acquired lock or
- * <code>null</code> if storage is already locked.
- * @throws IOException if locking fails.
- */
- private FileLock tryLock() throws IOException {
- boolean deletionHookAdded = false;
- File lockF = new File(root, STORAGE_FILE_LOCK);
- if (!lockF.exists()) {
- lockF.deleteOnExit();
- deletionHookAdded = true;
- }
- RandomAccessFile file = new RandomAccessFile(lockF, "rws");
- String jvmName = ManagementFactory.getRuntimeMXBean().getName();
- FileLock res;
- try {
- res = file.getChannel().tryLock();
- if (null == res) {
- LOG.error("Unable to acquire file lock on path " + lockF.toString());
- throw new OverlappingFileLockException();
- }
- file.write(jvmName.getBytes(Charsets.UTF_8));
- LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
- } catch (OverlappingFileLockException oe) {
- // Cannot read from the locked file on Windows.
- LOG.error("It appears that another process "
- + "has already locked the storage directory: " + root, oe);
- file.close();
- return null;
- } catch(IOException e) {
- LOG.error("Failed to acquire lock on " + lockF
- + ". If this storage directory is mounted via NFS, "
- + "ensure that the appropriate nfs lock services are running.", e);
- file.close();
- throw e;
- }
- if (!deletionHookAdded) {
- // If the file existed prior to our startup, we didn't
- // call deleteOnExit above. But since we successfully locked
- // the dir, we can take care of cleaning it up.
- lockF.deleteOnExit();
- }
- return res;
- }
-
- /**
- * Unlock storage.
- */
- public void unlock() throws IOException {
- if (this.lock == null)
- return;
- this.lock.release();
- lock.channel().close();
- lock = null;
- }
-
- @Override
- public String toString() {
- return "Storage Directory " + this.root;
- }
-}