You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:15 UTC

[30/54] [abbrv] incubator-ratis git commit: Renamed the packages from raft to ratis in preparation for Apache Incubation - Moved all java packages from org.apache.raft to org.apache.ratis. - Moved native package to org_apache_ratis, and native lib to l

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
new file mode 100644
index 0000000..9d20682
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
@@ -0,0 +1,805 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io.nativeio;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.ratis.protocol.AlreadyExistsException;
+import org.apache.ratis.util.NativeCodeLoader;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * JNI wrappers for various native IO-related calls not available in Java.
+ * These functions should generally be used alongside a fallback to another
+ * more portable mechanism.
+ */
+public class NativeIO {
+  private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
+
+  public static class POSIX {
+    // Flags for open() call from bits/fcntl.h - Set by JNI
+    public static int O_RDONLY = -1;
+    public static int O_WRONLY = -1;
+    public static int O_RDWR = -1;
+    public static int O_CREAT = -1;
+    public static int O_EXCL = -1;
+    public static int O_NOCTTY = -1;
+    public static int O_TRUNC = -1;
+    public static int O_APPEND = -1;
+    public static int O_NONBLOCK = -1;
+    public static int O_SYNC = -1;
+
+    // Flags for posix_fadvise() from bits/fcntl.h - Set by JNI
+    /* No further special treatment.  */
+    public static int POSIX_FADV_NORMAL = -1;
+    /* Expect random page references.  */
+    public static int POSIX_FADV_RANDOM = -1;
+    /* Expect sequential page references.  */
+    public static int POSIX_FADV_SEQUENTIAL = -1;
+    /* Will need these pages.  */
+    public static int POSIX_FADV_WILLNEED = -1;
+    /* Don't need these pages.  */
+    public static int POSIX_FADV_DONTNEED = -1;
+    /* Data will be accessed once.  */
+    public static int POSIX_FADV_NOREUSE = -1;
+
+
+    // Updated by JNI when supported by glibc.  Leave defaults in case kernel
+    // supports sync_file_range, but glibc does not.
+    /* Wait upon writeout of all pages
+       in the range before performing the
+       write.  */
+    public static int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+    /* Initiate writeout of all those
+       dirty pages in the range which are
+       not presently under writeback.  */
+    public static int SYNC_FILE_RANGE_WRITE = 2;
+    /* Wait upon writeout of all pages in
+       the range after performing the
+       write.  */
+    public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+    // Set to true via JNI if possible.  posixFadviseIfPossible() resets it
+    // to false when the native call turns out to be unavailable.
+    public static boolean fadvisePossible = false;
+
+    // Flag of this nested class; it is a different field from the outer
+    // NativeIO.nativeLoaded.  Read by isAvailable() and the *IfPossible
+    // wrappers of this class.
+    private static boolean nativeLoaded = false;
+    // Starts out true; syncFileRangeIfPossible() clears it the first time the
+    // native call reports that it is unsupported or cannot be linked.
+    private static boolean syncFileRangePossible = true;
+
+    // NOTE(review): not read anywhere in the visible part of this file.
+    private static long cacheTimeout = -1;
+
+    // Replaceable through setCacheManipulator(), e.g. by tests.
+    private static CacheManipulator cacheManipulator = new CacheManipulator();
+
+    /** Returns the {@link CacheManipulator} currently installed for this class. */
+    public static CacheManipulator getCacheManipulator() {
+      return cacheManipulator;
+    }
+
+    /**
+     * Replaces the installed {@link CacheManipulator}.
+     *
+     * @param cacheManipulator the manipulator that
+     *        {@link #getCacheManipulator()} returns from now on
+     */
+    public static void setCacheManipulator(CacheManipulator cacheManipulator) {
+      POSIX.cacheManipulator = cacheManipulator;
+    }
+
+    /**
+     * Indirection layer over the operating system cache primitives, so that
+     * the behaviour can be substituted (see the subclasses of this type).
+     */
+    @VisibleForTesting
+    public static class CacheManipulator {
+      /**
+       * Locks {@code buffer} into memory by delegating to
+       * {@link POSIX#mlock(ByteBuffer, long)}.  The {@code identifier} is not
+       * used by this implementation.
+       */
+      public void mlock(String identifier, ByteBuffer buffer, long len)
+          throws IOException {
+        POSIX.mlock(buffer, len);
+      }
+
+      /** @return the value reported by {@code NativeIO.getMemlockLimit()} */
+      public long getMemlockLimit() {
+        final long limit = NativeIO.getMemlockLimit();
+        return limit;
+      }
+
+      /** @return the value reported by {@code NativeIO.getOperatingSystemPageSize()} */
+      public long getOperatingSystemPageSize() {
+        final long pageSize = NativeIO.getOperatingSystemPageSize();
+        return pageSize;
+      }
+
+      /**
+       * Forwards all arguments unchanged to
+       * {@link POSIX#posixFadviseIfPossible(String, FileDescriptor, long, long, int)}.
+       */
+      public void posixFadviseIfPossible(String identifier, FileDescriptor fd,
+          long offset, long len, int flags) throws NativeIOException {
+        POSIX.posixFadviseIfPossible(identifier, fd, offset, len, flags);
+      }
+
+      /** @return whether {@code NativeIO.isAvailable()} reports the native code as usable */
+      public boolean verifyCanMlock() {
+        final boolean available = NativeIO.isAvailable();
+        return available;
+      }
+    }
+
+    /**
+     * A CacheManipulator used for testing which does not actually call mlock.
+     * This allows many tests to be run even when the operating system does not
+     * allow mlock, or only allows limited mlocking.
+     */
+    @VisibleForTesting
+    public static class NoMlockCacheManipulator extends CacheManipulator {
+      /** Only logs the request; nothing is locked into memory. */
+      @Override
+      public void mlock(String identifier, ByteBuffer buffer,
+          long len) throws IOException {
+        LOG.info("mlocking {}", identifier);
+      }
+
+      /** @return a fixed limit of 2^50 bytes (1 PiB) */
+      @Override
+      public long getMemlockLimit() {
+        return 1125899906842624L;
+      }
+
+      /** @return a fixed page size of 4096 bytes */
+      @Override
+      public long getOperatingSystemPageSize() {
+        return 4096;
+      }
+
+      /** @return always true, because this manipulator never really locks memory */
+      @Override
+      public boolean verifyCanMlock() {
+        return true;
+      }
+    }
+
+    static {
+      // NativeIO.initNativeLib() records its outcome in the outer class's
+      // NativeIO.nativeLoaded.  This nested class declares its own, shadowing
+      // nativeLoaded field, so the result has to be copied across; otherwise
+      // POSIX.isAvailable(), mlock() and the *IfPossible wrappers stay
+      // disabled even after the native library initialized successfully.
+      initNativeLib();
+      nativeLoaded = NativeIO.nativeLoaded;
+    }
+
+    /**
+     * Return true if the JNI-based native IO extensions are available.
+     *
+     * @return true only when {@link NativeCodeLoader#isNativeCodeLoaded()}
+     *         reports true and this class's own {@code nativeLoaded} flag
+     *         is set
+     */
+    public static boolean isAvailable() {
+      return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+    }
+
+    /**
+     * Guard used before calling into native code.
+     *
+     * @throws IOException if {@link #isAvailable()} reports false
+     */
+    private static void assertCodeLoaded() throws IOException {
+      if (isAvailable()) {
+        return;
+      }
+      throw new IOException("NativeIO was not loaded");
+    }
+
+    /** Wrapper around open(2) */
+    public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
+    /** Wrapper around fstat(2) */
+    private static native Stat fstat(FileDescriptor fd) throws IOException;
+
+    /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
+    private static native void chmodImpl(String path, int mode) throws IOException;
+
+    /**
+     * Changes the mode of {@code path} through the native implementation.
+     *
+     * On Windows a failure of the native call is translated: error code 3
+     * becomes an {@code ENOENT} exception, every other code is logged and
+     * becomes an {@code UNKNOWN} exception.  The original native exception is
+     * attached as the cause of the translated one so that its details are not
+     * lost.  On other platforms the native exception propagates unchanged.
+     *
+     * @param path file whose mode is changed
+     * @param mode the new mode
+     * @throws IOException if the native call fails
+     */
+    public static void chmod(String path, int mode) throws IOException {
+      if (!RaftUtils.WINDOWS) {
+        chmodImpl(path, mode);
+        return;
+      }
+      try {
+        chmodImpl(path, mode);
+      } catch (NativeIOException nioe) {
+        final NativeIOException translated;
+        if (nioe.getErrorCode() == 3) {
+          translated = new NativeIOException("No such file or directory",
+              Errno.ENOENT);
+        } else {
+          LOG.warn(String.format("NativeIO.chmod error (%d): %s",
+              nioe.getErrorCode(), nioe.getMessage()));
+          translated = new NativeIOException("Unknown error", Errno.UNKNOWN);
+        }
+        // Keep the native failure reachable for diagnostics.
+        translated.initCause(nioe);
+        throw translated;
+      }
+    }
+
+    /** Wrapper around posix_fadvise(2) */
+    static native void posix_fadvise(
+      FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
+
+    /** Wrapper around sync_file_range(2) */
+    static native void sync_file_range(
+      FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
+
+    /**
+     * Call posix_fadvise on the given file descriptor. See the manpage
+     * for this syscall for more information. On systems where this
+     * call is not available, does nothing.
+     *
+     * "Not available" is detected the same way as in
+     * {@link #syncFileRangeIfPossible(FileDescriptor, long, long, int)}: the
+     * native call raising either {@link UnsupportedOperationException} or
+     * {@link UnsatisfiedLinkError}.  After the first such failure
+     * {@code fadvisePossible} is cleared and later calls return immediately.
+     *
+     * @param identifier not used by this method
+     * @param fd file descriptor to advise on
+     * @param offset start of the advised range
+     * @param len length of the advised range
+     * @param flags one of the POSIX_FADV_* values
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    static void posixFadviseIfPossible(String identifier,
+        FileDescriptor fd, long offset, long len, int flags)
+        throws NativeIOException {
+      if (nativeLoaded && fadvisePossible) {
+        try {
+          posix_fadvise(fd, offset, len, flags);
+        } catch (UnsupportedOperationException | UnsatisfiedLinkError unavailable) {
+          fadvisePossible = false;
+        }
+      }
+    }
+
+    /**
+     * Invokes sync_file_range on {@code fd} when the native library is loaded
+     * and the call has not previously been found to be unavailable; otherwise
+     * does nothing.  See the sync_file_range manpage for the meaning of the
+     * arguments.
+     *
+     * @throws NativeIOException if there is an error with the syscall
+     */
+    public static void syncFileRangeIfPossible(
+        FileDescriptor fd, long offset, long nbytes, int flags)
+        throws NativeIOException {
+      if (!nativeLoaded || !syncFileRangePossible) {
+        return;
+      }
+      try {
+        sync_file_range(fd, offset, nbytes, flags);
+      } catch (UnsupportedOperationException | UnsatisfiedLinkError unavailable) {
+        // Remember the outcome so the native call is not attempted again.
+        syncFileRangePossible = false;
+      }
+    }
+
+    /**
+     * Native counterpart of {@link #mlock(ByteBuffer, long)}; that method
+     * checks that the native code is loaded and that the buffer is direct
+     * before delegating here.
+     */
+    static native void mlock_native(
+        ByteBuffer buffer, long len) throws NativeIOException;
+
+    /**
+     * Pins a direct ByteBuffer in memory so that it is not swapped out; once
+     * locked, later accesses will not incur a page fault.
+     *
+     * See the mlock(2) man page for more information.
+     *
+     * @param buffer the buffer to lock; must be a direct buffer
+     * @param len passed through to the native call
+     * @throws IOException if the native code is not loaded, if the buffer is
+     *         not direct, or if the native call fails
+     */
+    static void mlock(ByteBuffer buffer, long len)
+        throws IOException {
+      assertCodeLoaded();
+      if (buffer.isDirect()) {
+        mlock_native(buffer, len);
+        return;
+      }
+      throw new IOException("Cannot mlock a non-direct ByteBuffer");
+    }
+
+    /**
+     * Unmaps the block from memory. See munmap(2).
+     *
+     * There isn't any portable way to unmap a memory region in Java.
+     * So we use the sun.nio method here.
+     * Note that unmapping a memory region could cause crashes if code
+     * continues to reference the unmapped memory.  However, if we don't
+     * manually unmap the memory, we are dependent on the finalizer to
+     * do it, and we have no idea when the finalizer will run.
+     *
+     * A buffer that exposes no cleaner (for example a view that does not own
+     * its mapping) is left untouched instead of failing with a
+     * NullPointerException.
+     *
+     * @param buffer    The buffer to unmap.
+     */
+    public static void munmap(MappedByteBuffer buffer) {
+      if (buffer instanceof sun.nio.ch.DirectBuffer) {
+        sun.misc.Cleaner cleaner =
+            ((sun.nio.ch.DirectBuffer)buffer).cleaner();
+        if (cleaner != null) {
+          cleaner.clean();
+        }
+      }
+    }
+
+    /** Linux only methods used for getOwner() implementation */
+    private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
+    private static native String getUserName(long uid) throws IOException;
+
+    /**
+     * Value object returned by the fstat call: owner, group and mode bits of
+     * a file.
+     */
+    public static class Stat {
+      private int ownerId, groupId;
+      private String owner, group;
+      private int mode;
+
+      // Mode constants - Set by JNI
+      public static int S_IFMT = -1;    /* type of file */
+      public static int S_IFIFO  = -1;  /* named pipe (fifo) */
+      public static int S_IFCHR  = -1;  /* character special */
+      public static int S_IFDIR  = -1;  /* directory */
+      public static int S_IFBLK  = -1;  /* block special */
+      public static int S_IFREG  = -1;  /* regular */
+      public static int S_IFLNK  = -1;  /* symbolic link */
+      public static int S_IFSOCK = -1;  /* socket */
+      public static int S_ISUID = -1;  /* set user id on execution */
+      public static int S_ISGID = -1;  /* set group id on execution */
+      public static int S_ISVTX = -1;  /* save swapped text even after use */
+      public static int S_IRUSR = -1;  /* read permission, owner */
+      public static int S_IWUSR = -1;  /* write permission, owner */
+      public static int S_IXUSR = -1;  /* execute/search permission, owner */
+
+      /** Builds a result from numeric ids; the name fields stay unset. */
+      Stat(int ownerId, int groupId, int mode) {
+        this.mode = mode;
+        this.ownerId = ownerId;
+        this.groupId = groupId;
+      }
+
+      /**
+       * Builds a result from owner and group names.  On Windows both names
+       * are passed through stripDomain first; elsewhere they are stored as
+       * given.
+       */
+      Stat(String owner, String group, int mode) {
+        final boolean windows = RaftUtils.WINDOWS;
+        this.owner = windows ? stripDomain(owner) : owner;
+        this.group = windows ? stripDomain(group) : group;
+        this.mode = mode;
+      }
+
+      @Override
+      public String toString() {
+        final StringBuilder text = new StringBuilder("Stat(owner='");
+        text.append(owner).append("', group='").append(group).append("'");
+        text.append(", mode=").append(mode).append(")");
+        return text.toString();
+      }
+
+      /** @return the owner name, as stored by the name-based constructor */
+      public String getOwner() {
+        return owner;
+      }
+
+      /** @return the group name, as stored by the name-based constructor */
+      public String getGroup() {
+        return group;
+      }
+
+      /** @return the mode value passed to the constructor */
+      public int getMode() {
+        return mode;
+      }
+    }
+
+    /**
+     * Immutable pair of a name and the timestamp associated with it.
+     * NOTE(review): not referenced anywhere in the visible part of this file;
+     * presumably a cache entry for id-to-name lookups — confirm it is still
+     * needed.
+     */
+    private static class CachedName {
+      final long timestamp;
+      final String name;
+
+      public CachedName(String name, long timestamp) {
+        this.name = name;
+        this.timestamp = timestamp;
+      }
+    }
+
+    // Protection bits accepted as the "prot" argument of mmap() below.
+    public final static int MMAP_PROT_READ = 0x1;
+    public final static int MMAP_PROT_WRITE = 0x2;
+    public final static int MMAP_PROT_EXEC = 0x4;
+
+    /**
+     * Native memory-mapping call; presumably a wrapper around mmap(2) that
+     * returns the address of the mapping — confirm against the native code.
+     */
+    public static native long mmap(FileDescriptor fd, int prot,
+        boolean shared, long length) throws IOException;
+
+    /**
+     * Native counterpart of {@link #mmap(FileDescriptor, int, boolean, long)}
+     * taking an address and a length; presumably a wrapper around munmap(2)
+     * — confirm against the native code.
+     */
+    public static native void munmap(long addr, long length)
+        throws IOException;
+  }
+
+  private static boolean workaroundNonThreadSafePasswdCalls = false;
+
+
+  public static class Windows {
+    // Flags for CreateFile() call on Windows
+    public static final long GENERIC_READ = 0x80000000L;
+    public static final long GENERIC_WRITE = 0x40000000L;
+
+    public static final long FILE_SHARE_READ = 0x00000001L;
+    public static final long FILE_SHARE_WRITE = 0x00000002L;
+    public static final long FILE_SHARE_DELETE = 0x00000004L;
+
+    public static final long CREATE_NEW = 1;
+    public static final long CREATE_ALWAYS = 2;
+    public static final long OPEN_EXISTING = 3;
+    public static final long OPEN_ALWAYS = 4;
+    public static final long TRUNCATE_EXISTING = 5;
+
+    public static final long FILE_BEGIN = 0;
+    public static final long FILE_CURRENT = 1;
+    public static final long FILE_END = 2;
+
+    public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;
+
+    /**
+     * Creates a directory whose permissions are set to {@code mode} at
+     * creation time.  Doing so avoids problems caused by the user lacking
+     * WRITE_DAC rights on a later chmod call, as can happen when writing to an
+     * SMB share on which the user does not have Full Control rights.
+     *
+     * @param path directory to create
+     * @param mode permissions of new directory
+     * @throws IOException if there is an I/O error
+     */
+    public static void createDirectoryWithMode(File path, int mode)
+        throws IOException {
+      final String absolutePath = path.getAbsolutePath();
+      createDirectoryWithMode0(absolutePath, mode);
+    }
+
+    /** Wrapper around CreateDirectory() on Windows */
+    private static native void createDirectoryWithMode0(String path, int mode)
+        throws NativeIOException;
+
+    /** Wrapper around CreateFile() on Windows */
+    public static native FileDescriptor createFile(String path,
+        long desiredAccess, long shareMode, long creationDisposition)
+        throws IOException;
+
+    /**
+     * Opens a file for writing with its permissions set to {@code mode} at
+     * creation time.  Doing so avoids problems caused by the user lacking
+     * WRITE_DAC rights on a later chmod call, as can happen when writing to an
+     * SMB share on which the user does not have Full Control rights.
+     *
+     * The semantics mimic those implemented by the JDK in
+     * {@link FileOutputStream}: the file is opened for truncate or append,
+     * the sharing mode allows other readers and writers, and paths longer
+     * than MAX_PATH are supported.  (See io_util_md.c in the JDK.)
+     *
+     * @param path file to create
+     * @param append if true, then open file for append
+     * @param mode permissions of the new file
+     * @return FileOutputStream of opened file
+     * @throws IOException if there is an I/O error
+     */
+    public static FileOutputStream createFileOutputStreamWithMode(File path,
+        boolean append, int mode) throws IOException {
+      final long sharing = FILE_SHARE_READ | FILE_SHARE_WRITE;
+      final long disposition;
+      if (append) {
+        disposition = OPEN_ALWAYS;
+      } else {
+        disposition = CREATE_ALWAYS;
+      }
+      final FileDescriptor handle = createFileWithMode0(
+          path.getAbsolutePath(), GENERIC_WRITE, sharing, disposition, mode);
+      return new FileOutputStream(handle);
+    }
+
+    /** Wrapper around CreateFile() with security descriptor on Windows */
+    private static native FileDescriptor createFileWithMode0(String path,
+        long desiredAccess, long shareMode, long creationDisposition, int mode)
+        throws NativeIOException;
+
+    /** Wrapper around SetFilePointer() on Windows */
+    public static native long setFilePointer(FileDescriptor fd,
+        long distanceToMove, long moveMethod) throws IOException;
+
+    /** Windows only methods used for getOwner() implementation */
+    private static native String getOwner(FileDescriptor fd) throws IOException;
+
+    /**
+     * Supported list of Windows access right flags.  Each constant carries
+     * the numeric mask that {@link #access(String, AccessRight)} hands to the
+     * native access check.
+     */
+    public enum AccessRight {
+      ACCESS_READ (0x0001),      // FILE_READ_DATA
+      ACCESS_WRITE (0x0002),     // FILE_WRITE_DATA
+      ACCESS_EXECUTE (0x0020);   // FILE_EXECUTE
+
+      // Numeric mask for this right.
+      private final int accessRight;
+      AccessRight(int access) {
+        accessRight = access;
+      }
+
+      /** @return the numeric mask of this access right */
+      public int accessRight() {
+        return accessRight;
+      }
+    }
+
+    /** Windows only method used to check if the current process has requested
+     *  access rights on the given path. */
+    private static native boolean access0(String path, int requestedAccess);
+
+    /**
+     * Tells whether the current process holds the requested access right on
+     * {@code path}.
+     *
+     * Longer term this native function can be substituted with the JDK7
+     * functions Files#isReadable, isWritable, isExecutable.
+     *
+     * @param path input path
+     * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
+     * @return true if access is allowed
+     * @throws IOException I/O exception on error
+     */
+    public static boolean access(String path, AccessRight desiredAccess)
+        throws IOException {
+      final int requestedMask = desiredAccess.accessRight();
+      return access0(path, requestedMask);
+    }
+
+    /**
+     * Extends both the minimum and maximum working set size of the current
+     * process.  This method gets the current minimum and maximum working set
+     * size, adds the requested amount to each and then sets the minimum and
+     * maximum working set size to the new values.  Controlling the working set
+     * size of the process also controls the amount of memory it can lock.
+     *
+     * @param delta amount to increment minimum and maximum working set size
+     * @throws IOException for any error
+     * @see POSIX#mlock(ByteBuffer, long)
+     */
+    public static native void extendWorkingSetSize(long delta) throws IOException;
+
+    static {
+      initNativeLib();
+    }
+  }
+
+  private static boolean nativeLoaded = false;
+
+  static {
+    initNativeLib();
+  }
+
+  /**
+   * Runs the native initialization when the native library is loaded and, on
+   * success, sets {@code nativeLoaded}.  Any Throwable raised by the
+   * initialization is logged at debug level and otherwise ignored, leaving
+   * the flag untouched.
+   */
+  private static void initNativeLib() {
+    if (!NativeCodeLoader.isNativeCodeLoaded()) {
+      return;
+    }
+    try {
+      initNative();
+      nativeLoaded = true;
+    } catch (Throwable failure) {
+      LOG.debug("Unable to initialize NativeIO libraries", failure);
+    }
+  }
+
+  /**
+   * Return true if the JNI-based native IO extensions are available.
+   */
+  public static boolean isAvailable() {
+    return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+  }
+
+  /** Initialize the JNI method ID and class ID cache */
+  private static native void initNative();
+
+  /**
+   * Reports how many bytes may be locked into memory at any given point.
+   * When the native code is not available the answer is 0 and the native
+   * query is not attempted.
+   *
+   * @return 0 if no bytes can be locked into memory;
+   *         Long.MAX_VALUE if there is no limit;
+   *         The number of bytes that can be locked into memory otherwise.
+   */
+  static long getMemlockLimit() {
+    if (!isAvailable()) {
+      return 0;
+    }
+    return getMemlockLimit0();
+  }
+
+  private static native long getMemlockLimit0();
+
+  /**
+   * @return the operating system's page size.
+   */
+  static long getOperatingSystemPageSize() {
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      Unsafe unsafe = (Unsafe)f.get(null);
+      return unsafe.pageSize();
+    } catch (Throwable e) {
+      LOG.warn("Unable to get operating system page size.  Guessing 4096.", e);
+      return 4096;
+    }
+  }
+
+  /**
+   * Immutable pair of a user name and the timestamp associated with it.
+   * NOTE(review): not referenced anywhere in the visible part of this file;
+   * presumably a cache entry for uid-to-name lookups — confirm it is still
+   * needed.
+   */
+  private static class CachedUid {
+    final long timestamp;
+    final String username;
+    public CachedUid(String username, long timestamp) {
+      this.timestamp = timestamp;
+      this.username = username;
+    }
+  }
+
+  private static boolean initialized = false;
+
+  /**
+   * The Windows logon name has two part, NetBIOS domain name and
+   * user account name, of the format DOMAIN\UserName. This method
+   * will remove the domain part of the full logon name.
+   *
+   * @param name full principal name containing the domain
+   * @return name with domain removed
+   */
+  private static String stripDomain(String name) {
+    int i = name.indexOf('\\');
+    if (i != -1)
+      name = name.substring(i + 1);
+    return name;
+  }
+
+  /**
+   * Opens {@code f} for reading in a way that lets other processes delete
+   * the file while it is being read.  Only the Windows path needs the native
+   * interface; elsewhere a plain FileInputStream is used.
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f)
+      throws IOException {
+    if (RaftUtils.WINDOWS) {
+      // Ask the Windows native layer for a handle opened with the delete
+      // sharing flag in addition to read and write sharing.
+      final long sharing = Windows.FILE_SHARE_READ
+          | Windows.FILE_SHARE_WRITE
+          | Windows.FILE_SHARE_DELETE;
+      final FileDescriptor handle = Windows.createFile(
+          f.getAbsolutePath(), Windows.GENERIC_READ, sharing,
+          Windows.OPEN_EXISTING);
+      return new FileInputStream(handle);
+    }
+    // On Linux the default FileInputStream already shares delete permission
+    // on the file opened.
+    return new FileInputStream(f);
+  }
+
+  /**
+   * Create a FileInputStream that shares delete permission on the
+   * file opened at a given offset, i.e. other process can delete
+   * the file the FileInputStream is reading. Only Windows implementation
+   * uses the native interface.
+   *
+   * If positioning the file fails, the file that was just opened is closed
+   * again before the failure is propagated, so no descriptor is leaked.
+   *
+   * @param f the file to open
+   * @param seekOffset offset to start reading from; values that are not
+   *        positive leave the stream at the beginning of the file
+   * @return a stream positioned at {@code seekOffset}
+   * @throws IOException if the file cannot be opened or positioned
+   */
+  public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+      throws IOException {
+    if (!RaftUtils.WINDOWS) {
+      RandomAccessFile rf = new RandomAccessFile(f, "r");
+      try {
+        if (seekOffset > 0) {
+          rf.seek(seekOffset);
+        }
+        return new FileInputStream(rf.getFD());
+      } catch (IOException | RuntimeException e) {
+        // The caller never receives a stream, so release the file here.
+        try {
+          rf.close();
+        } catch (IOException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+        throw e;
+      }
+    } else {
+      // Use Windows native interface to create a FileInputStream that
+      // shares delete permission on the file opened, and set it to the
+      // given offset.
+      //
+      FileDescriptor fd = NativeIO.Windows.createFile(
+          f.getAbsolutePath(),
+          NativeIO.Windows.GENERIC_READ,
+          NativeIO.Windows.FILE_SHARE_READ |
+              NativeIO.Windows.FILE_SHARE_WRITE |
+              NativeIO.Windows.FILE_SHARE_DELETE,
+          NativeIO.Windows.OPEN_EXISTING);
+      // Wrap the handle first so that it can be closed if the seek fails.
+      FileInputStream in = new FileInputStream(fd);
+      if (seekOffset > 0) {
+        try {
+          NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
+        } catch (IOException | RuntimeException e) {
+          try {
+            in.close();
+          } catch (IOException closeFailure) {
+            e.addSuppressed(closeFailure);
+          }
+          throw e;
+        }
+      }
+      return in;
+    }
+  }
+
+  /**
+   * Create the specified File for write access, ensuring that it does not exist.
+   *
+   * On Windows the file is created first and its permissions are applied
+   * afterwards; if applying them fails, the freshly opened handle is closed
+   * before the failure is propagated so that it is not leaked.
+   *
+   * @param f the file that we want to create
+   * @param permissions we want to have on the file (if security is enabled)
+   * @return a stream writing to the newly created file
+   *
+   * @throws AlreadyExistsException if the file already exists
+   * @throws IOException if any other error occurred
+   */
+  public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
+      throws IOException {
+    if (!RaftUtils.WINDOWS) {
+      // Use the native wrapper around open(2)
+      try {
+        FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(),
+            NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT
+                | NativeIO.POSIX.O_EXCL, permissions);
+        return new FileOutputStream(fd);
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrno() == Errno.EEXIST) {
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
+    } else {
+      // Use the Windows native APIs to create equivalent FileOutputStream
+      try {
+        final String canonicalPath = f.getCanonicalPath();
+        FileDescriptor fd = NativeIO.Windows.createFile(canonicalPath,
+            NativeIO.Windows.GENERIC_WRITE,
+            NativeIO.Windows.FILE_SHARE_DELETE
+                | NativeIO.Windows.FILE_SHARE_READ
+                | NativeIO.Windows.FILE_SHARE_WRITE,
+            NativeIO.Windows.CREATE_NEW);
+        // Wrap the handle before chmod so it can be closed if chmod fails.
+        final FileOutputStream out = new FileOutputStream(fd);
+        try {
+          NativeIO.POSIX.chmod(canonicalPath, permissions);
+        } catch (IOException | RuntimeException e) {
+          try {
+            out.close();
+          } catch (IOException closeFailure) {
+            e.addSuppressed(closeFailure);
+          }
+          throw e;
+        }
+        return out;
+      } catch (NativeIOException nioe) {
+        if (nioe.getErrorCode() == 80) {
+          // ERROR_FILE_EXISTS
+          // 80 (0x50)
+          // The file exists
+          throw new AlreadyExistsException(nioe);
+        }
+        throw nioe;
+      }
+    }
+  }
+  
+  /**
+   * A version of renameTo that throws a descriptive exception when it fails.
+   * With the native library loaded the rename is done natively; otherwise
+   * {@link File#renameTo(File)} is used and a false result is turned into an
+   * exception.
+   *
+   * @param src                  The source path
+   * @param dst                  The destination path
+   *
+   * @throws IOException         On failure (a NativeIOException when the
+   *                             native rename is used).
+   */
+  public static void renameTo(File src, File dst)
+      throws IOException {
+    if (nativeLoaded) {
+      renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
+      return;
+    }
+    final boolean renamed = src.renameTo(dst);
+    if (!renamed) {
+      throw new IOException("renameTo(src=" + src + ", dst=" +
+        dst + ") failed.");
+    }
+  }
+
+  /**
+   * Native rename used by {@link #renameTo(File, File)} when the native
+   * library is loaded; it receives the absolute paths of both files.
+   *
+   * @param src                  The source path
+   * @param dst                  The destination path
+   *
+   * @throws NativeIOException   On failure.
+   */
+  private static native void renameTo0(String src, String dst)
+      throws NativeIOException;
+
+  /**
+   * NOTE(review): no caller is visible in this part of the file; presumably
+   * the native call that links {@code dst} to {@code src} — confirm against
+   * the native code.
+   */
+  private static native void link0(String src, String dst)
+      throws NativeIOException;
+
+  /**
+   * Unbuffered file copy from src to dst without tainting OS buffer cache
+   *
+   * In POSIX platform:
+   * It uses FileChannel#transferTo() which internally attempts
+   * unbuffered IO on OS with native sendfile64() support and falls back to
+   * buffered IO otherwise.
+   *
+   * It minimizes the number of FileChannel#transferTo calls by passing the
+   * src file size directly instead of a smaller size as the 3rd parameter.
+   * This saves the number of sendfile64() system call when native sendfile64()
+   * is supported. In the two fall back cases where sendfile is not supported,
+   * FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
+   * respectively.
+   *
+   * In Windows Platform:
+   * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
+   * flag, which is supported on Windows Server 2008 and above.
+   *
+   * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
+   * platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
+   * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
+   * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
+   * on Windows simply returns IOS_UNSUPPORTED.
+   *
+   * Note: This simple native wrapper does minimal parameter checking before copy and
+   * consistency check (e.g., size) after copy.
+   * It is recommended to use wrapper function like
+   * the Storage#nativeCopyFileUnbuffered() function with pre/post copy checks.
+   *
+   * The copy loop fails with an IOException instead of spinning forever when
+   * a transfer makes no progress, which happens for instance when the source
+   * file shrinks while it is being copied.
+   *
+   * @param src                  The source path
+   * @param dst                  The destination path
+   * @throws IOException         If the copy fails or stops making progress
+   */
+  public static void copyFileUnbuffered(File src, File dst) throws IOException {
+    if (nativeLoaded && RaftUtils.WINDOWS) {
+      copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
+    } else {
+      FileInputStream fis = null;
+      FileOutputStream fos = null;
+      FileChannel input = null;
+      FileChannel output = null;
+      try {
+        fis = new FileInputStream(src);
+        fos = new FileOutputStream(dst);
+        input = fis.getChannel();
+        output = fos.getChannel();
+        long remaining = input.size();
+        long position = 0;
+        long transferred;
+        while (remaining > 0) {
+          transferred = input.transferTo(position, remaining, output);
+          if (transferred <= 0) {
+            // Nothing left to read at this position although the size taken
+            // at the start promised more; looping again would never end.
+            throw new IOException("copyFileUnbuffered(src=" + src + ", dst="
+                + dst + ") made no progress at offset " + position + " with "
+                + remaining + " bytes remaining");
+          }
+          remaining -= transferred;
+          position += transferred;
+        }
+      } finally {
+        RaftUtils.cleanup(LOG, output, fos, input, fis);
+      }
+    }
+  }
+
+  private static native void copyFileUnbuffered0(String src, String dst)
+      throws NativeIOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
new file mode 100644
index 0000000..58b83e7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io.nativeio;
+
+import java.io.IOException;
+
+import org.apache.ratis.util.RaftUtils;
+
+
+/**
+ * An exception generated by a call to the native IO code.
+ *
+ * These exceptions simply wrap <i>errno</i> result codes on Linux,
+ * or the System Error Code on Windows.
+ */
+public class NativeIOException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /** The errno value; {@link Errno#UNKNOWN} when built from an error code. */
+  private final Errno errno;
+
+  // Java has no unsigned primitive error code. Use a signed 32-bit
+  // integer to hold the unsigned 32-bit integer.
+  private final int errorCode;
+
+  /**
+   * Creates an exception that carries an errno value; the error code is 0.
+   *
+   * @param msg   the detail message
+   * @param errno the errno to report
+   */
+  public NativeIOException(String msg, Errno errno) {
+    super(msg);
+    this.errno = errno;
+    // Windows error code is always set to ERROR_SUCCESS on Linux,
+    // i.e. no failure on Windows
+    this.errorCode = 0;
+  }
+
+  /**
+   * Creates an exception that carries a system error code; the errno is
+   * {@link Errno#UNKNOWN}.
+   *
+   * @param msg       the detail message
+   * @param errorCode the system error code, stored as a signed 32-bit value
+   */
+  public NativeIOException(String msg, int errorCode) {
+    super(msg);
+    this.errorCode = errorCode;
+    this.errno = Errno.UNKNOWN;
+  }
+
+  /**
+   * @return the stored error code widened to {@code long}. The widening is
+   *         signed, so a code with the high bit set is returned as a negative
+   *         value. NOTE(review): confirm whether callers expect the unsigned
+   *         value instead.
+   */
+  public long getErrorCode() {
+    return errorCode;
+  }
+
+  /** @return the errno given at construction, or {@link Errno#UNKNOWN}. */
+  public Errno getErrno() {
+    return errno;
+  }
+
+  /**
+   * @return the error code (on Windows) or the errno (elsewhere), followed by
+   *         {@code ": "} and the detail message
+   */
+  @Override
+  public String toString() {
+    // String concatenation (rather than errno.toString()) keeps this
+    // null-safe when the exception was constructed with a null errno.
+    if (RaftUtils.WINDOWS) {
+      return errorCode + ": " + super.getMessage();
+    } else {
+      return errno + ": " + super.getMessage();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
new file mode 100644
index 0000000..cc441f2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/**
+ * Signals that an attempt to create a file at a given pathname has failed
+ * because another file already existed at that path.
+ */
+public class AlreadyExistsException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param msg the detail message
+   */
+  public AlreadyExistsException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param cause the underlying cause, passed to {@link IOException}
+   */
+  public AlreadyExistsException(Throwable cause) {
+    super(cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
new file mode 100644
index 0000000..1742c24
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/** Thrown for checksum errors. */
+public class ChecksumException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /** The position supplied to the constructor. */
+  private final long pos;
+
+  /**
+   * @param description the detail message
+   * @param pos         the position associated with the checksum error
+   */
+  public ChecksumException(String description, long pos) {
+    super(description);
+    this.pos = pos;
+  }
+
+  /** @return the position given at construction time */
+  public long getPos() {
+    return this.pos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
new file mode 100644
index 0000000..77ef267
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+
+/**
+ * The information clients append to the raft ring.
+ *
+ * NOTE(review): "raft ring" presumably refers to the replicated raft log;
+ * confirm the intended term.
+ */
+public interface Message {
+  /**
+   * @return the content of the message, as a (shaded protobuf)
+   *         {@link ByteString}
+   */
+  ByteString getContent();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
new file mode 100644
index 0000000..1306290
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A {@link RaftException} whose message states that the given server is not
+ * the leader and that the request must be sent to the leader. It carries a
+ * suggested leader and a list of peers.
+ */
+public class NotLeaderException extends RaftException {
+  // Declared explicitly, like the other exception classes in this package.
+  private static final long serialVersionUID = 1L;
+
+  private final RaftPeer suggestedLeader;
+  /** the client may need to update its RaftPeer list */
+  private final RaftPeer[] peers;
+
+  /**
+   * @param id              the id of the server that is not the leader
+   * @param suggestedLeader the peer suggested as leader; it is only stored and
+   *                        concatenated into the message here
+   * @param peers           the peer list; {@code null} is replaced by
+   *                        {@link RaftPeer#EMPTY_PEERS}
+   */
+  public NotLeaderException(String id, RaftPeer suggestedLeader,
+      RaftPeer[] peers) {
+    super("Server " + id + " is not the leader (" + suggestedLeader
+        + "). Request must be sent to leader.");
+    this.suggestedLeader = suggestedLeader;
+    this.peers = peers == null ? RaftPeer.EMPTY_PEERS : peers;
+  }
+
+  /** @return the suggested leader given at construction time */
+  public RaftPeer getSuggestedLeader() {
+    return suggestedLeader;
+  }
+
+  /** @return the stored peer array (not a copy); never {@code null} */
+  public RaftPeer[] getPeers() {
+    return peers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
new file mode 100644
index 0000000..3298431
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link RaftClientProtocol}. */
+public interface RaftClientAsynchronousProtocol {
+  /**
+   * Asynchronous counterpart of
+   * {@link RaftClientProtocol#submitClientRequest(RaftClientRequest)}.
+   *
+   * @param request the client request
+   * @return a future for the {@link RaftClientReply}
+   * @throws IOException declared by the protocol; the conditions are
+   *                     implementation-specific
+   */
+  CompletableFuture<RaftClientReply> submitClientRequestAsync(
+      RaftClientRequest request) throws IOException;
+
+  /**
+   * Asynchronous counterpart of
+   * {@link RaftClientProtocol#setConfiguration(SetConfigurationRequest)}.
+   *
+   * @param request the set-configuration request
+   * @return a future for the {@link RaftClientReply}
+   * @throws IOException declared by the protocol; the conditions are
+   *                     implementation-specific
+   */
+  CompletableFuture<RaftClientReply> setConfigurationAsync(
+      SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
new file mode 100644
index 0000000..b3cbcc3
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/**
+ * The client-facing protocol: each call takes a request object and returns a
+ * {@link RaftClientReply}.
+ */
+public interface RaftClientProtocol {
+  /**
+   * @param request the client request
+   * @return the reply to the request
+   * @throws IOException declared by the protocol; the conditions are
+   *                     implementation-specific
+   */
+  RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
+
+  /**
+   * @param request the set-configuration request
+   * @return the reply to the request
+   * @throws IOException declared by the protocol; the conditions are
+   *                     implementation-specific
+   */
+  RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
new file mode 100644
index 0000000..8c5cd75
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A reply message. It carries the requestor id, the replier id and the
+ * sequence number, a success flag, and optionally a {@link Message} or a
+ * {@link NotLeaderException}.
+ */
+public class RaftClientReply extends RaftRpcMessage {
+  private final String requestorId;
+  private final String replierId;
+  private final long seqNum;
+  private final boolean success;
+  private final Message message;
+  /** non-null if the server is not leader */
+  private final NotLeaderException notLeaderException;
+
+  /**
+   * Builds a successful reply to the given request: ids and sequence number
+   * are taken from the request and no {@link NotLeaderException} is set.
+   */
+  public RaftClientReply(RaftClientRequest request, Message message) {
+    this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(),
+        true, message, null);
+  }
+
+  /**
+   * Builds an unsuccessful reply to the given request: ids and sequence number
+   * are taken from the request and no message is set.
+   */
+  public RaftClientReply(RaftClientRequest request,
+      NotLeaderException notLeaderException) {
+    this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(),
+        false, null, notLeaderException);
+  }
+
+  /** Builds a reply from explicitly given values; nothing is validated. */
+  public RaftClientReply(String requestorId, String replierId, long seqNum,
+      boolean success, Message message, NotLeaderException notLeaderException) {
+    this.requestorId = requestorId;
+    this.replierId = replierId;
+    this.seqNum = seqNum;
+    this.success = success;
+    this.message = message;
+    this.notLeaderException = notLeaderException;
+  }
+
+  /** @return always {@code false}: this is a reply. */
+  @Override
+  public final boolean isRequest() {
+    return false;
+  }
+
+  @Override
+  public String getRequestorId() {
+    return requestorId;
+  }
+
+  @Override
+  public String getReplierId() {
+    return replierId;
+  }
+
+  /** @return the sequence number given at construction time */
+  public long getSeqNum() {
+    return seqNum;
+  }
+
+  /** @return the success flag given at construction time */
+  public boolean isSuccess() {
+    return success;
+  }
+
+  /** @return the message, possibly {@code null} */
+  public Message getMessage() {
+    return message;
+  }
+
+  /** @return the {@link NotLeaderException}, possibly {@code null} */
+  public NotLeaderException getNotLeaderException() {
+    return notLeaderException;
+  }
+
+  /** @return {@code true} iff a {@link NotLeaderException} is set */
+  public boolean isNotLeader() {
+    return notLeaderException != null;
+  }
+
+  /** Appends the sequence number and the success flag to the base form. */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(super.toString());
+    text.append(", seqNum: ").append(getSeqNum());
+    text.append(", success: ").append(isSuccess());
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
new file mode 100644
index 0000000..90b648a
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A request message. It carries the requestor id, the replier id, a sequence
+ * number, a {@link Message} and a read-only flag.
+ */
+public class RaftClientRequest extends RaftRpcMessage {
+  private final String requestorId;
+  private final String replierId;
+  private final long seqNum;
+  private final Message message;
+  private final boolean readOnly;
+
+  /** Builds a request from explicitly given values; nothing is validated. */
+  public RaftClientRequest(String requestorId, String replierId, long seqNum,
+       Message message, boolean readOnly) {
+    this.requestorId = requestorId;
+    this.replierId = replierId;
+    this.seqNum = seqNum;
+    this.message = message;
+    this.readOnly = readOnly;
+  }
+
+  /** Builds a request whose read-only flag is {@code false}. */
+  public RaftClientRequest(String  requestorId, String replierId, long seqNum,
+                           Message message) {
+    this(requestorId, replierId, seqNum, message, false);
+  }
+
+  /** @return always {@code true}: this is a request. */
+  @Override
+  public final boolean isRequest() {
+    return true;
+  }
+
+  @Override
+  public String getRequestorId() {
+    return requestorId;
+  }
+
+  @Override
+  public String getReplierId() {
+    return replierId;
+  }
+
+  /** @return the sequence number given at construction time */
+  public long getSeqNum() {
+    return seqNum;
+  }
+
+  /** @return the message, possibly {@code null} */
+  public Message getMessage() {
+    return message;
+  }
+
+  /** @return the read-only flag given at construction time */
+  public boolean isReadOnly() {
+    return readOnly;
+  }
+
+  /** Appends the sequence number and "RO" or "RW" to the base form. */
+  @Override
+  public String toString() {
+    final String base = super.toString();
+    final String mode = isReadOnly() ? "RO" : "RW";
+    return base + ", seqNum: " + seqNum + ", " + mode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
new file mode 100644
index 0000000..11aac90
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} subtype that the other exceptions in this package
+ * (for example {@code NotLeaderException} and {@code StateMachineException})
+ * extend.
+ */
+public class RaftException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message the detail message
+   */
+  public RaftException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause the underlying cause
+   */
+  public RaftException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message the detail message
+   * @param cause   the underlying cause
+   */
+  public RaftException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
new file mode 100644
index 0000000..a32aaa0
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import com.google.common.net.HostAndPort;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link RaftPeer} is a server in a Raft cluster.
+ *
+ * The objects of this class are immutable. Equality and hash code are based
+ * on the id only, so the id is expected to be non-null.
+ */
+public class RaftPeer {
+  public static final RaftPeer[] EMPTY_PEERS = {};
+
+  /** The id of the peer. */
+  private final String id;
+  /** The address of the peer. */
+  private final String address;
+
+  /** Construct a peer with the given id and a null address. */
+  public RaftPeer(String id) {
+    this(id, (String)null);
+  }
+
+  /**
+   * Construct a peer with the given id and address.
+   *
+   * @param id      the id of the peer
+   * @param address the socket address, stored as a "host:port" string;
+   *                a null address is stored as null
+   */
+  public RaftPeer(String id, InetSocketAddress address) {
+    this(id, address == null ? null : toAddressString(address));
+  }
+
+  /** Construct a peer with the given id and address. */
+  public RaftPeer(String id, String address) {
+    this.id = id;
+    this.address = address;
+  }
+
+  /**
+   * Formats a socket address as a host-and-port string. An unresolved address
+   * has no {@link java.net.InetAddress}, so its host string is used instead of
+   * dereferencing a null {@code getAddress()} result.
+   */
+  private static String toAddressString(InetSocketAddress address) {
+    final String host = address.isUnresolved()
+        ? address.getHostString()
+        : address.getAddress().getHostAddress();
+    return HostAndPort.fromParts(host, address.getPort()).toString();
+  }
+
+  /** @return The id of the peer. */
+  public String getId() {
+    return id;
+  }
+
+  /** @return The address of the peer. */
+  public String getAddress() {
+    return address;
+  }
+
+  /** @return the id and the address separated by a colon. */
+  @Override
+  public String toString() {
+    return id + ":" + address;
+  }
+
+  /** Two peers are equal iff their ids are equal; the address is ignored. */
+  @Override
+  public boolean equals(Object o) {
+    return (o instanceof RaftPeer) && id.equals(((RaftPeer) o).getId());
+  }
+
+  /** @return the hash code of the id, consistent with {@link #equals}. */
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
new file mode 100644
index 0000000..82f1ebb
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * Base class of the RPC messages: each message is either a request or a
+ * reply and names a requestor and a replier.
+ */
+public abstract class RaftRpcMessage {
+
+  /** @return true for a request, false for a reply. */
+  public abstract boolean isRequest();
+
+  /** @return the id of the requestor. */
+  public abstract String getRequestorId();
+
+  /** @return the id of the replier. */
+  public abstract String getReplierId();
+
+  /**
+   * @return the simple class name followed by "(requestor->replier)" for a
+   *         request, or "(requestor<-replier)" otherwise
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder();
+    text.append(getClass().getSimpleName()).append("(");
+    text.append(getRequestorId());
+    text.append(isRequest() ? "->" : "<-");
+    text.append(getReplierId()).append(")");
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
new file mode 100644
index 0000000..3d0f093
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A {@link RaftException} subtype that carries only a message.
+ *
+ * NOTE(review): presumably raised when a configuration change is requested
+ * while another one is still in progress; confirm against the server code.
+ */
+public class ReconfigurationInProgressException extends RaftException {
+  // Declared explicitly, like RaftException and the other exceptions here.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message the detail message
+   */
+  public ReconfigurationInProgressException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
new file mode 100644
index 0000000..69a2e51
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A {@link RaftException} subtype that carries only a message.
+ *
+ * NOTE(review): presumably raised when a configuration change does not
+ * complete in time; confirm against the server code.
+ */
+public class ReconfigurationTimeoutException extends RaftException {
+  // Declared explicitly, like RaftException and the other exceptions here.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param message the detail message
+   */
+  public ReconfigurationTimeoutException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
new file mode 100644
index 0000000..84449d4
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.util.Arrays;
+
+/**
+ * A {@link RaftClientRequest} that carries the peers of a new configuration.
+ * It is built with a null message.
+ */
+public class SetConfigurationRequest extends RaftClientRequest {
+  private final RaftPeer[] peers;
+
+  /**
+   * @param requestorId the id of the requestor
+   * @param replierId   the id of the replier
+   * @param seqNum      the sequence number of the request
+   * @param peers       the peers in the new configuration; stored as given
+   */
+  public SetConfigurationRequest(String requestorId, String replierId,
+      long seqNum, RaftPeer[] peers) {
+    super(requestorId, replierId, seqNum, null);
+    this.peers = peers;
+  }
+
+  /** @return the peer array given at construction time (not a copy). */
+  public RaftPeer[] getPeersInNewConf() {
+    return peers;
+  }
+
+  /** Appends the peers to the base form; a null peer array prints as "null". */
+  @Override
+  public String toString() {
+    // Arrays.toString formats a non-null array exactly like
+    // Arrays.asList(...).toString() but does not throw on a null array.
+    return super.toString() + ", peers:" + Arrays.toString(getPeersInNewConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
new file mode 100644
index 0000000..099133d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * A {@link RaftException} subtype built either from a server id and a cause
+ * or from a plain message.
+ */
+public class StateMachineException extends RaftException {
+  // Declared explicitly, like RaftException and the other exceptions here.
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Wraps the given cause; the message is the cause's class name followed by
+   * " from Server " and the server id.
+   *
+   * @param serverId the id of the server
+   * @param cause    the underlying exception; must not be null because its
+   *                 class name is read to build the message
+   */
+  public StateMachineException(String serverId, Exception cause) {
+    super(cause.getClass().getName() + " from Server " + serverId, cause);
+  }
+
+  /**
+   * @param msg the detail message
+   */
+  public StateMachineException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
new file mode 100644
index 0000000..e8e267e
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.apache.ratis.io.nativeio.NativeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * A FileOutputStream that has the property that it will only show
+ * up at its destination once it has been entirely written and flushed
+ * to disk. While being written, it will use a .tmp suffix.
+ *
+ * When the output stream is closed, it is flushed, fsynced, and
+ * will be moved into place, overwriting any file that already
+ * exists at that location.
+ *
+ * <b>NOTE</b>: on Windows platforms, it will not atomically
+ * replace the target file - instead the target file is deleted
+ * before this one is moved into place.
+ */
+public class AtomicFileOutputStream extends FilterOutputStream {
+
+  /** Suffix appended to the destination file name while it is being written. */
+  public static final String TMP_EXTENSION = ".tmp";
+
+  public static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class);
+
+  /** Absolute form of the destination file. */
+  private final File origFile;
+  /** Absolute form of the temporary file receiving the writes. */
+  private final File tmpFile;
+
+  /**
+   * Open the temporary file next to the given file for writing.
+   *
+   * @param f the destination file that {@link #close()} moves the data to
+   * @throws FileNotFoundException if the temporary file cannot be opened
+   */
+  public AtomicFileOutputStream(File f) throws FileNotFoundException {
+    // Nothing can be assigned before calling super, so the temporary path
+    // is computed by a static helper here and once more for the field below.
+    super(new FileOutputStream(tmpFileFor(f)));
+    origFile = f.getAbsoluteFile();
+    tmpFile = tmpFileFor(f).getAbsoluteFile();
+  }
+
+  /** @return the sibling of the given file with {@link #TMP_EXTENSION} appended to its name. */
+  private static File tmpFileFor(File f) {
+    return new File(f.getParentFile(), f.getName() + TMP_EXTENSION);
+  }
+
+  /**
+   * Flush and sync the temporary file, close it, and then move it on top of
+   * the destination.  If flushing, syncing or closing fails, the temporary
+   * file is deleted and the destination is not touched.
+   *
+   * @throws IOException if writing fails, if an existing destination cannot
+   *         be deleted, or if the temporary file cannot be renamed; in the
+   *         last case the native failure is attached as the cause.
+   */
+  @Override
+  public void close() throws IOException {
+    boolean triedToClose = false, success = false;
+    try {
+      flush();
+      ((FileOutputStream)out).getChannel().force(true);
+
+      triedToClose = true;
+      super.close();
+      success = true;
+    } finally {
+      if (success) {
+        moveIntoPlace();
+      } else {
+        if (!triedToClose) {
+          // If we failed when flushing, try to close it to not leak an FD
+          RaftUtils.cleanup(LOG, out);
+        }
+        // close wasn't successful, try to delete the tmp file
+        if (!tmpFile.delete()) {
+          LOG.warn("Unable to delete tmp file " + tmpFile);
+        }
+      }
+    }
+  }
+
+  /**
+   * Rename the temporary file to the destination.  When the plain rename
+   * fails, delete the destination (if it exists) and retry with the native
+   * rename.
+   */
+  private void moveIntoPlace() throws IOException {
+    if (tmpFile.renameTo(origFile)) {
+      return;
+    }
+    // On windows, renameTo does not replace.
+    if (origFile.exists() && !origFile.delete()) {
+      throw new IOException("Could not delete original file " + origFile);
+    }
+    try {
+      NativeIO.renameTo(tmpFile, origFile);
+    } catch (NativeIOException e) {
+      // Keep the native exception as the cause so that its stack trace
+      // is not lost; the message text is unchanged.
+      throw new IOException("Could not rename temporary file " + tmpFile
+          + " to " + origFile + " due to failure in native rename. "
+          + e.toString(), e);
+    }
+  }
+
+  /**
+   * Close the atomic file, but do not "commit" the temporary file
+   * on top of the destination. This should be used if there is a failure
+   * in writing.  Failures to close or to delete the temporary file are
+   * logged as warnings and not thrown.
+   */
+  public void abort() {
+    try {
+      super.close();
+    } catch (IOException ioe) {
+      LOG.warn("Unable to abort file " + tmpFile, ioe);
+    }
+    if (!tmpFile.delete()) {
+      LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
new file mode 100644
index 0000000..489b5cd
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * An {@link AutoCloseable} handle on an acquired {@link Lock}:
+ * calling {@link #close()} unlocks the lock.
+ */
+public class AutoCloseableLock implements AutoCloseable {
+  /**
+   * Lock the given lock and return an {@link AutoCloseableLock} wrapping it.
+   * The lock is unlocked by {@link #close()}, so the returned object
+   * can be used in a {@code try}-with-resources statement as shown below.
+   *
+   * <pre> {@code
+   * try(AutoCloseableLock acl = AutoCloseableLock.acquire(lock)) {
+   *   ...
+   * }}</pre>
+   *
+   * @param lock the lock to acquire
+   * @return a handle whose {@link #close()} unlocks the given lock
+   */
+  public static AutoCloseableLock acquire(final Lock lock) {
+    lock.lock();
+    final AutoCloseableLock handle = new AutoCloseableLock(lock);
+    return handle;
+  }
+
+  /** The wrapped lock. */
+  private final Lock lock;
+  /** Becomes true at the first call of {@link #close()}. */
+  private final AtomicBoolean released = new AtomicBoolean(false);
+
+  private AutoCloseableLock(Lock lock) {
+    this.lock = lock;
+  }
+
+  /**
+   * Unlock the wrapped lock at the first invocation;
+   * any further invocation does nothing.
+   */
+  @Override
+  public void close() {
+    final boolean alreadyReleased = released.getAndSet(true);
+    if (!alreadyReleased) {
+      lock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
new file mode 100644
index 0000000..4badc66
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.function.Function;
+
+/**
+ * A one-argument function whose {@link #apply(Object)} method
+ * is declared with a throws-clause.
+ *
+ * @param <INPUT> the type of the argument
+ * @param <OUTPUT> the type of the result
+ * @param <THROWABLE> the type declared in the throws-clause
+ */
+@FunctionalInterface
+public interface CheckedFunction<INPUT, OUTPUT, THROWABLE extends Throwable> {
+  /**
+   * The counterpart of {@link Function#apply(Object)}
+   * with a throws-clause in its declaration.
+   *
+   * @param input the function argument
+   * @return the function result
+   * @throws THROWABLE as declared by the implementation
+   */
+  OUTPUT apply(INPUT input) throws THROWABLE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
new file mode 100644
index 0000000..b6e90b9
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * A runnable whose {@link #run()} method is declared with a throws-clause.
+ *
+ * @param <THROWABLE> the type declared in the throws-clause
+ */
+@FunctionalInterface
+public interface CheckedRunnable<THROWABLE extends Throwable> {
+  /**
+   * The counterpart of {@link Runnable#run()}
+   * with a throws-clause in its declaration.
+   *
+   * @throws THROWABLE as declared by the implementation
+   */
+  void run() throws THROWABLE;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
new file mode 100644
index 0000000..60b5daf
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A registry of named injection points:
+ * code registered by {@link #put(String, Code)} is run by
+ * {@link #execute(String, String, String, Object...)} for the same name.
+ */
+public class CodeInjectionForTesting {
+  public static final Logger LOG = LoggerFactory.getLogger(CodeInjectionForTesting.class);
+
+  /** The code registered at an injection point. */
+  public interface Code {
+    Logger LOG = CodeInjectionForTesting.LOG;
+
+    /**
+     * Run the injected code.
+     * @param localId the id of the local peer
+     * @param remoteId the id of the remote peer if handling a request
+     * @param args other possible args
+     * @return if the injected code is executed
+     */
+    boolean execute(String localId, String remoteId, Object... args);
+  }
+
+  /** Map from injection point names to the registered code. */
+  private static final Map<String, Code> INJECTION_POINTS
+      = new ConcurrentHashMap<>();
+
+  /**
+   * Register the given code at the given injection point,
+   * replacing the code registered earlier at the same point, if there is any.
+   */
+  public static void put(String injectionPoint, Code code) {
+    LOG.debug("put: {}, {}", injectionPoint, code);
+    INJECTION_POINTS.put(injectionPoint, code);
+  }
+
+  /**
+   * Run the code registered at the given injection point.
+   *
+   * @return false if no code is registered at the injection point;
+   *         otherwise, the value returned by the registered code.
+   */
+  public static boolean execute(String injectionPoint, String localId,
+      String remoteId, Object... args) {
+    final Code injected = INJECTION_POINTS.get(injectionPoint);
+    if (injected != null) {
+      // The guard avoids building the args string when debug is off.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("execute: {}, {}, localId={}, remoteId={}, args={}",
+            injectionPoint, injected, localId, remoteId, Arrays.toString(args));
+      }
+      return injected.execute(localId, remoteId, args);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
new file mode 100644
index 0000000..1ef95ae
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/** A {@link Thread} which is marked as a daemon thread at construction. */
+public class Daemon extends Thread {
+  /** Construct a daemon thread. */
+  public Daemon() {
+    super();
+    setDaemon(true);
+  }
+
+  /**
+   * Construct a daemon thread with the given runnable.
+   * The thread is named by {@code runnable.toString()}.
+   */
+  public Daemon(Runnable runnable) {
+    super(runnable);
+    setDaemon(true);
+    final String name = runnable.toString();
+    setName(name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
new file mode 100644
index 0000000..4404344
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+
+/** Facilitates hooking process termination for tests and debugging. */
+public class ExitUtils {
+  /** Thrown by {@link ExitUtils} in place of exiting when System.exit is disabled. */
+  public static class ExitException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    /** The exit status passed to terminate. */
+    public final int status;
+
+    public ExitException(int status, String message, Throwable throwable) {
+      super(message, throwable);
+      this.status = status;
+    }
+  }
+
+  private static volatile boolean systemExitDisabled = false;
+  private static volatile ExitException firstExitException;
+
+  /**
+   * @return the first {@link ExitException} thrown, or null if none thrown yet.
+   */
+  public static ExitException getFirstExitException() {
+    return firstExitException;
+  }
+
+  /**
+   * Reset the tracking of process termination.
+   * This is useful when some tests expect an exit but the others do not.
+   */
+  public static void resetFirstExitException() {
+    firstExitException = null;
+  }
+
+  /** @return true if {@link #terminate(int, String, Throwable, Logger)} has been invoked. */
+  public static boolean isTerminated() {
+    // Either this member is set or System.exit is actually invoked.
+    return firstExitException != null;
+  }
+
+  /** Disable the use of {@link System#exit(int)} for testing. */
+  public static void disableSystemExit() {
+    systemExitDisabled = true;
+  }
+
+  /**
+   * Terminate the current process. Note that terminate is the *only* method
+   * that should be used to terminate the daemon processes.
+   *
+   * The termination is logged when a log is given: at info level
+   * for status 0 and at error level for any other status.
+   *
+   * @param status Exit status
+   * @param message message used to create the {@code ExitException}
+   * @param throwable the cause of the termination, possibly null;
+   *                  it is logged and used as the cause of the {@code ExitException}
+   * @param log the log to write to; nothing is logged if it is null
+   * @throws ExitException if System.exit is disabled for test purposes
+   */
+  public static void terminate(
+      int status, String message, Throwable throwable, Logger log)
+      throws ExitException {
+    if (log != null) {
+      final String s = "Terminating with exit status " + status + ": " + message;
+      if (status == 0) {
+        log.info(s, throwable);
+      } else {
+        log.error(s, throwable);
+      }
+    }
+
+    if (!systemExitDisabled) {
+      System.exit(status);
+    }
+
+    final ExitException ee = new ExitException(status, message, throwable);
+    recordIfFirst(ee);
+    throw ee;
+  }
+
+  /**
+   * Remember the given exception unless one is already remembered.
+   * The check and the assignment are done under one lock so that
+   * concurrent callers cannot overwrite the exception recorded first.
+   */
+  private static void recordIfFirst(ExitException ee) {
+    synchronized (ExitUtils.class) {
+      if (firstExitException == null) {
+        firstExitException = ee;
+      }
+    }
+  }
+
+  /**
+   * The same as {@link #terminate(int, String, Throwable, Logger)}
+   * with a null throwable.
+   */
+  public static void terminate(int status, String message, Logger log) {
+    terminate(status, message, null, log);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
new file mode 100644
index 0000000..5dc509d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+
+/** Static helper methods for deleting, listing and converting {@link File}s. */
+public class FileUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+  /**
+   * Open the given file in append mode and truncate its channel
+   * to the given size.
+   *
+   * @param f the file to truncate
+   * @param target the size passed to the channel truncate call
+   * @throws IOException if opening or truncating the file fails
+   */
+  public static void truncateFile(File f, long target) throws IOException {
+    try (FileOutputStream out = new FileOutputStream(f, true)) {
+      out.getChannel().truncate(target);
+    }
+  }
+
+  /**
+   * Delete the given file using {@link Files#delete(java.nio.file.Path)}.
+   * A failure is logged as a warning and then rethrown.
+   *
+   * @param f the file to delete
+   * @throws IOException if the deletion fails
+   */
+  public static void deleteFile(File f) throws IOException {
+    try {
+      Files.delete(f.toPath());
+    } catch (IOException e) {
+      LOG.warn("Could not delete " + f);
+      throw e;
+    }
+  }
+
+  /**
+   * Delete a directory and all its contents.  If
+   * we return false, the directory may be partially-deleted.
+   * (1) If dir is symlink to a file, the symlink is deleted. The file pointed
+   *     to by the symlink is not deleted.
+   * (2) If dir is symlink to a directory, symlink is deleted. The directory
+   *     pointed to by symlink is not deleted.
+   * (3) If dir is a normal file, it is deleted.
+   * (4) If dir is a normal directory, then dir and all its contents recursively
+   *     are deleted.
+   *
+   * @param dir the file or directory to delete;
+   *            for null, a warning is logged and false is returned
+   * @return true if dir does not exist anymore when this method returns
+   */
+  public static boolean fullyDelete(final File dir) {
+    if (deleteImpl(dir, false)) {
+      // dir is (a) normal file, (b) symlink to a file, (c) empty directory or
+      // (d) symlink to a directory
+      return true;
+    }
+    if (dir == null) {
+      // deleteImpl has already logged the null argument.
+      // Listing the contents of null below would throw NullPointerException.
+      return false;
+    }
+    // handle nonempty directory deletion
+    return fullyDeleteContents(dir) && deleteImpl(dir, true);
+  }
+
+  /**
+   * Try to delete the given file or empty directory.
+   *
+   * @param f the file to delete
+   * @param doLog should a warning be logged
+   *              when f still exists after the delete attempt?
+   * @return true if f was deleted or does not exist; false if f is null
+   *         or still exists.
+   */
+  private static boolean deleteImpl(final File f, final boolean doLog) {
+    if (f == null) {
+      LOG.warn("null file argument.");
+      return false;
+    }
+    final boolean wasDeleted = f.delete();
+    if (wasDeleted) {
+      return true;
+    }
+    final boolean ex = f.exists();
+    if (doLog && ex) {
+      LOG.warn("Failed to delete file or dir ["
+          + f.getAbsolutePath() + "]: it still exists.");
+    }
+    return !ex;
+  }
+
+  /**
+   * Delete the contents of a directory, not the directory itself.  If
+   * we return false, the directory may be partially-deleted.
+   * If dir is a symlink to a directory, all the contents of the actual
+   * directory pointed to by dir will be deleted.
+   */
+  private static boolean fullyDeleteContents(final File dir) {
+    boolean deletionSucceeded = true;
+    // listFiles returns null when dir cannot be listed;
+    // in that case there is nothing to delete here.
+    final File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (File content : contents) {
+        if (content.isFile()) {
+          if (!deleteImpl(content, true)) {
+            deletionSucceeded = false;
+          }
+        } else {
+          // Either directory or symlink to another directory.
+          // Try deleting the directory as this might be a symlink
+          if (deleteImpl(content, false)) {
+            // this was indeed a symlink or an empty directory
+            continue;
+          }
+          // if not an empty directory or symlink let
+          // fullyDelete handle it.
+          if (!fullyDelete(content)) {
+            deletionSucceeded = false;
+            // continue deletion of other files/dirs under dir
+          }
+        }
+      }
+    }
+    return deletionSucceeded;
+  }
+
+  /**
+   * Interprets the passed string as a URI. In case of error it
+   * assumes the specified string is a file.
+   *
+   * @param s the string to interpret
+   * @return the resulting URI
+   * @throws IOException if the string has to be converted as a file
+   *         and {@link #fileAsURI(File)} fails
+   */
+  public static URI stringAsURI(String s) throws IOException {
+    URI u = null;
+    // try to make a URI
+    try {
+      u = new URI(s);
+    } catch (URISyntaxException e){
+      // NOTE(review): the message refers to "hdfs configuration";
+      // confirm whether this wording is intended in this project.
+      LOG.error("Syntax error in URI " + s
+          + ". Please check hdfs configuration.", e);
+    }
+
+    // if URI is null or scheme is undefined, then assume it's file://
+    if(u == null || u.getScheme() == null){
+      LOG.warn("Path " + s + " should be specified as a URI "
+          + "in configuration files. Please update configuration.");
+      u = fileAsURI(new File(s));
+    }
+    return u;
+  }
+
+  /**
+   * Converts the passed File to a URI. This method trims the trailing slash if
+   * one is appended because the underlying file is in fact a directory that
+   * exists.
+   *
+   * @param f the file to convert
+   * @return the resulting URI
+   * @throws IOException if the canonical file cannot be obtained
+   *         or the trimmed string is not a valid URI
+   */
+  public static URI fileAsURI(File f) throws IOException {
+    URI u = f.getCanonicalFile().toURI();
+
+    // trim the trailing slash, if it's present
+    if (u.getPath().endsWith("/")) {
+      String uriAsString = u.toString();
+      try {
+        u = new URI(uriAsString.substring(0, uriAsString.length() - 1));
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+    }
+    return u;
+  }
+
+  /**
+   * A wrapper for {@link File#listFiles()}. This java.io API returns null
+   * when a dir is not a directory or for any I/O error. Instead of having
+   * a null check everywhere File#listFiles() is used, this utility method
+   * throws an IOException in that case, which is what the majority of
+   * callers prefer.
+   * @param dir directory for which listing should be performed
+   * @return the files in the directory, possibly an empty array but never null
+   * @exception IOException for invalid directory or for a bad disk.
+   */
+  public static File[] listFiles(File dir) throws IOException {
+    File[] files = dir.listFiles();
+    if(files == null) {
+      throw new IOException("Invalid directory or I/O error occurred for dir: "
+          + dir.toString());
+    }
+    return files;
+  }
+
+  /**
+   * Platform independent implementation for {@link File#canWrite()}
+   * @param f input file
+   * @return On Unix, same as {@link File#canWrite()}
+   *         On Windows, true if process has write access on the path;
+   *         false if the access check throws an IOException
+   */
+  public static boolean canWrite(File f) {
+    if (RaftUtils.WINDOWS) {
+      try {
+        return NativeIO.Windows.access(f.getCanonicalPath(),
+            NativeIO.Windows.AccessRight.ACCESS_WRITE);
+      } catch (IOException e) {
+        return false;
+      }
+    } else {
+      return f.canWrite();
+    }
+  }
+}