You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:21 UTC

[36/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preperation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
deleted file mode 100644
index 0dc8029..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogOutputStream.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.PureJavaCrc32C;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.zip.Checksum;
-
-import static org.apache.raft.server.RaftServerConfigKeys.*;
-
-public class LogOutputStream implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(LogOutputStream.class);
-
-  private static final ByteBuffer fill;
-  private static final int BUFFER_SIZE = 1024 * 1024; // 1 MB
-  static {
-    fill = ByteBuffer.allocateDirect(BUFFER_SIZE);
-    fill.position(0);
-    for (int i = 0; i < fill.capacity(); i++) {
-      fill.put(RaftServerConstants.LOG_TERMINATE_BYTE);
-    }
-  }
-
-  private File file;
-  private FileChannel fc; // channel of the file stream for sync
-  private BufferedWriteChannel out; // buffered FileChannel for writing
-  private final Checksum checksum;
-
-  private final long segmentMaxSize;
-  private final long preallocatedSize;
-  private long preallocatedPos;
-
-  public LogOutputStream(File file, boolean append, RaftProperties properties)
-      throws IOException {
-    this.file = file;
-    this.checksum = new PureJavaCrc32C();
-    this.segmentMaxSize = properties.getLong(RAFT_LOG_SEGMENT_MAX_SIZE_KEY,
-        RAFT_LOG_SEGMENT_MAX_SIZE_DEFAULT);
-    this.preallocatedSize = properties.getLong(
-        RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_KEY,
-        RAFT_LOG_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
-    RandomAccessFile rp = new RandomAccessFile(file, "rw");
-    fc = rp.getChannel();
-    fc.position(fc.size());
-    preallocatedPos = fc.size();
-
-    int bufferSize = properties.getInt(RAFT_LOG_WRITE_BUFFER_SIZE_KEY,
-        RAFT_LOG_WRITE_BUFFER_SIZE_DEFAULT);
-    out = new BufferedWriteChannel(fc, bufferSize);
-
-    if (!append) {
-      create();
-    }
-  }
-
-  /**
-   * Format:
-   * LogEntryProto's protobuf
-   * 4-byte checksum of the above protobuf
-   */
-  public void write(LogEntryProto entry) throws IOException {
-    final int serialized = entry.getSerializedSize();
-    final int bufferSize = CodedOutputStream.computeUInt32SizeNoTag(serialized)
-        + serialized;
-
-    preallocateIfNecessary(bufferSize + 4);
-
-    byte[] buf = new byte[bufferSize];
-    CodedOutputStream cout = CodedOutputStream.newInstance(buf);
-    cout.writeUInt32NoTag(serialized);
-    entry.writeTo(cout);
-
-    checksum.reset();
-    checksum.update(buf, 0, buf.length);
-    final int sum = (int) checksum.getValue();
-
-    out.write(buf);
-    writeInt(sum);
-  }
-
-  private void writeInt(int v) throws IOException {
-    out.write((v >>> 24) & 0xFF);
-    out.write((v >>> 16) & 0xFF);
-    out.write((v >>>  8) & 0xFF);
-    out.write((v) & 0xFF);
-  }
-
-  private void create() throws IOException {
-    fc.truncate(0);
-    fc.position(0);
-    preallocatedPos = 0;
-    preallocate(); // preallocate file
-
-    out.write(SegmentedRaftLog.HEADER_BYTES);
-    flush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      out.flush(false);
-      if (fc != null && fc.isOpen()) {
-        fc.truncate(fc.position());
-      }
-    } finally {
-      RaftUtils.cleanup(LOG, fc, out);
-      fc = null;
-      out = null;
-    }
-  }
-
-  /**
-   * Flush data to persistent store.
-   * Collect sync metrics.
-   */
-  public void flush() throws IOException {
-    if (out == null) {
-      throw new IOException("Trying to use aborted output stream");
-    }
-    out.flush(true);
-  }
-
-  private void preallocate() throws IOException {
-    fill.position(0);
-    long targetSize = Math.min(segmentMaxSize - fc.size(), preallocatedSize);
-    int allocated = 0;
-    while (allocated < targetSize) {
-      int size = (int) Math.min(BUFFER_SIZE, targetSize - allocated);
-      ByteBuffer buffer = fill.slice();
-      buffer.limit(size);
-      RaftUtils.writeFully(fc, buffer, preallocatedPos);
-      preallocatedPos += size;
-      allocated += size;
-    }
-    LOG.debug("Pre-allocated {} bytes for the log segment", allocated);
-  }
-
-  private void preallocateIfNecessary(int size) throws IOException {
-    if (out.position() + size > preallocatedPos) {
-      preallocate();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "(" + file + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
deleted file mode 100644
index 9523cac..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogReader.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.Charsets;
-import org.apache.raft.protocol.ChecksumException;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.PureJavaCrc32C;
-import org.apache.raft.util.RaftUtils;
-
-import java.io.*;
-import java.util.zip.Checksum;
-
-public class LogReader implements Closeable {
-  /**
-   * InputStream wrapper that keeps track of the current stream position.
-   *
-   * This stream also allows us to set a limit on how many bytes we can read
-   * without getting an exception.
-   */
-  public static class LimitedInputStream extends FilterInputStream {
-    private long curPos = 0;
-    private long markPos = -1;
-    private long limitPos = Long.MAX_VALUE;
-
-    public LimitedInputStream(InputStream is) {
-      super(is);
-    }
-
-    private void checkLimit(long amt) throws IOException {
-      long extra = (curPos + amt) - limitPos;
-      if (extra > 0) {
-        throw new IOException("Tried to read " + amt + " byte(s) past " +
-            "the limit at offset " + limitPos);
-      }
-    }
-
-    @Override
-    public int read() throws IOException {
-      checkLimit(1);
-      int ret = super.read();
-      if (ret != -1) curPos++;
-      return ret;
-    }
-
-    @Override
-    public int read(byte[] data) throws IOException {
-      checkLimit(data.length);
-      int ret = super.read(data);
-      if (ret > 0) curPos += ret;
-      return ret;
-    }
-
-    @Override
-    public int read(byte[] data, int offset, int length) throws IOException {
-      checkLimit(length);
-      int ret = super.read(data, offset, length);
-      if (ret > 0) curPos += ret;
-      return ret;
-    }
-
-    public void setLimit(long limit) {
-      limitPos = curPos + limit;
-    }
-
-    public void clearLimit() {
-      limitPos = Long.MAX_VALUE;
-    }
-
-    @Override
-    public void mark(int limit) {
-      super.mark(limit);
-      markPos = curPos;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      if (markPos == -1) {
-        throw new IOException("Not marked!");
-      }
-      super.reset();
-      curPos = markPos;
-      markPos = -1;
-    }
-
-    public long getPos() {
-      return curPos;
-    }
-
-    @Override
-    public long skip(long amt) throws IOException {
-      long extra = (curPos + amt) - limitPos;
-      if (extra > 0) {
-        throw new IOException("Tried to skip " + extra + " bytes past " +
-            "the limit at offset " + limitPos);
-      }
-      long ret = super.skip(amt);
-      curPos += ret;
-      return ret;
-    }
-  }
-
-  private static final int maxOpSize = 32 * 1024 * 1024;
-
-  private final LimitedInputStream limiter;
-  private final DataInputStream in;
-  private byte[] temp = new byte[4096];
-  private final Checksum checksum;
-
-  LogReader(File file) throws FileNotFoundException {
-    this.limiter = new LimitedInputStream(
-        new BufferedInputStream(new FileInputStream(file)));
-    in = new DataInputStream(limiter);
-    checksum = new PureJavaCrc32C();
-  }
-
-  String readLogHeader() throws IOException {
-    byte[] header = new byte[SegmentedRaftLog.HEADER_BYTES.length];
-    int num = in.read(header);
-    if (num < header.length) {
-      throw new EOFException("EOF before reading a complete log header");
-    }
-    return new String(header, Charsets.UTF_8);
-  }
-
-  /**
-   * Read a log entry from the input stream.
-   *
-   * @return the operation read from the stream, or null at the end of the
-   *         file
-   * @throws IOException on error.  This function should only throw an
-   *         exception when skipBrokenEdits is false.
-   */
-  LogEntryProto readEntry() throws IOException {
-    try {
-      return decodeEntry();
-    } catch (IOException e) {
-      in.reset();
-
-      throw e;
-    } catch (Throwable e) {
-      // raft log requires no gap between any two entries. thus if an entry is
-      // broken, throw the exception instead of skipping broken entries
-      in.reset();
-      throw new IOException("got unexpected exception " + e.getMessage(), e);
-    }
-  }
-
-  /**
-   * Scan and validate a log entry.
-   * @return the index of the log entry
-   */
-  long scanEntry() throws IOException {
-    LogEntryProto entry = decodeEntry();
-    return entry != null ? entry.getIndex() : RaftServerConstants.INVALID_LOG_INDEX;
-  }
-
-  void verifyTerminator() throws IOException {
-     // The end of the log should contain 0x00 bytes.
-     // If it contains other bytes, the log itself may be corrupt.
-    limiter.clearLimit();
-    int numRead = -1, idx = 0;
-    while (true) {
-      try {
-        numRead = -1;
-        numRead = in.read(temp);
-        if (numRead == -1) {
-          return;
-        }
-        for (idx = 0; idx < numRead; idx++) {
-          if (temp[idx] != RaftServerConstants.LOG_TERMINATE_BYTE) {
-            throw new IOException("Read extra bytes after the terminator!");
-          }
-        }
-      } finally {
-        // After reading each group of bytes, we reposition the mark one
-        // byte before the next group. Similarly, if there is an error, we
-        // want to reposition the mark one byte before the error
-        if (numRead != -1) {
-          in.reset();
-          RaftUtils.skipFully(in, idx);
-          in.mark(temp.length + 1);
-          RaftUtils.skipFully(in, 1);
-        }
-      }
-    }
-  }
-
-  /**
-   * Decode the log entry "frame". This includes reading the log entry, and
-   * validating the checksum.
-   *
-   * The input stream will be advanced to the end of the op at the end of this
-   * function.
-   *
-   * @return The log entry, or null if we hit EOF.
-   */
-  private LogEntryProto decodeEntry() throws IOException {
-    limiter.setLimit(maxOpSize);
-    in.mark(maxOpSize);
-
-    byte nextByte;
-    try {
-      nextByte = in.readByte();
-    } catch (EOFException eof) {
-      // EOF at an opcode boundary is expected.
-      return null;
-    }
-    // Each log entry starts with a var-int. Thus a valid entry's first byte
-    // should not be 0. So if the terminate byte is 0, we should hit the end
-    // of the segment.
-    if (nextByte == RaftServerConstants.LOG_TERMINATE_BYTE) {
-      verifyTerminator();
-      return null;
-    }
-
-    // Here, we verify that the Op size makes sense and that the
-    // data matches its checksum before attempting to construct an Op.
-    int entryLength = CodedInputStream.readRawVarint32(nextByte, in);
-    if (entryLength > maxOpSize) {
-      throw new IOException("Entry has size " + entryLength
-          + ", but maxOpSize = " + maxOpSize);
-    }
-
-    final int varintLength = CodedOutputStream.computeUInt32SizeNoTag(
-        entryLength);
-    final int totalLength = varintLength + entryLength;
-    checkBufferSize(totalLength);
-    in.reset();
-    in.mark(maxOpSize);
-    RaftUtils.readFully(in, temp, 0, totalLength);
-
-    // verify checksum
-    checksum.reset();
-    checksum.update(temp, 0, totalLength);
-    int expectedChecksum = in.readInt();
-    int calculatedChecksum = (int) checksum.getValue();
-    if (expectedChecksum != calculatedChecksum) {
-      throw new ChecksumException("LogEntry is corrupt. Calculated checksum is "
-          + calculatedChecksum + " but read checksum " + expectedChecksum,
-          limiter.markPos);
-    }
-
-    // parse the buffer
-    return LogEntryProto.parseFrom(
-        CodedInputStream.newInstance(temp, varintLength, entryLength));
-  }
-
-  private void checkBufferSize(int entryLength) {
-    Preconditions.checkArgument(entryLength <= maxOpSize);
-    int length = temp.length;
-    if (length < entryLength) {
-      while (length < entryLength) {
-        length = Math.min(length * 2, maxOpSize);
-      }
-      temp = new byte[length];
-    }
-  }
-
-  long getPos() {
-    return limiter.getPos();
-  }
-
-  void skipFully(long length) throws IOException {
-    limiter.clearLimit();
-    RaftUtils.skipFully(limiter, length);
-  }
-
-  @Override
-  public void close() throws IOException {
-    RaftUtils.cleanup(null, in);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java b/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
deleted file mode 100644
index 987cc6c..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/LogSegment.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.shaded.com.google.protobuf.CodedOutputStream;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.raft.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
-
-/**
- * In-memory cache for a log segment file. All the updates will be first written
- * into LogSegment then into corresponding files in the same order.
- *
- * This class will be protected by the RaftServer's lock.
- */
-class LogSegment implements Comparable<Long> {
-  static class LogRecord {
-    /** starting offset in the file */
-    final long offset;
-    final LogEntryProto entry;
-
-    LogRecord(long offset, LogEntryProto entry) {
-      this.offset = offset;
-      this.entry = entry;
-    }
-  }
-
-  static class SegmentFileInfo {
-    final long startIndex; // start index of the
-    final long endIndex; // original end index
-    final boolean isOpen;
-    final long targetLength; // position for truncation
-    final long newEndIndex; // new end index after the truncation
-
-    SegmentFileInfo(long start, long end, boolean isOpen, long targetLength,
-        long newEndIndex) {
-      this.startIndex = start;
-      this.endIndex = end;
-      this.isOpen = isOpen;
-      this.targetLength = targetLength;
-      this.newEndIndex = newEndIndex;
-    }
-  }
-
-  static long getEntrySize(LogEntryProto entry) {
-    final int serialized = entry.getSerializedSize();
-    return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4;
-  }
-
-  private boolean isOpen;
-  private final List<LogRecord> records = new ArrayList<>();
-  private long totalSize;
-  private final long startIndex;
-  private long endIndex;
-
-  private LogSegment(boolean isOpen, long start, long end) {
-    this.isOpen = isOpen;
-    this.startIndex = start;
-    this.endIndex = end;
-    totalSize = SegmentedRaftLog.HEADER_BYTES.length;
-  }
-
-  static LogSegment newOpenSegment(long start) {
-    Preconditions.checkArgument(start >= 0);
-    return new LogSegment(true, start, start - 1);
-  }
-
-  private static LogSegment newCloseSegment(long start, long end) {
-    Preconditions.checkArgument(start >= 0 && end >= start);
-    return new LogSegment(false, start, end);
-  }
-
-  static LogSegment loadSegment(File file, long start, long end, boolean isOpen,
-      ConfigurationManager confManager) throws IOException {
-    final LogSegment segment;
-    try (LogInputStream in = new LogInputStream(file, start, end, isOpen)) {
-      segment = isOpen ? LogSegment.newOpenSegment(start) :
-          LogSegment.newCloseSegment(start, end);
-      LogEntryProto next;
-      LogEntryProto prev = null;
-      while ((next = in.nextEntry()) != null) {
-        if (prev != null) {
-          Preconditions.checkState(next.getIndex() == prev.getIndex() + 1,
-              "gap between entry %s and entry %s", prev, next);
-        }
-        segment.append(next);
-        if (confManager != null &&
-            next.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
-          confManager.addConfiguration(next.getIndex(),
-              ServerProtoUtils.toRaftConfiguration(next.getIndex(),
-                  next.getConfigurationEntry()));
-        }
-        prev = next;
-      }
-    }
-
-    // truncate padding if necessary
-    if (file.length() > segment.getTotalSize()) {
-      FileUtils.truncateFile(file, segment.getTotalSize());
-    }
-
-    Preconditions.checkState(start == segment.records.get(0).entry.getIndex());
-    if (!isOpen) {
-      Preconditions.checkState(segment.getEndIndex() == end);
-    }
-    return segment;
-  }
-
-  long getStartIndex() {
-    return startIndex;
-  }
-
-  long getEndIndex() {
-    return endIndex;
-  }
-
-  boolean isOpen() {
-    return isOpen;
-  }
-
-  int numOfEntries() {
-    return (int) (endIndex - startIndex + 1);
-  }
-
-  void appendToOpenSegment(LogEntryProto... entries) {
-    Preconditions.checkState(isOpen(),
-        "The log segment %s is not open for append", this.toString());
-    append(entries);
-  }
-
-  private void append(LogEntryProto... entries) {
-    Preconditions.checkArgument(entries != null && entries.length > 0);
-    final long term = entries[0].getTerm();
-    if (records.isEmpty()) {
-      Preconditions.checkArgument(entries[0].getIndex() == startIndex,
-          "gap between start index %s and first entry to append %s",
-          startIndex, entries[0].getIndex());
-    }
-    for (LogEntryProto entry : entries) {
-      // all these entries should be of the same term
-      Preconditions.checkArgument(entry.getTerm() == term,
-          "expected term:%s, term of the entry:%s", term, entry.getTerm());
-      final LogRecord currentLast = getLastRecord();
-      if (currentLast != null) {
-        Preconditions.checkArgument(
-            entry.getIndex() == currentLast.entry.getIndex() + 1,
-            "gap between entries %s and %s", entry.getIndex(),
-            currentLast.entry.getIndex());
-      }
-
-      final LogRecord record = new LogRecord(totalSize, entry);
-      records.add(record);
-      totalSize += getEntrySize(entry);
-      endIndex = entry.getIndex();
-    }
-  }
-
-  LogRecord getLogRecord(long index) {
-    if (index >= startIndex && index <= endIndex) {
-      return records.get((int) (index - startIndex));
-    }
-    return null;
-  }
-
-  LogRecord getLastRecord() {
-    return records.isEmpty() ? null : records.get(records.size() - 1);
-  }
-
-  long getTotalSize() {
-    return totalSize;
-  }
-
-  /**
-   * Remove records from the given index (inclusive)
-   */
-  void truncate(long fromIndex) {
-    Preconditions.checkArgument(fromIndex >= startIndex && fromIndex <= endIndex);
-    LogRecord record = records.get((int) (fromIndex - startIndex));
-    for (long index = endIndex; index >= fromIndex; index--) {
-      records.remove((int)(index - startIndex));
-    }
-    totalSize = record.offset;
-    isOpen = false;
-    this.endIndex = fromIndex - 1;
-  }
-
-  void close() {
-    Preconditions.checkState(isOpen());
-    isOpen = false;
-  }
-
-  @Override
-  public String toString() {
-    return isOpen() ? "log-" + startIndex + "-inprogress" :
-        "log-" + startIndex + "-" + endIndex;
-  }
-
-  @Override
-  public int compareTo(Long l) {
-    return (l >= getStartIndex() && l <= getEndIndex()) ? 0 :
-        (this.getEndIndex() < l ? -1 : 1);
-  }
-
-  void clear() {
-    records.clear();
-    endIndex = startIndex - 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
deleted file mode 100644
index c12e1aa..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/MemoryRaftLog.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.CodeInjectionForTesting;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A simple RaftLog implementation in memory. Used only for testing.
- */
-public class MemoryRaftLog extends RaftLog {
-  private final List<LogEntryProto> entries = new ArrayList<>();
-
-  public MemoryRaftLog(String selfId) {
-    super(selfId);
-  }
-
-  @Override
-  public LogEntryProto get(long index) {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      final int i = (int) index;
-      return i >= 0 && i < entries.size() ? entries.get(i) : null;
-    }
-  }
-
-  @Override
-  public LogEntryProto[] getEntries(long startIndex, long endIndex) {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      final int i = (int) startIndex;
-      if (startIndex >= entries.size()) {
-        return null;
-      }
-      final int toIndex = (int) Math.min(entries.size(), endIndex);
-      return entries.subList(i, toIndex).toArray(EMPTY_LOGENTRY_ARRAY);
-    }
-  }
-
-  @Override
-  void truncate(long index) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      Preconditions.checkArgument(index >= 0);
-      final int truncateIndex = (int) index;
-      for (int i = entries.size() - 1; i >= truncateIndex; i--) {
-        entries.remove(i);
-      }
-    }
-  }
-
-  @Override
-  public LogEntryProto getLastEntry() {
-    checkLogState();
-    try(AutoCloseableLock readLock = readLock()) {
-      final int size = entries.size();
-      return size == 0 ? null : entries.get(size - 1);
-    }
-  }
-
-  @Override
-  void appendEntry(LogEntryProto entry) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      entries.add(entry);
-    }
-  }
-
-  @Override
-  public long append(long term, RaftConfiguration newConf) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      final long nextIndex = getNextIndex();
-      final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
-          nextIndex);
-      entries.add(e);
-      return nextIndex;
-    }
-  }
-
-  @Override
-  public long getStartIndex() {
-    return entries.isEmpty() ? RaftServerConstants.INVALID_LOG_INDEX :
-        entries.get(0).getIndex();
-  }
-
-  @Override
-  public void append(LogEntryProto... entries) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      if (entries == null || entries.length == 0) {
-        return;
-      }
-      // Before truncating the entries, we first need to check if some
-      // entries are duplicated. If the leader sends entry 6, entry 7, then
-      // entry 6 again, without this check the follower may truncate entry 7
-      // when receiving entry 6 again. Then before the leader detects this
-      // truncation in the next appendEntries RPC, leader may think entry 7 has
-      // been committed but in the system the entry has not been committed to
-      // the quorum of peers' disks.
-      // TODO add a unit test for this
-      boolean toTruncate = false;
-      int truncateIndex = (int) entries[0].getIndex();
-      int index = 0;
-      for (; truncateIndex < getNextIndex() && index < entries.length;
-           index++, truncateIndex++) {
-        if (this.entries.get(truncateIndex).getTerm() !=
-            entries[index].getTerm()) {
-          toTruncate = true;
-          break;
-        }
-      }
-      if (toTruncate) {
-        truncate(truncateIndex);
-      }
-      //  Collections.addAll(this.entries, entries);
-      for (int i = index; i < entries.length; i++) {
-        this.entries.add(entries[i]);
-      }
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "last=" + ServerProtoUtils.toString(getLastEntry())
-        + ", committed="
-        + ServerProtoUtils.toString(get(getLastCommittedIndex()));
-  }
-
-  public String getEntryString() {
-    return "entries=" + entries;
-  }
-
-  @Override
-  public void logSync() {
-    CodeInjectionForTesting.execute(LOG_SYNC, getSelfId(), null);
-    // do nothing
-  }
-
-  @Override
-  public long getLatestFlushedIndex() {
-    return getNextIndex() - 1;
-  }
-
-  @Override
-  public void writeMetadata(long term, String votedFor) {
-    // do nothing
-  }
-
-  @Override
-  public Metadata loadMetadata() {
-    return new Metadata(null, 0);
-  }
-
-  @Override
-  public void syncWithSnapshot(long lastSnapshotIndex) {
-    // do nothing
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java b/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
deleted file mode 100644
index b2b6f04..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/MetaFile.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Charsets;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Properties;
-
-/**
- * Class that represents a file on disk which persistently stores
- * a single <code>long</code> value. The file is updated atomically
- * and durably (i.e fsynced).
- */
-class MetaFile {
-  private static final Logger LOG = LoggerFactory.getLogger(MetaFile.class);
-  private static final String TERM_KEY = "term";
-  private static final String VOTEDFOR_KEY = "votedFor";
-  static final long DEFAULT_TERM = 0;
-  static final String EMPTY_VOTEFOR = "";
-
-  private final File file;
-  private boolean loaded = false;
-  private long term;
-  private String votedFor;
-
-  MetaFile(File file) {
-    this.file = file;
-    term = DEFAULT_TERM;
-    votedFor = EMPTY_VOTEFOR;
-  }
-
-  boolean exists() {
-    return this.file.exists();
-  }
-
-  long getTerm() throws IOException {
-    if (!loaded) {
-      readFile();
-      loaded = true;
-    }
-    return term;
-  }
-
-  String getVotedFor() throws IOException {
-    if (!loaded) {
-      readFile();
-      loaded = true;
-    }
-    return votedFor;
-  }
-
-  void set(long newTerm, String newVotedFor) throws IOException {
-    newVotedFor = newVotedFor == null ? EMPTY_VOTEFOR : newVotedFor;
-    if (!loaded || (newTerm != term || !newVotedFor.equals(votedFor))) {
-      writeFile(newTerm, newVotedFor);
-    }
-    term = newTerm;
-    votedFor = newVotedFor;
-    loaded = true;
-  }
-
-  /**
-   * Atomically write the given term and votedFor information to the given file,
-   * including fsyncing.
-   *
-   * @throws IOException if the file cannot be written
-   */
-  void writeFile(long term, String votedFor) throws IOException {
-    AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
-    Properties properties = new Properties();
-    properties.setProperty(TERM_KEY, Long.toString(term));
-    properties.setProperty(VOTEDFOR_KEY, votedFor);
-    try {
-      properties.store(
-          new BufferedWriter(new OutputStreamWriter(fos, Charsets.UTF_8)), "");
-      fos.close();
-      fos = null;
-    } finally {
-      if (fos != null) {
-        fos.abort();
-      }
-    }
-  }
-
-  void readFile() throws IOException {
-    term = DEFAULT_TERM;
-    votedFor = EMPTY_VOTEFOR;
-    if (file.exists()) {
-      BufferedReader br = new BufferedReader(
-          new InputStreamReader(new FileInputStream(file), Charsets.UTF_8));
-      try {
-        Properties properties = new Properties();
-        properties.load(br);
-        if (properties.containsKey(TERM_KEY) &&
-            properties.containsKey(VOTEDFOR_KEY)) {
-          term = Long.parseLong((String) properties.get(TERM_KEY));
-          votedFor = (String) properties.get(VOTEDFOR_KEY);
-        } else {
-          throw new IOException("Corrupted term/votedFor properties: "
-              + properties);
-        }
-      } catch(IOException e) {
-        LOG.warn("Cannot load term/votedFor properties from {}", file, e);
-        throw e;
-      } finally {
-        RaftUtils.cleanup(LOG, br);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
deleted file mode 100644
index de79911..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLog.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.ConfigurationManager;
-import org.apache.raft.server.impl.RaftConfiguration;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.impl.ServerProtoUtils;
-import org.apache.raft.server.protocol.TermIndex;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.statemachine.TransactionContext;
-import org.apache.raft.util.AutoCloseableLock;
-import org.apache.raft.util.ProtoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Base class of RaftLog. Currently we provide two types of RaftLog
- * implementation:
- * 1. MemoryRaftLog: all the log entries are stored in memory. This is only used
- *    for testing.
- * 2. Segmented RaftLog: the log entries are persisted on disk, and are stored
- *    in segments.
- */
-public abstract class RaftLog implements Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftLog.class);
-  public static final LogEntryProto[] EMPTY_LOGENTRY_ARRAY = new LogEntryProto[0];
-  public static final String LOG_SYNC = RaftLog.class.getSimpleName() + ".logSync";
-
-  /**
-   * The largest committed index. Note the last committed log may be included
-   * in the latest snapshot file.
-   */
-  protected final AtomicLong lastCommitted =
-      new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
-  private final String selfId;
-
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
-  private volatile boolean isOpen = false;
-
-  public RaftLog(String selfId) {
-    this.selfId = selfId;
-  }
-
-  public long getLastCommittedIndex() {
-    return lastCommitted.get();
-  }
-
-  public void checkLogState() {
-    Preconditions.checkState(isOpen,
-        "The RaftLog has not been opened or has been closed");
-  }
-
-  /**
-   * Update the last committed index.
-   * @param majorityIndex the index that has achieved majority.
-   * @param currentTerm the current term.
-   */
-  public void updateLastCommitted(long majorityIndex, long currentTerm) {
-    try(AutoCloseableLock writeLock = writeLock()) {
-      if (lastCommitted.get() < majorityIndex) {
-        // Only update last committed index for current term. See �5.4.2 in
-        // paper for details.
-        final LogEntryProto entry = get(majorityIndex);
-        if (entry != null && entry.getTerm() == currentTerm) {
-          LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
-          lastCommitted.set(majorityIndex);
-        }
-      }
-    }
-  }
-
-  /**
-   * Does the log contains the given term and index? Used to check the
-   * consistency between the local log of a follower and the log entries sent
-   * by the leader.
-   */
-  public boolean contains(TermIndex ti) {
-    if (ti == null) {
-      return false;
-    }
-    LogEntryProto entry = get(ti.getIndex());
-    TermIndex local = ServerProtoUtils.toTermIndex(entry);
-    return ti.equals(local);
-  }
-
-  /**
-   * @return the index of the next log entry to append.
-   */
-  public long getNextIndex() {
-    final LogEntryProto last = getLastEntry();
-    if (last == null) {
-      // if the log is empty, the last committed index should be consistent with
-      // the last index included in the latest snapshot.
-      return getLastCommittedIndex() + 1;
-    }
-    return last.getIndex() + 1;
-  }
-
-  /**
-   * Generate a log entry for the given term and message, and append the entry.
-   * Used by the leader.
-   * @return the index of the new log entry.
-   */
-  public long append(long term, TransactionContext operation) throws IOException {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      final long nextIndex = getNextIndex();
-
-      // This is called here to guarantee strict serialization of callback executions in case
-      // the SM wants to attach a logic depending on ordered execution in the log commit order.
-      operation = operation.preAppendTransaction();
-
-      // build the log entry after calling the StateMachine
-      final LogEntryProto e = ProtoUtils.toLogEntryProto(
-          operation.getSMLogEntry().get(), term, nextIndex);
-
-      appendEntry(e);
-      operation.setLogEntry(e);
-      return nextIndex;
-    }
-  }
-
-  /**
-   * Generate a log entry for the given term and configurations,
-   * and append the entry. Used by the leader.
-   * @return the index of the new log entry.
-   */
-  public long append(long term, RaftConfiguration newConf) {
-    checkLogState();
-    try(AutoCloseableLock writeLock = writeLock()) {
-      final long nextIndex = getNextIndex();
-      final LogEntryProto e = ServerProtoUtils.toLogEntryProto(newConf, term,
-          nextIndex);
-      appendEntry(e);
-      return nextIndex;
-    }
-  }
-
-  public void open(ConfigurationManager confManager, long lastIndexInSnapshot)
-      throws IOException {
-    isOpen = true;
-  }
-
-  public abstract long getStartIndex();
-
-  /**
-   * Get the log entry of the given index.
-   *
-   * @param index The given index.
-   * @return The log entry associated with the given index.
-   *         Null if there is no log entry with the index.
-   */
-  public abstract LogEntryProto get(long index);
-
-  /**
-   * @param startIndex the starting log index (inclusive)
-   * @param endIndex the ending log index (exclusive)
-   * @return all log entries within the given index range. Null if startIndex
-   *         is greater than the smallest available index.
-   */
-  public abstract LogEntryProto[] getEntries(long startIndex, long endIndex);
-
-  /**
-   * @return the last log entry.
-   */
-  public abstract LogEntryProto getLastEntry();
-
-  /**
-   * Truncate the log entries till the given index. The log with the given index
-   * will also be truncated (i.e., inclusive).
-   */
-  abstract void truncate(long index);
-
-  /**
-   * Used by the leader when appending a new entry based on client's request
-   * or configuration change.
-   */
-  abstract void appendEntry(LogEntryProto entry);
-
-  /**
-   * Append all the given log entries. Used by the followers.
-   *
-   * If an existing entry conflicts with a new one (same index but different
-   * terms), delete the existing entry and all entries that follow it (�5.3).
-   *
-   * This method, {@link #append(long, TransactionContext)},
-   * {@link #append(long, RaftConfiguration)}, and {@link #truncate(long)},
-   * do not guarantee the changes are persisted.
-   * Need to call {@link #logSync()} to persist the changes.
-   */
-  public abstract void append(LogEntryProto... entries);
-
-  /**
-   * Flush and sync the log.
-   * It is triggered by AppendEntries RPC request from the leader.
-   */
-  public abstract void logSync() throws InterruptedException;
-
-  /**
-   * @return the index of the latest entry that has been flushed to the local
-   *         storage.
-   */
-  public abstract long getLatestFlushedIndex();
-
-  /**
-   * Write and flush the metadata (votedFor and term) into the meta file.
-   *
-   * We need to guarantee that the order of writeMetadata calls is the same with
-   * that when we change the in-memory term/votedFor. Otherwise we may persist
-   * stale term/votedFor in file.
-   *
-   * Since the leader change is not frequent, currently we simply put this call
-   * in the RaftPeer's lock. Later we can use an IO task queue to enforce the
-   * order.
-   */
-  public abstract void writeMetadata(long term, String votedFor)
-      throws IOException;
-
-  public abstract Metadata loadMetadata() throws IOException;
-
-  public abstract void syncWithSnapshot(long lastSnapshotIndex);
-
-  @Override
-  public String toString() {
-    return ServerProtoUtils.toString(getLastEntry());
-  }
-
-  public static class Metadata {
-    private final String votedFor;
-    private final long term;
-
-    public Metadata(String votedFor, long term) {
-      this.votedFor = votedFor;
-      this.term = term;
-    }
-
-    public String getVotedFor() {
-      return votedFor;
-    }
-
-    public long getTerm() {
-      return term;
-    }
-  }
-
-  public AutoCloseableLock readLock() {
-    return AutoCloseableLock.acquire(lock.readLock());
-  }
-
-  public AutoCloseableLock writeLock() {
-    return AutoCloseableLock.acquire(lock.writeLock());
-  }
-
-  public boolean hasWriteLock() {
-    return this.lock.isWriteLockedByCurrentThread();
-  }
-
-  public boolean hasReadLock() {
-    return this.lock.getReadHoldCount() > 0 || hasWriteLock();
-  }
-
-  @Override
-  public void close() throws IOException {
-    isOpen = false;
-  }
-
-  public String getSelfId() {
-    return selfId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
deleted file mode 100644
index d022a91..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogCache.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.LogSegment.LogRecord;
-import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-
-import java.util.*;
-
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-/**
- * In-memory RaftLog Cache. Currently we provide a simple implementation that
- * caches all the segments in the memory. The cache is not thread-safe and
- * requires external lock protection.
- */
-class RaftLogCache {
-  private LogSegment openSegment;
-  private final List<LogSegment> closedSegments;
-
-  RaftLogCache() {
-    closedSegments = new ArrayList<>();
-  }
-
-  private boolean areConsecutiveSegments(LogSegment prev, LogSegment segment) {
-    return !prev.isOpen() && prev.getEndIndex() + 1 == segment.getStartIndex();
-  }
-
-  private LogSegment getLastClosedSegment() {
-    return closedSegments.isEmpty() ?
-        null : closedSegments.get(closedSegments.size() - 1);
-  }
-
-  private void validateAdding(LogSegment segment) {
-    final LogSegment lastClosed = getLastClosedSegment();
-    if (!segment.isOpen()) {
-      Preconditions.checkState(lastClosed == null ||
-          areConsecutiveSegments(lastClosed, segment));
-    } else {
-      Preconditions.checkState(openSegment == null &&
-          (lastClosed == null || areConsecutiveSegments(lastClosed, segment)));
-    }
-  }
-
-  void addSegment(LogSegment segment) {
-    validateAdding(segment);
-    if (segment.isOpen()) {
-      openSegment = segment;
-    } else {
-      closedSegments.add(segment);
-    }
-  }
-
-  LogEntryProto getEntry(long index) {
-    if (openSegment != null && index >= openSegment.getStartIndex()) {
-      final LogRecord record = openSegment.getLogRecord(index);
-      return record == null ? null : record.entry;
-    } else {
-      int segmentIndex = Collections.binarySearch(closedSegments, index);
-      if (segmentIndex < 0) {
-        return null;
-      } else {
-        return closedSegments.get(segmentIndex).getLogRecord(index).entry;
-      }
-    }
-  }
-
-  /**
-   * @param startIndex inclusive
-   * @param endIndex exclusive
-   */
-  LogEntryProto[] getEntries(final long startIndex, final long endIndex) {
-    if (startIndex < 0 || startIndex < getStartIndex()) {
-      throw new IndexOutOfBoundsException("startIndex = " + startIndex
-          + ", log cache starts from index " + getStartIndex());
-    }
-    if (startIndex > endIndex) {
-      throw new IndexOutOfBoundsException("startIndex(" + startIndex
-          + ") > endIndex(" + endIndex + ")");
-    }
-    final long realEnd = Math.min(getEndIndex() + 1, endIndex);
-    if (startIndex >= realEnd) {
-      return RaftLog.EMPTY_LOGENTRY_ARRAY;
-    }
-
-    LogEntryProto[] entries = new LogEntryProto[(int) (realEnd - startIndex)];
-    int segmentIndex = Collections.binarySearch(closedSegments, startIndex);
-    if (segmentIndex < 0) {
-      getEntriesFromSegment(openSegment, startIndex, entries, 0, entries.length);
-    } else {
-      long index = startIndex;
-      for (int i = segmentIndex; i < closedSegments.size() && index < realEnd; i++) {
-        LogSegment s = closedSegments.get(i);
-        int numberFromSegment = (int) Math.min(realEnd - index,
-            s.getEndIndex() - index + 1);
-        getEntriesFromSegment(s, index, entries, (int) (index - startIndex),
-            numberFromSegment);
-        index += numberFromSegment;
-      }
-      if (index < realEnd) {
-        getEntriesFromSegment(openSegment, index, entries,
-            (int) (index - startIndex), (int) (realEnd - index));
-      }
-    }
-    return entries;
-  }
-
-  private void getEntriesFromSegment(LogSegment segment, long startIndex,
-      LogEntryProto[] entries, int offset, int size) {
-    long endIndex = segment.getEndIndex();
-    endIndex = Math.min(endIndex, startIndex + size - 1);
-    int index = offset;
-    for (long i = startIndex; i <= endIndex; i++) {
-      entries[index++] = segment.getLogRecord(i).entry;
-    }
-  }
-
-  long getStartIndex() {
-    if (closedSegments.isEmpty()) {
-      return openSegment != null ? openSegment.getStartIndex() :
-          RaftServerConstants.INVALID_LOG_INDEX;
-    } else {
-      return closedSegments.get(0).getStartIndex();
-    }
-  }
-
-  @VisibleForTesting
-  long getEndIndex() {
-    return openSegment != null ? openSegment.getEndIndex() :
-        (closedSegments.isEmpty() ?
-            INVALID_LOG_INDEX :
-            closedSegments.get(closedSegments.size() - 1).getEndIndex());
-  }
-
-  LogEntryProto getLastEntry() {
-    return (openSegment != null && openSegment.numOfEntries() > 0) ?
-        openSegment.getLastRecord().entry :
-        (closedSegments.isEmpty() ? null :
-            closedSegments.get(closedSegments.size() - 1).getLastRecord().entry);
-  }
-
-  LogSegment getOpenSegment() {
-    return openSegment;
-  }
-
-  void appendEntry(LogEntryProto entry) {
-    // SegmentedRaftLog does the segment creation/rolling work. Here we just
-    // simply append the entry into the open segment.
-    Preconditions.checkState(openSegment != null);
-    openSegment.appendToOpenSegment(entry);
-  }
-
-  /**
-   * finalize the current open segment, and start a new open segment
-   */
-  void rollOpenSegment(boolean createNewOpen) {
-    Preconditions.checkState(openSegment != null
-        && openSegment.numOfEntries() > 0);
-    final long nextIndex = openSegment.getEndIndex() + 1;
-    openSegment.close();
-    closedSegments.add(openSegment);
-    if (createNewOpen) {
-      openSegment = LogSegment.newOpenSegment(nextIndex);
-    } else {
-      openSegment = null;
-    }
-  }
-
-  private SegmentFileInfo deleteOpenSegment() {
-    final long oldEnd = openSegment.getEndIndex();
-    openSegment.clear();
-    SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
-        oldEnd, true, 0, openSegment.getEndIndex());
-    openSegment = null;
-    return info;
-  }
-
-  /**
-   * truncate log entries starting from the given index (inclusive)
-   */
-  TruncationSegments truncate(long index) {
-    int segmentIndex = Collections.binarySearch(closedSegments, index);
-    if (segmentIndex == -closedSegments.size() - 1) {
-      if (openSegment != null && openSegment.getEndIndex() >= index) {
-        final long oldEnd = openSegment.getEndIndex();
-        if (index == openSegment.getStartIndex()) {
-          // the open segment should be deleted
-          return new TruncationSegments(null,
-              Collections.singletonList(deleteOpenSegment()));
-        } else {
-          openSegment.truncate(index);
-          Preconditions.checkState(!openSegment.isOpen());
-          SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
-              oldEnd, true, openSegment.getTotalSize(),
-              openSegment.getEndIndex());
-          closedSegments.add(openSegment);
-          openSegment = null;
-          return new TruncationSegments(info, Collections.emptyList());
-        }
-      }
-    } else if (segmentIndex >= 0) {
-      LogSegment ts = closedSegments.get(segmentIndex);
-      final long oldEnd = ts.getEndIndex();
-      List<SegmentFileInfo> list = new ArrayList<>();
-      ts.truncate(index);
-      final int size = closedSegments.size();
-      for (int i = size - 1;
-           i >= (ts.numOfEntries() == 0 ? segmentIndex : segmentIndex + 1);
-           i-- ) {
-        LogSegment s = closedSegments.remove(i);
-        final long endOfS = i == segmentIndex ? oldEnd : s.getEndIndex();
-        s.clear();
-        list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0,
-            s.getEndIndex()));
-      }
-      if (openSegment != null) {
-        list.add(deleteOpenSegment());
-      }
-      SegmentFileInfo t = ts.numOfEntries() == 0 ? null :
-          new SegmentFileInfo(ts.getStartIndex(), oldEnd, false,
-              ts.getTotalSize(), ts.getEndIndex());
-      return new TruncationSegments(t, list);
-    }
-    return null;
-  }
-
-  static class TruncationSegments {
-    final SegmentFileInfo toTruncate; // name of the file to be truncated
-    final SegmentFileInfo[] toDelete; // names of the files to be deleted
-
-    TruncationSegments(SegmentFileInfo toTruncate,
-        List<SegmentFileInfo> toDelete) {
-      this.toDelete = toDelete == null ? null :
-          toDelete.toArray(new SegmentFileInfo[toDelete.size()]);
-      this.toTruncate = toTruncate;
-    }
-  }
-
-  Iterator<LogEntryProto> iterator(long startIndex) {
-    return new EntryIterator(startIndex);
-  }
-
-  private class EntryIterator implements Iterator<LogEntryProto> {
-    private long nextIndex;
-    private LogSegment currentSegment;
-    private int segmentIndex;
-
-    EntryIterator(long start) {
-      this.nextIndex = start;
-      segmentIndex = Collections.binarySearch(closedSegments, nextIndex);
-      if (segmentIndex >= 0) {
-        currentSegment = closedSegments.get(segmentIndex);
-      } else {
-        segmentIndex = -segmentIndex - 1;
-        if (segmentIndex == closedSegments.size()) {
-          currentSegment = openSegment;
-        } else {
-          // the start index is smaller than the first closed segment's start
-          // index. We no longer keep the log entry (because of the snapshot) or
-          // the start index is invalid.
-          Preconditions.checkState(segmentIndex == 0);
-          throw new IndexOutOfBoundsException();
-        }
-      }
-    }
-
-    @Override
-    public boolean hasNext() {
-      return currentSegment != null &&
-          currentSegment.getLogRecord(nextIndex) != null;
-    }
-
-    @Override
-    public LogEntryProto next() {
-      LogRecord record;
-      if (currentSegment == null ||
-          (record = currentSegment.getLogRecord(nextIndex)) == null) {
-        throw new NoSuchElementException();
-      }
-      if (++nextIndex > currentSegment.getEndIndex()) {
-        if (currentSegment != openSegment) {
-          segmentIndex++;
-          currentSegment = segmentIndex == closedSegments.size() ?
-              openSegment : closedSegments.get(segmentIndex);
-        }
-      }
-      return record.entry;
-    }
-  }
-
-  @VisibleForTesting
-  int getNumOfSegments() {
-    return closedSegments.size() + (openSegment == null ? 0 : 1);
-  }
-
-  boolean isEmpty() {
-    return closedSegments.isEmpty() && openSegment == null;
-  }
-
-  void clear() {
-    openSegment = null;
-    closedSegments.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
deleted file mode 100644
index 6cef212..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftLogWorker.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.io.nativeio.NativeIO;
-import org.apache.raft.server.impl.RaftServerImpl;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.LogSegment.SegmentFileInfo;
-import org.apache.raft.server.storage.RaftLogCache.TruncationSegments;
-import org.apache.raft.server.storage.SegmentedRaftLog.Task;
-import org.apache.raft.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.raft.util.ExitUtils;
-import org.apache.raft.util.FileUtils;
-import org.apache.raft.util.RaftUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_LOG_FORCE_SYNC_NUM_KEY;
-
-/**
- * This class takes the responsibility of all the raft log related I/O ops for a
- * raft peer.
- */
-class RaftLogWorker implements Runnable {
-  static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
-  /**
-   * The task queue accessed by rpc handler threads and the io worker thread.
-   */
-  private final BlockingQueue<Task> queue = new ArrayBlockingQueue<>(4096);
-  private volatile boolean running = true;
-  private final Thread workerThread;
-
-  private final RaftStorage storage;
-  private LogOutputStream out;
-  private final RaftServerImpl raftServer;
-
-  /**
-   * The number of entries that have been written into the LogOutputStream but
-   * has not been flushed.
-   */
-  private int pendingFlushNum = 0;
-  /** the index of the last entry that has been written */
-  private long lastWrittenIndex;
-  /** the largest index of the entry that has been flushed */
-  private volatile long flushedIndex;
-
-  private final int forceSyncNum;
-
-  private final  RaftProperties properties;
-
-  RaftLogWorker(RaftServerImpl raftServer, RaftStorage storage,
-                RaftProperties properties) {
-    this.raftServer = raftServer;
-    this.storage = storage;
-    this.properties = properties;
-    this.forceSyncNum = properties.getInt(RAFT_LOG_FORCE_SYNC_NUM_KEY,
-        RAFT_LOG_FORCE_SYNC_NUM_DEFAULT);
-    workerThread = new Thread(this,
-        getClass().getSimpleName() + " for " + storage);
-  }
-
-  void start(long latestIndex, File openSegmentFile) throws IOException {
-    lastWrittenIndex = latestIndex;
-    flushedIndex = latestIndex;
-    if (openSegmentFile != null) {
-      Preconditions.checkArgument(openSegmentFile.exists());
-      out = new LogOutputStream(openSegmentFile, true, properties);
-    }
-    workerThread.start();
-  }
-
-  void close() {
-    this.running = false;
-    workerThread.interrupt();
-    try {
-      workerThread.join();
-    } catch (InterruptedException ignored) {
-    }
-  }
-
-  /**
-   * A snapshot has just been installed on the follower. Need to update the IO
-   * worker's state accordingly.
-   */
-  void syncWithSnapshot(long lastSnapshotIndex) {
-    queue.clear();
-    lastWrittenIndex = lastSnapshotIndex;
-    flushedIndex = lastSnapshotIndex;
-    pendingFlushNum = 0;
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "-"
-        + (raftServer != null ? raftServer.getId() : "");
-  }
-
-  /**
-   * This is protected by the RaftServer and RaftLog's lock.
-   */
-  private Task addIOTask(Task task) {
-    LOG.debug("add task {}", task);
-    try {
-      if (!queue.offer(task, 1, TimeUnit.SECONDS)) {
-        Preconditions.checkState(isAlive(),
-            "the worker thread is not alive");
-        queue.put(task);
-      }
-    } catch (Throwable t) {
-      if (t instanceof InterruptedException && !running) {
-        LOG.info("Got InterruptedException when adding task " + task
-            + ". The RaftLogWorker already stopped.");
-      } else {
-        ExitUtils.terminate(2, "Failed to add IO task " + task, t, LOG);
-      }
-    }
-    return task;
-  }
-
-  boolean isAlive() {
-    return running && workerThread.isAlive();
-  }
-
-  @Override
-  public void run() {
-    while (running) {
-      try {
-        Task task = queue.poll(1, TimeUnit.SECONDS);
-        if (task != null) {
-          try {
-            task.execute();
-          } catch (IOException e) {
-            if (task.getEndIndex() < lastWrittenIndex) {
-              LOG.info("Ignore IOException when handling task " + task
-                  + " which is smaller than the lastWrittenIndex."
-                  + " There should be a snapshot installed.", e);
-            } else {
-              throw e;
-            }
-          }
-          task.done();
-        }
-      } catch (InterruptedException e) {
-        LOG.info(Thread.currentThread().getName()
-            + " was interrupted, exiting. There are " + queue.size()
-            + " tasks remaining in the queue.");
-      } catch (Throwable t) {
-        // TODO avoid terminating the jvm by supporting multiple log directories
-        ExitUtils.terminate(1, Thread.currentThread().getName() + " failed.", t, LOG);
-      }
-    }
-  }
-
-  private boolean shouldFlush() {
-    return pendingFlushNum >= forceSyncNum ||
-        (pendingFlushNum > 0 && queue.isEmpty());
-  }
-
-  private void flushWrites() throws IOException {
-    if (out != null) {
-      LOG.debug("flush data to " + out + ", reset pending_sync_number to 0");
-      out.flush();
-      updateFlushedIndex();
-    }
-  }
-
-  private void updateFlushedIndex() {
-    flushedIndex = lastWrittenIndex;
-    pendingFlushNum = 0;
-    if (raftServer != null) {
-      raftServer.submitLocalSyncEvent();
-    }
-  }
-
-  /**
-   * The following several methods (startLogSegment, rollLogSegment,
-   * writeLogEntry, and truncate) are only called by SegmentedRaftLog which is
-   * protected by RaftServer's lock.
-   *
-   * Thus all the tasks are created and added sequentially.
-   */
-  Task startLogSegment(long startIndex) {
-    return addIOTask(new StartLogSegment(startIndex));
-  }
-
-  Task rollLogSegment(LogSegment segmentToClose) {
-    addIOTask(new FinalizeLogSegment(segmentToClose));
-    return addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1));
-  }
-
-  Task writeLogEntry(LogEntryProto entry) {
-    return addIOTask(new WriteLog(entry));
-  }
-
-  Task truncate(TruncationSegments ts) {
-    return addIOTask(new TruncateLog(ts));
-  }
-
-  // TODO we can add another level of buffer for writing here
-  private class WriteLog extends Task {
-    private final LogEntryProto entry;
-
-    WriteLog(LogEntryProto entry) {
-      this.entry = entry;
-    }
-
-    @Override
-    public void execute() throws IOException {
-      Preconditions.checkState(out != null);
-      Preconditions.checkState(lastWrittenIndex + 1 == entry.getIndex(),
-          "lastWrittenIndex == %s, entry == %s", lastWrittenIndex, entry);
-      out.write(entry);
-      lastWrittenIndex = entry.getIndex();
-      pendingFlushNum++;
-      if (shouldFlush()) {
-        flushWrites();
-      }
-    }
-
-    @Override
-    long getEndIndex() {
-      return entry.getIndex();
-    }
-  }
-
-  private class FinalizeLogSegment extends Task {
-    private final LogSegment segmentToClose;
-
-    FinalizeLogSegment(LogSegment segmentToClose) {
-      this.segmentToClose = segmentToClose;
-    }
-
-    @Override
-    public void execute() throws IOException {
-      RaftUtils.cleanup(null, out);
-      out = null;
-      Preconditions.checkState(segmentToClose != null);
-
-      File openFile = storage.getStorageDir()
-          .getOpenLogFile(segmentToClose.getStartIndex());
-      Preconditions.checkState(openFile.exists(),
-          "File %s does not exist.", openFile);
-      if (segmentToClose.numOfEntries() > 0) {
-        // finalize the current open segment
-        File dstFile = storage.getStorageDir().getClosedLogFile(
-            segmentToClose.getStartIndex(), segmentToClose.getEndIndex());
-        Preconditions.checkState(!dstFile.exists());
-
-        NativeIO.renameTo(openFile, dstFile);
-      } else { // delete the file of the empty segment
-        FileUtils.deleteFile(openFile);
-      }
-      updateFlushedIndex();
-    }
-
-    @Override
-    long getEndIndex() {
-      return segmentToClose.getEndIndex();
-    }
-  }
-
-  private class StartLogSegment extends Task {
-    private final long newStartIndex;
-
-    StartLogSegment(long newStartIndex) {
-      this.newStartIndex = newStartIndex;
-    }
-
-    @Override
-    void execute() throws IOException {
-      File openFile = storage.getStorageDir().getOpenLogFile(newStartIndex);
-      Preconditions.checkState(!openFile.exists(), "open file %s exists for %s",
-          openFile.getAbsolutePath(), RaftLogWorker.this.toString());
-      Preconditions.checkState(out == null && pendingFlushNum == 0);
-      out = new LogOutputStream(openFile, false, properties);
-    }
-
-    @Override
-    long getEndIndex() {
-      return newStartIndex;
-    }
-  }
-
-  private class TruncateLog extends Task {
-    private final TruncationSegments segments;
-
-    TruncateLog(TruncationSegments ts) {
-      this.segments = ts;
-    }
-
-    @Override
-    void execute() throws IOException {
-      RaftUtils.cleanup(null, out);
-      out = null;
-      if (segments.toTruncate != null) {
-        File fileToTruncate = segments.toTruncate.isOpen ?
-            storage.getStorageDir().getOpenLogFile(
-                segments.toTruncate.startIndex) :
-            storage.getStorageDir().getClosedLogFile(
-                segments.toTruncate.startIndex,
-                segments.toTruncate.endIndex);
-        FileUtils.truncateFile(fileToTruncate, segments.toTruncate.targetLength);
-
-        // rename the file
-        File dstFile = storage.getStorageDir().getClosedLogFile(
-            segments.toTruncate.startIndex, segments.toTruncate.newEndIndex);
-        Preconditions.checkState(!dstFile.exists());
-        NativeIO.renameTo(fileToTruncate, dstFile);
-
-        // update lastWrittenIndex
-        lastWrittenIndex = segments.toTruncate.newEndIndex;
-      }
-      if (segments.toDelete != null && segments.toDelete.length > 0) {
-        long minStart = segments.toDelete[0].startIndex;
-        for (SegmentFileInfo del : segments.toDelete) {
-          final File delFile;
-          if (del.isOpen) {
-            delFile = storage.getStorageDir().getOpenLogFile(del.startIndex);
-          } else {
-            delFile = storage.getStorageDir()
-                .getClosedLogFile(del.startIndex, del.endIndex);
-          }
-          FileUtils.deleteFile(delFile);
-          minStart = Math.min(minStart, del.startIndex);
-        }
-        if (segments.toTruncate == null) {
-          lastWrittenIndex = minStart - 1;
-        }
-      }
-      updateFlushedIndex();
-    }
-
-    @Override
-    long getEndIndex() {
-      if (segments.toTruncate != null) {
-        return segments.toTruncate.newEndIndex;
-      } else if (segments.toDelete.length > 0) {
-        return segments.toDelete[segments.toDelete.length - 1].endIndex;
-      }
-      return RaftServerConstants.INVALID_LOG_INDEX;
-    }
-  }
-
-  long getFlushedIndex() {
-    return flushedIndex;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
deleted file mode 100644
index 434f505..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorage.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.raft.conf.RaftProperties;
-import org.apache.raft.server.impl.RaftServerConstants;
-import org.apache.raft.server.storage.RaftStorageDirectory.StorageState;
-import org.apache.raft.statemachine.SnapshotInfo;
-import org.apache.raft.statemachine.StateMachineStorage;
-import org.apache.raft.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_DEFAULT;
-import static org.apache.raft.server.RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY;
-
-public class RaftStorage implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
-
-  // TODO support multiple storage directories
-  private final RaftStorageDirectory storageDir;
-  private final StorageState state;
-  private volatile MetaFile metaFile;
-  private StateMachineStorage stateMachineStorage;
-
-  public RaftStorage(RaftProperties prop, RaftServerConstants.StartupOption option)
-      throws IOException {
-    final String dir = prop.get(RAFT_SERVER_STORAGE_DIR_KEY,
-        RAFT_SERVER_STORAGE_DIR_DEFAULT);
-    storageDir = new RaftStorageDirectory(
-        new File(FileUtils.stringAsURI(dir).getPath()));
-    if (option == RaftServerConstants.StartupOption.FORMAT) {
-      if (storageDir.analyzeStorage(false) == StorageState.NON_EXISTENT) {
-        throw new IOException("Cannot format " + storageDir);
-      }
-      storageDir.lock();
-      format();
-      state = storageDir.analyzeStorage(false);
-      Preconditions.checkState(state == StorageState.NORMAL);
-    } else {
-      state = analyzeAndRecoverStorage(true); // metaFile is initialized here
-      if (state != StorageState.NORMAL) {
-        storageDir.unlock();
-        throw new IOException("Cannot load " + storageDir
-            + ". Its state: " + state);
-      }
-    }
-  }
-
-  StorageState getState() {
-    return state;
-  }
-
-  private void format() throws IOException {
-    storageDir.clearDirectory();
-    metaFile = writeMetaFile(MetaFile.DEFAULT_TERM, MetaFile.EMPTY_VOTEFOR);
-    LOG.info("Storage directory " + storageDir.getRoot()
-        + " has been successfully formatted.");
-  }
-
-  private MetaFile writeMetaFile(long term, String votedFor) throws IOException {
-    MetaFile metaFile = new MetaFile(storageDir.getMetaFile());
-    metaFile.set(term, votedFor);
-    return metaFile;
-  }
-
-  private void cleanMetaTmpFile() throws IOException {
-    Files.deleteIfExists(storageDir.getMetaTmpFile().toPath());
-  }
-
-  private StorageState analyzeAndRecoverStorage(boolean toLock)
-      throws IOException {
-    StorageState storageState = storageDir.analyzeStorage(toLock);
-    if (storageState == StorageState.NORMAL) {
-      metaFile = new MetaFile(storageDir.getMetaFile());
-      assert metaFile.exists();
-      metaFile.readFile();
-      // Existence of raft-meta.tmp means the change of votedFor/term has not
-      // been committed. Thus we should delete the tmp file.
-      cleanMetaTmpFile();
-      return StorageState.NORMAL;
-    } else if (storageState == StorageState.NOT_FORMATTED &&
-        storageDir.isCurrentEmpty()) {
-      format();
-      return StorageState.NORMAL;
-    } else {
-      return storageState;
-    }
-  }
-
-  public RaftStorageDirectory getStorageDir() {
-    return storageDir;
-  }
-
-  @Override
-  public void close() throws IOException {
-    storageDir.unlock();
-  }
-
-  MetaFile getMetaFile() {
-    return metaFile;
-  }
-
-  public SnapshotInfo getLastestSnapshot() throws IOException {
-    return getStateMachineStorage().getLatestSnapshot();
-  }
-
-  /**
-   * Called by the state machine after it has initialized the StateMachineStorage.
-   */
-  public void setStateMachineStorage(StateMachineStorage smStorage) {
-    this.stateMachineStorage = smStorage;
-  }
-
-  public StateMachineStorage getStateMachineStorage() {
-    return stateMachineStorage;
-  }
-
-  @Override
-  public String toString() {
-    return getStorageDir() + "";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
----------------------------------------------------------------------
diff --git a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java b/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
deleted file mode 100644
index 662e4ec..0000000
--- a/raft-server/src/main/java/org/apache/raft/server/storage/RaftStorageDirectory.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.raft.server.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import org.apache.raft.util.AtomicFileOutputStream;
-import org.apache.raft.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.lang.management.ManagementFactory;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static java.nio.file.Files.newDirectoryStream;
-import static org.apache.raft.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
-
-public class RaftStorageDirectory {
-  static final Logger LOG = LoggerFactory.getLogger(RaftStorageDirectory.class);
-
-  static final String STORAGE_DIR_CURRENT = "current";
-  static final String STORAGE_FILE_LOCK = "in_use.lock";
-  static final String META_FILE_NAME = "raft-meta";
-  static final String LOG_FILE_INPROGRESS = "inprogress";
-  static final String LOG_FILE_PREFIX = "log";
-  static final String STATE_MACHINE = "sm"; // directory containing state machine snapshots
-  static final String TEMP = "tmp";
-  static final Pattern CLOSED_SEGMENT_REGEX = Pattern.compile("log_(\\d+)-(\\d+)");
-  static final Pattern OPEN_SEGMENT_REGEX = Pattern.compile("log_inprogress_(\\d+)(?:\\..*)?");
-
-  private static final List<Pattern> LOGSEGMENTS_REGEXES =
-      ImmutableList.of(CLOSED_SEGMENT_REGEX, OPEN_SEGMENT_REGEX);
-
-  enum StorageState {
-    NON_EXISTENT,
-    NOT_FORMATTED,
-    NORMAL
-  }
-
-  public static class LogPathAndIndex {
-    public final Path path;
-    public final long startIndex;
-    public final long endIndex;
-
-    LogPathAndIndex(Path path, long startIndex, long endIndex) {
-      this.path = path;
-      this.startIndex = startIndex;
-      this.endIndex = endIndex;
-    }
-
-    @Override
-    public String toString() {
-      return path + "-" + startIndex + "-" + endIndex;
-    }
-  }
-
-  private final File root; // root directory
-  private FileLock lock;   // storage lock
-
-  /**
-   * Constructor
-   * @param dir directory corresponding to the storage
-   */
-  RaftStorageDirectory(File dir) {
-    this.root = dir;
-    this.lock = null;
-  }
-
-  /**
-   * Get root directory of this storage
-   */
-  //TODO
-  public File getRoot() {
-    return root;
-  }
-
-  /**
-   * Clear and re-create storage directory.
-   * <p>
-   * Removes contents of the current directory and creates an empty directory.
-   *
-   * This does not fully format storage directory.
-   * It cannot write the version file since it should be written last after
-   * all other storage type dependent files are written.
-   * Derived storage is responsible for setting specific storage values and
-   * writing the version file to disk.
-   */
-  void clearDirectory() throws IOException {
-    File curDir = this.getCurrentDir();
-    clearDirectory(curDir);
-    clearDirectory(getStateMachineDir());
-  }
-
-  void clearDirectory(File dir) throws IOException {
-    if (dir.exists()) {
-      File[] files = FileUtils.listFiles(dir);
-      LOG.info("Will remove files: " + Arrays.toString(files));
-      if (!(FileUtils.fullyDelete(dir)))
-        throw new IOException("Cannot remove directory: " + dir);
-    }
-    if (!dir.mkdirs())
-      throw new IOException("Cannot create directory " + dir);
-  }
-
-  /**
-   * Directory {@code current} contains latest files defining
-   * the file system meta-data.
-   *
-   * @return the directory path
-   */
-  File getCurrentDir() {
-    return new File(root, STORAGE_DIR_CURRENT);
-  }
-
-  File getMetaFile() {
-    return new File(getCurrentDir(), META_FILE_NAME);
-  }
-
-  File getMetaTmpFile() {
-    return new File(getCurrentDir(), META_FILE_NAME
-        + AtomicFileOutputStream.TMP_EXTENSION);
-  }
-
-  File getOpenLogFile(long startIndex) {
-    return new File(getCurrentDir(), getOpenLogFileName(startIndex));
-  }
-
-  static String getOpenLogFileName(long startIndex) {
-    return LOG_FILE_PREFIX + "_" + LOG_FILE_INPROGRESS + "_" + startIndex;
-  }
-
-  File getClosedLogFile(long startIndex, long endIndex) {
-    return new File(getCurrentDir(), getClosedLogFileName(startIndex, endIndex));
-  }
-
-  static String getClosedLogFileName(long startIndex, long endIndex) {
-    return LOG_FILE_PREFIX + "_" + startIndex + "-" + endIndex;
-  }
-
-  public File getStateMachineDir() {
-    return new File(getRoot(), STATE_MACHINE);
-  }
-
-  /** Returns a uniquely named temporary directory under $rootdir/tmp/ */
-  public File getNewTempDir() {
-    return new File(new File(getRoot(), TEMP), UUID.randomUUID().toString());
-  }
-
-  public Path relativizeToRoot(Path p) {
-    if (p.isAbsolute()) {
-      return getRoot().toPath().relativize(p);
-    }
-    return p;
-  }
-
-  /**
-   * @return log segment files sorted based on their index.
-   */
-  @VisibleForTesting
-  public List<LogPathAndIndex> getLogSegmentFiles() throws IOException {
-    List<LogPathAndIndex> list = new ArrayList<>();
-    try (DirectoryStream<Path> stream =
-             Files.newDirectoryStream(getCurrentDir().toPath())) {
-      for (Path path : stream) {
-        for (Pattern pattern : LOGSEGMENTS_REGEXES) {
-          Matcher matcher = pattern.matcher(path.getFileName().toString());
-          if (matcher.matches()) {
-            final long startIndex = Long.parseLong(matcher.group(1));
-            final long endIndex = matcher.groupCount() == 2 ?
-                Long.parseLong(matcher.group(2)) : INVALID_LOG_INDEX;
-            list.add(new LogPathAndIndex(path, startIndex, endIndex));
-          }
-        }
-      }
-    }
-    Collections.sort(list,
-        (o1, o2) -> o1.startIndex == o2.startIndex ?
-            0 : (o1.startIndex < o2.startIndex ? -1 : 1));
-    return list;
-  }
-
-  /**
-   * Check to see if current/ directory is empty.
-   */
-  boolean isCurrentEmpty() throws IOException {
-    File currentDir = getCurrentDir();
-    if(!currentDir.exists()) {
-      // if current/ does not exist, it's safe to format it.
-      return true;
-    }
-    try(DirectoryStream<Path> dirStream =
-            newDirectoryStream(currentDir.toPath())) {
-      if (dirStream.iterator().hasNext()) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Check consistency of the storage directory.
-   *
-   * @return state {@link StorageState} of the storage directory
-   */
-  StorageState analyzeStorage(boolean toLock) throws IOException {
-    Preconditions.checkState(root != null, "root directory is null");
-
-    String rootPath = root.getCanonicalPath();
-    try { // check that storage exists
-      if (!root.exists()) {
-        LOG.info(rootPath + " does not exist. Creating ...");
-        if (!root.mkdirs()) {
-          throw new IOException("Cannot create directory " + rootPath);
-        }
-      }
-      // or is inaccessible
-      if (!root.isDirectory()) {
-        LOG.warn(rootPath + "is not a directory");
-        return StorageState.NON_EXISTENT;
-      }
-      if (!FileUtils.canWrite(root)) {
-        LOG.warn("Cannot access storage directory " + rootPath);
-        return StorageState.NON_EXISTENT;
-      }
-    } catch(SecurityException ex) {
-      LOG.warn("Cannot access storage directory " + rootPath, ex);
-      return StorageState.NON_EXISTENT;
-    }
-
-    if (toLock) {
-      this.lock(); // lock storage if it exists
-    }
-
-    // check whether current directory is valid
-    if (hasMetaFile()) {
-      return StorageState.NORMAL;
-    } else {
-      return StorageState.NOT_FORMATTED;
-    }
-  }
-
-  boolean hasMetaFile() throws IOException {
-    return getMetaFile().exists();
-  }
-
-  /**
-   * Lock storage to provide exclusive access.
-   *
-   * <p> Locking is not supported by all file systems.
-   * E.g., NFS does not consistently support exclusive locks.
-   *
-   * <p> If locking is supported we guarantee exclusive access to the
-   * storage directory. Otherwise, no guarantee is given.
-   *
-   * @throws IOException if locking fails
-   */
-  public void lock() throws IOException {
-    FileLock newLock = tryLock();
-    if (newLock == null) {
-      String msg = "Cannot lock storage " + this.root
-          + ". The directory is already locked";
-      LOG.info(msg);
-      throw new IOException(msg);
-    }
-    // Don't overwrite lock until success - this way if we accidentally
-    // call lock twice, the internal state won't be cleared by the second
-    // (failed) lock attempt
-    lock = newLock;
-  }
-
-  /**
-   * Attempts to acquire an exclusive lock on the storage.
-   *
-   * @return A lock object representing the newly-acquired lock or
-   * <code>null</code> if storage is already locked.
-   * @throws IOException if locking fails.
-   */
-  private FileLock tryLock() throws IOException {
-    boolean deletionHookAdded = false;
-    File lockF = new File(root, STORAGE_FILE_LOCK);
-    if (!lockF.exists()) {
-      lockF.deleteOnExit();
-      deletionHookAdded = true;
-    }
-    RandomAccessFile file = new RandomAccessFile(lockF, "rws");
-    String jvmName = ManagementFactory.getRuntimeMXBean().getName();
-    FileLock res;
-    try {
-      res = file.getChannel().tryLock();
-      if (null == res) {
-        LOG.error("Unable to acquire file lock on path " + lockF.toString());
-        throw new OverlappingFileLockException();
-      }
-      file.write(jvmName.getBytes(Charsets.UTF_8));
-      LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
-    } catch (OverlappingFileLockException oe) {
-      // Cannot read from the locked file on Windows.
-      LOG.error("It appears that another process "
-          + "has already locked the storage directory: " + root, oe);
-      file.close();
-      return null;
-    } catch(IOException e) {
-      LOG.error("Failed to acquire lock on " + lockF
-          + ". If this storage directory is mounted via NFS, "
-          + "ensure that the appropriate nfs lock services are running.", e);
-      file.close();
-      throw e;
-    }
-    if (!deletionHookAdded) {
-      // If the file existed prior to our startup, we didn't
-      // call deleteOnExit above. But since we successfully locked
-      // the dir, we can take care of cleaning it up.
-      lockF.deleteOnExit();
-    }
-    return res;
-  }
-
-  /**
-   * Unlock storage.
-   */
-  public void unlock() throws IOException {
-    if (this.lock == null)
-      return;
-    this.lock.release();
-    lock.channel().close();
-    lock = null;
-  }
-
-  @Override
-  public String toString() {
-    return "Storage Directory " + this.root;
-  }
-}