You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/01/31 21:17:15 UTC
[30/54] [abbrv] incubator-ratis git commit: Renamed the packages from
raft to ratis in preparation for Apache Incubation - Moved all java packages
from org.apache.raft to org.apache.ratis. - Moved native package to
org_apache_ratis, and native lib to l
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
new file mode 100644
index 0000000..9d20682
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIO.java
@@ -0,0 +1,805 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io.nativeio;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.ratis.protocol.AlreadyExistsException;
+import org.apache.ratis.util.NativeCodeLoader;
+import org.apache.ratis.util.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * JNI wrappers for various native IO-related calls not available in Java.
+ * These functions should generally be used alongside a fallback to another
+ * more portable mechanism.
+ */
+public class NativeIO {
+ private static final Logger LOG = LoggerFactory.getLogger(NativeIO.class);
+
+ public static class POSIX {
+ // Flags for open() call from bits/fcntl.h - Set by JNI
+ public static int O_RDONLY = -1;
+ public static int O_WRONLY = -1;
+ public static int O_RDWR = -1;
+ public static int O_CREAT = -1;
+ public static int O_EXCL = -1;
+ public static int O_NOCTTY = -1;
+ public static int O_TRUNC = -1;
+ public static int O_APPEND = -1;
+ public static int O_NONBLOCK = -1;
+ public static int O_SYNC = -1;
+
+ // Flags for posix_fadvise() from bits/fcntl.h - Set by JNI
+ /* No further special treatment. */
+ public static int POSIX_FADV_NORMAL = -1;
+ /* Expect random page references. */
+ public static int POSIX_FADV_RANDOM = -1;
+ /* Expect sequential page references. */
+ public static int POSIX_FADV_SEQUENTIAL = -1;
+ /* Will need these pages. */
+ public static int POSIX_FADV_WILLNEED = -1;
+ /* Don't need these pages. */
+ public static int POSIX_FADV_DONTNEED = -1;
+ /* Data will be accessed once. */
+ public static int POSIX_FADV_NOREUSE = -1;
+
+
+ // Updated by JNI when supported by glibc. Leave defaults in case kernel
+ // supports sync_file_range, but glibc does not.
+ /* Wait upon writeout of all pages
+ in the range before performing the
+ write. */
+ public static int SYNC_FILE_RANGE_WAIT_BEFORE = 1;
+ /* Initiate writeout of all those
+ dirty pages in the range which are
+ not presently under writeback. */
+ public static int SYNC_FILE_RANGE_WRITE = 2;
+ /* Wait upon writeout of all pages in
+ the range after performing the
+ write. */
+ public static int SYNC_FILE_RANGE_WAIT_AFTER = 4;
+
+ // Set to true via JNI if possible
+ public static boolean fadvisePossible = false;
+
+ private static boolean nativeLoaded = false;
+ private static boolean syncFileRangePossible = true;
+
+ private static long cacheTimeout = -1;
+
+ private static CacheManipulator cacheManipulator = new CacheManipulator();
+
+ public static CacheManipulator getCacheManipulator() {
+ return cacheManipulator;
+ }
+
+ public static void setCacheManipulator(CacheManipulator cacheManipulator) {
+ POSIX.cacheManipulator = cacheManipulator;
+ }
+
+ /**
+ * Used to manipulate the operating system cache.
+ */
+ @VisibleForTesting
+ public static class CacheManipulator {
+ public void mlock(String identifier, ByteBuffer buffer,
+ long len) throws IOException {
+ POSIX.mlock(buffer, len);
+ }
+
+ public long getMemlockLimit() {
+ return NativeIO.getMemlockLimit();
+ }
+
+ public long getOperatingSystemPageSize() {
+ return NativeIO.getOperatingSystemPageSize();
+ }
+
+ public void posixFadviseIfPossible(String identifier,
+ FileDescriptor fd, long offset, long len, int flags)
+ throws NativeIOException {
+ NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, offset,
+ len, flags);
+ }
+
+ public boolean verifyCanMlock() {
+ return NativeIO.isAvailable();
+ }
+ }
+
+ /**
+ * A CacheManipulator used for testing which does not actually call mlock.
+ * This allows many tests to be run even when the operating system does not
+ * allow mlock, or only allows limited mlocking.
+ */
+ @VisibleForTesting
+ public static class NoMlockCacheManipulator extends CacheManipulator {
+ public void mlock(String identifier, ByteBuffer buffer,
+ long len) throws IOException {
+ LOG.info("mlocking " + identifier);
+ }
+
+ public long getMemlockLimit() {
+ return 1125899906842624L;
+ }
+
+ public long getOperatingSystemPageSize() {
+ return 4096;
+ }
+
+ public boolean verifyCanMlock() {
+ return true;
+ }
+ }
+
+ static {
+   initNativeLib();
+   // initNativeLib() belongs to the outer class, so on success it sets
+   // NativeIO.nativeLoaded rather than the POSIX field of the same name that
+   // shadows it inside this class. Mirror the result here; otherwise
+   // POSIX.nativeLoaded stays false and isAvailable(), mlock() and the
+   // *IfPossible() helpers in this class are permanently disabled.
+   nativeLoaded = NativeIO.nativeLoaded;
+ }
+
+ /**
+ * Return true if the JNI-based native IO extensions are available.
+ */
+ public static boolean isAvailable() {
+ return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+ }
+
+ private static void assertCodeLoaded() throws IOException {
+ if (!isAvailable()) {
+ throw new IOException("NativeIO was not loaded");
+ }
+ }
+
+ /** Wrapper around open(2) */
+ public static native FileDescriptor open(String path, int flags, int mode) throws IOException;
+ /** Wrapper around fstat(2) */
+ private static native Stat fstat(FileDescriptor fd) throws IOException;
+
+ /** Native chmod implementation. On UNIX, it is a wrapper around chmod(2) */
+ private static native void chmodImpl(String path, int mode) throws IOException;
+
+ public static void chmod(String path, int mode) throws IOException {
+ if (!RaftUtils.WINDOWS) {
+ chmodImpl(path, mode);
+ } else {
+ try {
+ chmodImpl(path, mode);
+ } catch (NativeIOException nioe) {
+ if (nioe.getErrorCode() == 3) {
+ throw new NativeIOException("No such file or directory",
+ Errno.ENOENT);
+ } else {
+ LOG.warn(String.format("NativeIO.chmod error (%d): %s",
+ nioe.getErrorCode(), nioe.getMessage()));
+ throw new NativeIOException("Unknown error", Errno.UNKNOWN);
+ }
+ }
+ }
+ }
+
+ /** Wrapper around posix_fadvise(2) */
+ static native void posix_fadvise(
+ FileDescriptor fd, long offset, long len, int flags) throws NativeIOException;
+
+ /** Wrapper around sync_file_range(2) */
+ static native void sync_file_range(
+ FileDescriptor fd, long offset, long nbytes, int flags) throws NativeIOException;
+
+ /**
+ * Call posix_fadvise on the given file descriptor. See the manpage
+ * for this syscall for more information. On systems where this
+ * call is not available, does nothing.
+ *
+ * @throws NativeIOException if there is an error with the syscall
+ */
+ static void posixFadviseIfPossible(String identifier,
+ FileDescriptor fd, long offset, long len, int flags)
+ throws NativeIOException {
+ if (nativeLoaded && fadvisePossible) {
+ try {
+ posix_fadvise(fd, offset, len, flags);
+ } catch (UnsatisfiedLinkError ule) {
+ fadvisePossible = false;
+ }
+ }
+ }
+
+ /**
+ * Call sync_file_range on the given file descriptor. See the manpage
+ * for this syscall for more information. On systems where this
+ * call is not available, does nothing.
+ *
+ * @throws NativeIOException if there is an error with the syscall
+ */
+ public static void syncFileRangeIfPossible(
+ FileDescriptor fd, long offset, long nbytes, int flags)
+ throws NativeIOException {
+ if (nativeLoaded && syncFileRangePossible) {
+ try {
+ sync_file_range(fd, offset, nbytes, flags);
+ } catch (UnsupportedOperationException | UnsatisfiedLinkError uoe) {
+ syncFileRangePossible = false;
+ }
+ }
+ }
+
+ static native void mlock_native(
+ ByteBuffer buffer, long len) throws NativeIOException;
+
+ /**
+ * Locks the provided direct ByteBuffer into memory, preventing it from
+ * swapping out. After a buffer is locked, future accesses will not incur
+ * a page fault.
+ *
+ * See the mlock(2) man page for more information.
+ */
+ static void mlock(ByteBuffer buffer, long len)
+ throws IOException {
+ assertCodeLoaded();
+ if (!buffer.isDirect()) {
+ throw new IOException("Cannot mlock a non-direct ByteBuffer");
+ }
+ mlock_native(buffer, len);
+ }
+
+ /**
+ * Unmaps the block from memory. See munmap(2).
+ *
+ * There isn't any portable way to unmap a memory region in Java.
+ * So we use the sun.nio method here.
+ * Note that unmapping a memory region could cause crashes if code
+ * continues to reference the unmapped memory. However, if we don't
+ * manually unmap the memory, we are dependent on the finalizer to
+ * do it, and we have no idea when the finalizer will run.
+ *
+ * Buffers that are not a {@code sun.nio.ch.DirectBuffer}, or that expose
+ * no cleaner, are left untouched.
+ *
+ * @param buffer The buffer to unmap.
+ */
+ public static void munmap(MappedByteBuffer buffer) {
+   if (buffer instanceof sun.nio.ch.DirectBuffer) {
+     sun.misc.Cleaner cleaner =
+         ((sun.nio.ch.DirectBuffer)buffer).cleaner();
+     // cleaner() can return null (e.g. for a duplicate or slice that does
+     // not own the mapping); there is nothing to release in that case.
+     if (cleaner != null) {
+       cleaner.clean();
+     }
+   }
+ }
+
+ /** Linux only methods used for getOwner() implementation */
+ private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
+ private static native String getUserName(long uid) throws IOException;
+
+ /**
+ * Result type of the fstat call
+ */
+ public static class Stat {
+ private int ownerId, groupId;
+ private String owner, group;
+ private int mode;
+
+ // Mode constants - Set by JNI
+ public static int S_IFMT = -1; /* type of file */
+ public static int S_IFIFO = -1; /* named pipe (fifo) */
+ public static int S_IFCHR = -1; /* character special */
+ public static int S_IFDIR = -1; /* directory */
+ public static int S_IFBLK = -1; /* block special */
+ public static int S_IFREG = -1; /* regular */
+ public static int S_IFLNK = -1; /* symbolic link */
+ public static int S_IFSOCK = -1; /* socket */
+ public static int S_ISUID = -1; /* set user id on execution */
+ public static int S_ISGID = -1; /* set group id on execution */
+ public static int S_ISVTX = -1; /* save swapped text even after use */
+ public static int S_IRUSR = -1; /* read permission, owner */
+ public static int S_IWUSR = -1; /* write permission, owner */
+ public static int S_IXUSR = -1; /* execute/search permission, owner */
+
+ Stat(int ownerId, int groupId, int mode) {
+ this.ownerId = ownerId;
+ this.groupId = groupId;
+ this.mode = mode;
+ }
+
+ Stat(String owner, String group, int mode) {
+ if (!RaftUtils.WINDOWS) {
+ this.owner = owner;
+ } else {
+ this.owner = stripDomain(owner);
+ }
+ if (!RaftUtils.WINDOWS) {
+ this.group = group;
+ } else {
+ this.group = stripDomain(group);
+ }
+ this.mode = mode;
+ }
+
+ @Override
+ public String toString() {
+ return "Stat(owner='" + owner + "', group='" + group + "'" +
+ ", mode=" + mode + ")";
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+ public String getGroup() {
+ return group;
+ }
+ public int getMode() {
+ return mode;
+ }
+ }
+
+ private static class CachedName {
+ final long timestamp;
+ final String name;
+
+ public CachedName(String name, long timestamp) {
+ this.name = name;
+ this.timestamp = timestamp;
+ }
+ }
+
+ public final static int MMAP_PROT_READ = 0x1;
+ public final static int MMAP_PROT_WRITE = 0x2;
+ public final static int MMAP_PROT_EXEC = 0x4;
+
+ public static native long mmap(FileDescriptor fd, int prot,
+ boolean shared, long length) throws IOException;
+
+ public static native void munmap(long addr, long length)
+ throws IOException;
+ }
+
+ private static boolean workaroundNonThreadSafePasswdCalls = false;
+
+
+ public static class Windows {
+ // Flags for CreateFile() call on Windows
+ public static final long GENERIC_READ = 0x80000000L;
+ public static final long GENERIC_WRITE = 0x40000000L;
+
+ public static final long FILE_SHARE_READ = 0x00000001L;
+ public static final long FILE_SHARE_WRITE = 0x00000002L;
+ public static final long FILE_SHARE_DELETE = 0x00000004L;
+
+ public static final long CREATE_NEW = 1;
+ public static final long CREATE_ALWAYS = 2;
+ public static final long OPEN_EXISTING = 3;
+ public static final long OPEN_ALWAYS = 4;
+ public static final long TRUNCATE_EXISTING = 5;
+
+ public static final long FILE_BEGIN = 0;
+ public static final long FILE_CURRENT = 1;
+ public static final long FILE_END = 2;
+
+ public static final long FILE_ATTRIBUTE_NORMAL = 0x00000080L;
+
+ /**
+ * Create a directory with permissions set to the specified mode. By setting
+ * permissions at creation time, we avoid issues related to the user lacking
+ * WRITE_DAC rights on subsequent chmod calls. One example where this can
+ * occur is writing to an SMB share where the user does not have Full Control
+ * rights, and therefore WRITE_DAC is denied.
+ *
+ * @param path directory to create
+ * @param mode permissions of new directory
+ * @throws IOException if there is an I/O error
+ */
+ public static void createDirectoryWithMode(File path, int mode)
+ throws IOException {
+ createDirectoryWithMode0(path.getAbsolutePath(), mode);
+ }
+
+ /** Wrapper around CreateDirectory() on Windows */
+ private static native void createDirectoryWithMode0(String path, int mode)
+ throws NativeIOException;
+
+ /** Wrapper around CreateFile() on Windows */
+ public static native FileDescriptor createFile(String path,
+ long desiredAccess, long shareMode, long creationDisposition)
+ throws IOException;
+
+ /**
+ * Create a file for write with permissions set to the specified mode. By
+ * setting permissions at creation time, we avoid issues related to the user
+ * lacking WRITE_DAC rights on subsequent chmod calls. One example where
+ * this can occur is writing to an SMB share where the user does not have
+ * Full Control rights, and therefore WRITE_DAC is denied.
+ *
+ * This method mimics the semantics implemented by the JDK in
+ * {@link FileOutputStream}. The file is opened for truncate or
+ * append, the sharing mode allows other readers and writers, and paths
+ * longer than MAX_PATH are supported. (See io_util_md.c in the JDK.)
+ *
+ * @param path file to create
+ * @param append if true, then open file for append
+ * @param mode permissions of new directory
+ * @return FileOutputStream of opened file
+ * @throws IOException if there is an I/O error
+ */
+ public static FileOutputStream createFileOutputStreamWithMode(File path,
+ boolean append, int mode) throws IOException {
+ long shareMode = FILE_SHARE_READ | FILE_SHARE_WRITE;
+ long creationDisposition = append ? OPEN_ALWAYS : CREATE_ALWAYS;
+ return new FileOutputStream(createFileWithMode0(path.getAbsolutePath(),
+ GENERIC_WRITE, shareMode, creationDisposition, mode));
+ }
+
+ /** Wrapper around CreateFile() with security descriptor on Windows */
+ private static native FileDescriptor createFileWithMode0(String path,
+ long desiredAccess, long shareMode, long creationDisposition, int mode)
+ throws NativeIOException;
+
+ /** Wrapper around SetFilePointer() on Windows */
+ public static native long setFilePointer(FileDescriptor fd,
+ long distanceToMove, long moveMethod) throws IOException;
+
+ /** Windows only methods used for getOwner() implementation */
+ private static native String getOwner(FileDescriptor fd) throws IOException;
+
+ /** Supported list of Windows access right flags */
+ public enum AccessRight {
+ ACCESS_READ (0x0001), // FILE_READ_DATA
+ ACCESS_WRITE (0x0002), // FILE_WRITE_DATA
+ ACCESS_EXECUTE (0x0020); // FILE_EXECUTE
+
+ private final int accessRight;
+ AccessRight(int access) {
+ accessRight = access;
+ }
+
+ public int accessRight() {
+ return accessRight;
+ }
+ }
+
+ /** Windows only method used to check if the current process has requested
+ * access rights on the given path. */
+ private static native boolean access0(String path, int requestedAccess);
+
+ /**
+ * Checks whether the current process has desired access rights on
+ * the given path.
+ *
+ * Longer term this native function can be substituted with JDK7
+ * function Files#isReadable, isWritable, isExecutable.
+ *
+ * @param path input path
+ * @param desiredAccess ACCESS_READ, ACCESS_WRITE or ACCESS_EXECUTE
+ * @return true if access is allowed
+ * @throws IOException I/O exception on error
+ */
+ public static boolean access(String path, AccessRight desiredAccess)
+ throws IOException {
+ return access0(path, desiredAccess.accessRight());
+ }
+
+ /**
+ * Extends both the minimum and maximum working set size of the current
+ * process. This method gets the current minimum and maximum working set
+ * size, adds the requested amount to each and then sets the minimum and
+ * maximum working set size to the new values. Controlling the working set
+ * size of the process also controls the amount of memory it can lock.
+ *
+ * @param delta amount to increment minimum and maximum working set size
+ * @throws IOException for any error
+ * @see POSIX#mlock(ByteBuffer, long)
+ */
+ public static native void extendWorkingSetSize(long delta) throws IOException;
+
+ static {
+ initNativeLib();
+ }
+ }
+
+ private static boolean nativeLoaded = false;
+
+ static {
+ initNativeLib();
+ }
+
+ private static void initNativeLib() {
+ if (NativeCodeLoader.isNativeCodeLoaded()) {
+ try {
+ initNative();
+ nativeLoaded = true;
+ } catch (Throwable t) {
+ LOG.debug("Unable to initialize NativeIO libraries", t);
+ }
+ }
+ }
+
+ /**
+ * Return true if the JNI-based native IO extensions are available.
+ */
+ public static boolean isAvailable() {
+ return NativeCodeLoader.isNativeCodeLoaded() && nativeLoaded;
+ }
+
+ /** Initialize the JNI method ID and class ID cache */
+ private static native void initNative();
+
+ /**
+ * Get the maximum number of bytes that can be locked into memory at any
+ * given point.
+ *
+ * @return 0 if no bytes can be locked into memory;
+ * Long.MAX_VALUE if there is no limit;
+ * The number of bytes that can be locked into memory otherwise.
+ */
+ static long getMemlockLimit() {
+ return isAvailable() ? getMemlockLimit0() : 0;
+ }
+
+ private static native long getMemlockLimit0();
+
+ /**
+ * @return the operating system's page size.
+ */
+ static long getOperatingSystemPageSize() {
+ try {
+ Field f = Unsafe.class.getDeclaredField("theUnsafe");
+ f.setAccessible(true);
+ Unsafe unsafe = (Unsafe)f.get(null);
+ return unsafe.pageSize();
+ } catch (Throwable e) {
+ LOG.warn("Unable to get operating system page size. Guessing 4096.", e);
+ return 4096;
+ }
+ }
+
+ private static class CachedUid {
+ final long timestamp;
+ final String username;
+ public CachedUid(String username, long timestamp) {
+ this.timestamp = timestamp;
+ this.username = username;
+ }
+ }
+
+ private static boolean initialized = false;
+
+ /**
+ * The Windows logon name has two parts, NetBIOS domain name and
+ * user account name, of the format DOMAIN\UserName. This method
+ * will remove the domain part of the full logon name.
+ *
+ * @param name full principal name containing the domain
+ * @return name with domain removed
+ */
+ private static String stripDomain(String name) {
+ int i = name.indexOf('\\');
+ if (i != -1)
+ name = name.substring(i + 1);
+ return name;
+ }
+
+ /**
+ * Create a FileInputStream that shares delete permission on the
+ * file opened, i.e. other process can delete the file the
+ * FileInputStream is reading. Only Windows implementation uses
+ * the native interface.
+ */
+ public static FileInputStream getShareDeleteFileInputStream(File f)
+ throws IOException {
+ if (!RaftUtils.WINDOWS) {
+ // On Linux the default FileInputStream shares delete permission
+ // on the file opened.
+ //
+ return new FileInputStream(f);
+ } else {
+ // Use Windows native interface to create a FileInputStream that
+ // shares delete permission on the file opened.
+ //
+ FileDescriptor fd = Windows.createFile(
+ f.getAbsolutePath(),
+ Windows.GENERIC_READ,
+ Windows.FILE_SHARE_READ |
+ Windows.FILE_SHARE_WRITE |
+ Windows.FILE_SHARE_DELETE,
+ Windows.OPEN_EXISTING);
+ return new FileInputStream(fd);
+ }
+ }
+
+ /**
+ * Create a FileInputStream that shares delete permission on the
+ * file opened at a given offset, i.e. another process can delete
+ * the file the FileInputStream is reading. Only Windows implementation
+ * uses the native interface.
+ *
+ * @param f the file to open for reading
+ * @param seekOffset offset to position the stream at; values &lt;= 0 leave
+ *        the stream at the beginning of the file
+ * @return a stream positioned at the requested offset
+ * @throws IOException if the file cannot be opened or positioned; the
+ *         file that was opened is closed again before the exception
+ *         propagates
+ */
+ public static FileInputStream getShareDeleteFileInputStream(File f, long seekOffset)
+     throws IOException {
+   if (!RaftUtils.WINDOWS) {
+     RandomAccessFile rf = new RandomAccessFile(f, "r");
+     try {
+       if (seekOffset > 0) {
+         rf.seek(seekOffset);
+       }
+       return new FileInputStream(rf.getFD());
+     } catch (IOException | RuntimeException e) {
+       // Do not leak the descriptor when positioning fails.
+       closeAfterFailure(rf, e);
+       throw e;
+     }
+   } else {
+     // Use Windows native interface to create a FileInputStream that
+     // shares delete permission on the file opened, and set it to the
+     // given offset.
+     //
+     FileDescriptor fd = NativeIO.Windows.createFile(
+         f.getAbsolutePath(),
+         NativeIO.Windows.GENERIC_READ,
+         NativeIO.Windows.FILE_SHARE_READ |
+             NativeIO.Windows.FILE_SHARE_WRITE |
+             NativeIO.Windows.FILE_SHARE_DELETE,
+         NativeIO.Windows.OPEN_EXISTING);
+     // Wrap the descriptor first so it can be closed if positioning fails.
+     FileInputStream in = new FileInputStream(fd);
+     try {
+       if (seekOffset > 0) {
+         NativeIO.Windows.setFilePointer(fd, seekOffset, NativeIO.Windows.FILE_BEGIN);
+       }
+       return in;
+     } catch (IOException | RuntimeException e) {
+       closeAfterFailure(in, e);
+       throw e;
+     }
+   }
+ }
+
+ /**
+ * Closes {@code resource} while {@code failure} is propagating, attaching
+ * any error raised by the close as a suppressed exception.
+ */
+ private static void closeAfterFailure(java.io.Closeable resource, Throwable failure) {
+   try {
+     resource.close();
+   } catch (IOException closeFailure) {
+     failure.addSuppressed(closeFailure);
+   }
+ }
+
+ /**
+ * Create the specified File for write access, ensuring that it does not exist.
+ * @param f the file that we want to create
+ * @param permissions we want to have on the file (if security is enabled)
+ *
+ * @throws AlreadyExistsException if the file already exists
+ * @throws IOException if any other error occurred
+ */
+ public static FileOutputStream getCreateForWriteFileOutputStream(File f, int permissions)
+ throws IOException {
+ if (!RaftUtils.WINDOWS) {
+ // Use the native wrapper around open(2)
+ try {
+ FileDescriptor fd = NativeIO.POSIX.open(f.getAbsolutePath(),
+ NativeIO.POSIX.O_WRONLY | NativeIO.POSIX.O_CREAT
+ | NativeIO.POSIX.O_EXCL, permissions);
+ return new FileOutputStream(fd);
+ } catch (NativeIOException nioe) {
+ if (nioe.getErrno() == Errno.EEXIST) {
+ throw new AlreadyExistsException(nioe);
+ }
+ throw nioe;
+ }
+ } else {
+ // Use the Windows native APIs to create equivalent FileOutputStream
+ try {
+ FileDescriptor fd = NativeIO.Windows.createFile(f.getCanonicalPath(),
+ NativeIO.Windows.GENERIC_WRITE,
+ NativeIO.Windows.FILE_SHARE_DELETE
+ | NativeIO.Windows.FILE_SHARE_READ
+ | NativeIO.Windows.FILE_SHARE_WRITE,
+ NativeIO.Windows.CREATE_NEW);
+ NativeIO.POSIX.chmod(f.getCanonicalPath(), permissions);
+ return new FileOutputStream(fd);
+ } catch (NativeIOException nioe) {
+ if (nioe.getErrorCode() == 80) {
+ // ERROR_FILE_EXISTS
+ // 80 (0x50)
+ // The file exists
+ throw new AlreadyExistsException(nioe);
+ }
+ throw nioe;
+ }
+ }
+ }
+
+ /**
+ * A version of renameTo that throws a descriptive exception when it fails.
+ *
+ * When the native library has not been loaded this falls back to
+ * {@link File#renameTo(File)} and reports a {@code false} result as an
+ * IOException; otherwise the rename is delegated to the native
+ * implementation.
+ *
+ * @param src The source path
+ * @param dst The destination path
+ *
+ * @throws NativeIOException On failure of the native rename.
+ * @throws IOException On failure of the non-native fallback.
+ */
+ public static void renameTo(File src, File dst)
+ throws IOException {
+ if (!nativeLoaded) {
+ // No native support: File#renameTo only returns a boolean, so turn a
+ // failure into an exception naming both paths.
+ if (!src.renameTo(dst)) {
+ throw new IOException("renameTo(src=" + src + ", dst=" +
+ dst + ") failed.");
+ }
+ } else {
+ renameTo0(src.getAbsolutePath(), dst.getAbsolutePath());
+ }
+ }
+
+ /**
+ * A version of renameTo that throws a descriptive exception when it fails.
+ *
+ * @param src The source path
+ * @param dst The destination path
+ *
+ * @throws NativeIOException On failure.
+ */
+ private static native void renameTo0(String src, String dst)
+ throws NativeIOException;
+
+ private static native void link0(String src, String dst)
+ throws NativeIOException;
+
+ /**
+ * Unbuffered file copy from src to dst without tainting OS buffer cache
+ *
+ * In POSIX platform:
+ * It uses FileChannel#transferTo() which internally attempts
+ * unbuffered IO on OS with native sendfile64() support and falls back to
+ * buffered IO otherwise.
+ *
+ * It minimizes the number of FileChannel#transferTo call by passing the
+ * src file size directly instead of a smaller size as the 3rd parameter.
+ * This saves the number of sendfile64() system call when native sendfile64()
+ * is supported. In the two fall back cases where sendfile is not supported,
+ * FileChannel#transferTo already has its own batching of size 8 MB and 8 KB,
+ * respectively.
+ *
+ * In Windows Platform:
+ * It uses its own native wrapper of CopyFileEx with COPY_FILE_NO_BUFFERING
+ * flag, which is supported on Windows Server 2008 and above.
+ *
+ * Ideally, we should use FileChannel#transferTo() across both POSIX and Windows
+ * platform. Unfortunately, the wrapper(Java_sun_nio_ch_FileChannelImpl_transferTo0)
+ * used by FileChannel#transferTo for unbuffered IO is not implemented on Windows.
+ * Based on OpenJDK 6/7/8 source code, Java_sun_nio_ch_FileChannelImpl_transferTo0
+ * on Windows simply returns IOS_UNSUPPORTED.
+ *
+ * Note: This simple native wrapper does minimal parameter checking before copy and
+ * consistency check (e.g., size) after copy.
+ * It is recommended to use wrapper function like
+ * the Storage#nativeCopyFileUnbuffered() function with pre/post copy checks.
+ *
+ * @param src The source path
+ * @param dst The destination path
+ * @throws IOException if the copy fails, or if the source shrinks while it
+ *         is being copied so that the size read up front cannot be reached
+ */
+ public static void copyFileUnbuffered(File src, File dst) throws IOException {
+   if (nativeLoaded && RaftUtils.WINDOWS) {
+     copyFileUnbuffered0(src.getAbsolutePath(), dst.getAbsolutePath());
+   } else {
+     FileInputStream fis = null;
+     FileOutputStream fos = null;
+     FileChannel input = null;
+     FileChannel output = null;
+     try {
+       fis = new FileInputStream(src);
+       fos = new FileOutputStream(dst);
+       input = fis.getChannel();
+       output = fos.getChannel();
+       long remaining = input.size();
+       long position = 0;
+       while (remaining > 0) {
+         long transferred = input.transferTo(position, remaining, output);
+         // transferTo moves nothing once position is at or past the current
+         // end of the source. If the source was truncated after its size was
+         // read, the loop would otherwise spin forever on zero-byte transfers.
+         if (transferred <= 0 && position >= input.size()) {
+           throw new IOException("Source file " + src + " shrank to " + position
+               + " bytes while being copied; " + remaining
+               + " bytes could not be transferred to " + dst);
+         }
+         remaining -= transferred;
+         position += transferred;
+       }
+     } finally {
+       RaftUtils.cleanup(LOG, output, fos, input, fis);
+     }
+   }
+ }
+
+ private static native void copyFileUnbuffered0(String src, String dst)
+ throws NativeIOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
new file mode 100644
index 0000000..58b83e7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/io/nativeio/NativeIOException.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.io.nativeio;
+
+import java.io.IOException;
+
+import org.apache.ratis.util.RaftUtils;
+
+
+/**
+ * An exception generated by a call to the native IO code.
+ *
+ * These exceptions simply wrap <i>errno</i> result codes on Linux,
+ * or the System Error Code on Windows.
+ */
+public class NativeIOException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /** The errno value; {@link Errno#UNKNOWN} when built from an error code. */
+  private final Errno errno;
+
+  // Java has no unsigned primitive error code. Use a signed 32-bit
+  // integer to hold the unsigned 32-bit integer.
+  private final int errorCode;
+
+  /**
+   * Creates an exception carrying an errno value.
+   *
+   * @param msg description of the failure
+   * @param errno the errno reported for the failure
+   */
+  public NativeIOException(String msg, Errno errno) {
+    super(msg);
+    this.errno = errno;
+    // Windows error code is always set to ERROR_SUCCESS on Linux,
+    // i.e. no failure on Windows
+    this.errorCode = 0;
+  }
+
+  /**
+   * Creates an exception carrying a numeric system error code; the errno is
+   * set to {@link Errno#UNKNOWN}.
+   *
+   * @param msg description of the failure
+   * @param errorCode the unsigned 32-bit error code, stored in a signed int
+   */
+  public NativeIOException(String msg, int errorCode) {
+    super(msg);
+    this.errorCode = errorCode;
+    this.errno = Errno.UNKNOWN;
+  }
+
+  /**
+   * @return the error code as an unsigned 32-bit value (0 when this
+   *         exception was built from an errno)
+   */
+  public long getErrorCode() {
+    // Mask instead of a plain widening conversion: the int holds an unsigned
+    // value, and sign extension would turn codes >= 0x80000000 negative.
+    return errorCode & 0xFFFFFFFFL;
+  }
+
+  /** @return the errno associated with this exception */
+  public Errno getErrno() {
+    return errno;
+  }
+
+  @Override
+  public String toString() {
+    if (RaftUtils.WINDOWS) {
+      return getErrorCode() + ": " + super.getMessage();
+    }
+    // String.valueOf tolerates a null errno handed to the constructor, so
+    // printing the exception can never itself throw.
+    return String.valueOf(errno) + ": " + super.getMessage();
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
new file mode 100644
index 0000000..cc441f2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyExistsException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when creating a file at a given pathname fails because another
+ * file already exists at that path.
+ */
+public class AlreadyExistsException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param cause the underlying exception
+   */
+  public AlreadyExistsException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param msg description of the failure
+   */
+  public AlreadyExistsException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
new file mode 100644
index 0000000..1742c24
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ChecksumException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/** Thrown for checksum errors; carries the position value supplied by the thrower. */
+public class ChecksumException extends IOException {
+ private static final long serialVersionUID = 1L;
+ private final long pos; // assigned exactly once in the constructor, hence final
+ public ChecksumException(String description, long pos) {
+ super(description);
+ this.pos = pos;
+ }
+ /** @return the position value that was passed to the constructor. */
+ public long getPos() {
+ return pos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
new file mode 100644
index 0000000..77ef267
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+
+/**
+ * The information clients append to the raft ring.
+ */
+public interface Message {
+ /**
+ * @return the content of the message as a protobuf {@link ByteString}
+ */
+ ByteString getContent();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
new file mode 100644
index 0000000..1306290
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class NotLeaderException extends RaftException {
+ private final RaftPeer suggestedLeader; // stored as given; not validated here
+ /** the client may need to update its RaftPeer list */
+ private final RaftPeer[] peers;
+ /** Builds the detail message from id and suggestedLeader; a null peers array is stored as EMPTY_PEERS. */
+ public NotLeaderException(String id, RaftPeer suggestedLeader,
+ RaftPeer[] peers) {
+ super("Server " + id + " is not the leader (" + suggestedLeader
+ + "). Request must be sent to leader.");
+ this.suggestedLeader = suggestedLeader;
+ this.peers = peers == null ? RaftPeer.EMPTY_PEERS : peers;
+ }
+ /** @return the suggested leader given to the constructor. */
+ public RaftPeer getSuggestedLeader() {
+ return suggestedLeader;
+ }
+ /** @return the internal peer array (not a copy); never null. */
+ public RaftPeer[] getPeers() {
+ return peers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
new file mode 100644
index 0000000..3298431
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link RaftClientProtocol}: each call returns a future of the reply. */
+public interface RaftClientAsynchronousProtocol {
+ CompletableFuture<RaftClientReply> submitClientRequestAsync(
+ RaftClientRequest request) throws IOException;
+ /** Same shape as submitClientRequestAsync, but takes a {@link SetConfigurationRequest}. */
+ CompletableFuture<RaftClientReply> setConfigurationAsync(
+ SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
new file mode 100644
index 0000000..b3cbcc3
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientProtocol.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+public interface RaftClientProtocol { // each call returns the RaftClientReply directly
+ RaftClientReply submitClientRequest(RaftClientRequest request) throws IOException;
+ /** Same shape as submitClientRequest, but takes a {@link SetConfigurationRequest}. */
+ RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
new file mode 100644
index 0000000..8c5cd75
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class RaftClientReply extends RaftRpcMessage {
+ private final String requestorId;
+ private final String replierId;
+ private final boolean success;
+ private final long seqNum;
+
+ /** non-null if the server is not leader */
+ private final NotLeaderException notLeaderException;
+ private final Message message;
+ /** Stores every argument as given; success and notLeaderException are not cross-checked. */
+ public RaftClientReply(String requestorId, String replierId, long seqNum,
+ boolean success, Message message, NotLeaderException notLeaderException) {
+ this.requestorId = requestorId;
+ this.replierId = replierId;
+ this.success = success;
+ this.seqNum = seqNum;
+ this.message = message;
+ this.notLeaderException = notLeaderException;
+ }
+ /** Failed reply to the given request: success is false and the message is null. */
+ public RaftClientReply(RaftClientRequest request,
+ NotLeaderException notLeaderException) {
+ this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(),
+ false, null, notLeaderException);
+ }
+ /** Successful reply to the given request: success is true and there is no NotLeaderException. */
+ public RaftClientReply(RaftClientRequest request, Message message) {
+ this(request.getRequestorId(), request.getReplierId(), request.getSeqNum(),
+ true, message, null);
+ }
+ /** @return false, because this class represents a reply. */
+ @Override
+ public final boolean isRequest() {
+ return false;
+ }
+
+ @Override
+ public String getRequestorId() {
+ return requestorId;
+ }
+
+ @Override
+ public String getReplierId() {
+ return replierId;
+ }
+ /** @return the sequence number given at construction. */
+ public long getSeqNum() {
+ return seqNum;
+ }
+ /** Appends the sequence number and the success flag to the base description. */
+ @Override
+ public String toString() {
+ return super.toString() + ", seqNum: " + getSeqNum()
+ + ", success: " + isSuccess();
+ }
+ /** @return the success flag given at construction. */
+ public boolean isSuccess() {
+ return success;
+ }
+ /** @return the reply message; may be null (the NotLeaderException constructor passes null). */
+ public Message getMessage() {
+ return message;
+ }
+ /** @return the NotLeaderException given at construction, or null if there was none. */
+ public NotLeaderException getNotLeaderException() {
+ return notLeaderException;
+ }
+ /** @return true if and only if a NotLeaderException is attached. */
+ public boolean isNotLeader() {
+ return notLeaderException != null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
new file mode 100644
index 0000000..90b648a
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class RaftClientRequest extends RaftRpcMessage {
+ private final String requestorId;
+ private final String replierId;
+ private final long seqNum;
+ private final Message message;
+ private final boolean readOnly;
+ /** Creates a request with readOnly set to false. */
+ public RaftClientRequest(String requestorId, String replierId, long seqNum,
+ Message message) {
+ this(requestorId, replierId, seqNum, message, false);
+ }
+ /** Stores every argument as given; nothing is validated. */
+ public RaftClientRequest(String requestorId, String replierId, long seqNum,
+ Message message, boolean readOnly) {
+ this.requestorId = requestorId;
+ this.replierId = replierId;
+ this.seqNum = seqNum;
+ this.message = message;
+ this.readOnly = readOnly;
+ }
+ /** @return true, because this class represents a request. */
+ @Override
+ public final boolean isRequest() {
+ return true;
+ }
+
+ @Override
+ public String getRequestorId() {
+ return requestorId;
+ }
+
+ @Override
+ public String getReplierId() {
+ return replierId;
+ }
+ /** @return the sequence number given at construction. */
+ public long getSeqNum() {
+ return seqNum;
+ }
+ /** @return the message given at construction; not null-checked by this class. */
+ public Message getMessage() {
+ return message;
+ }
+ /** @return the readOnly flag; false unless set through the five-argument constructor. */
+ public boolean isReadOnly() {
+ return readOnly;
+ }
+ /** Appends the sequence number and "RO" or "RW" (chosen by isReadOnly) to the base description. */
+ @Override
+ public String toString() {
+ return super.toString() + ", seqNum: " + seqNum + ", "
+ + (isReadOnly()? "RO": "RW");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
new file mode 100644
index 0000000..11aac90
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+public class RaftException extends IOException { // checked: callers must handle or declare it
+ private static final long serialVersionUID = 1L;
+ /** Creates the exception with the given detail message. */
+ public RaftException(String message) {
+ super(message);
+ }
+ /** Creates the exception wrapping the given cause. */
+ public RaftException(Throwable cause) {
+ super(cause);
+ }
+ /** Creates the exception with the given detail message and cause. */
+ public RaftException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
new file mode 100644
index 0000000..a32aaa0
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftPeer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import com.google.common.net.HostAndPort;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A {@link RaftPeer} is a server in a Raft cluster.
+ *
+ * The objects of this class are immutable; two peers are equal when their ids are equal.
+ */
+public class RaftPeer {
+ public static final RaftPeer[] EMPTY_PEERS = {};
+
+ /** The id of the peer. */
+ private final String id;
+ /** The address of the peer. */
+ private final String address;
+
+ /** Construct a peer with the given id and a null address. */
+ public RaftPeer(String id) {
+ this(id, (String)null);
+ }
+
+ /** Construct a peer from a socket address; null stays null, unresolved addresses use the host string. */
+ public RaftPeer(String id, InetSocketAddress address) {
+ this(id, address == null ? null : HostAndPort.fromParts(
+ address.isUnresolved() ? address.getHostString()
+ : address.getAddress().getHostAddress(), address.getPort()).toString());
+ }
+
+ /** Construct a peer with the given id and address. */
+ public RaftPeer(String id, String address) {
+ this.id = id;
+ this.address = address;
+ }
+
+ /** @return The id of the peer. */
+ public String getId() {
+ return id;
+ }
+
+ /** @return The address of the peer. */
+ public String getAddress() {
+ return address;
+ }
+
+ @Override
+ public String toString() {
+ return id + ":" + address;
+ }
+ /** Compares ids only (the address is ignored); throws NullPointerException if this peer's id is null. */
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof RaftPeer) && id.equals(((RaftPeer) o).getId());
+ }
+ /** Hash of the id, consistent with {@link #equals(Object)}. */
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
new file mode 100644
index 0000000..82f1ebb
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftRpcMessage.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public abstract class RaftRpcMessage {
+ /** @return true for a request, false for a reply; toString() uses it to choose the arrow. */
+ public abstract boolean isRequest();
+ /** @return the requestor id, printed on the left of the arrow by toString(). */
+ public abstract String getRequestorId();
+ /** @return the replier id, printed on the right of the arrow by toString(). */
+ public abstract String getReplierId();
+ /** Renders as SimpleClassName(requestorId->replierId) for requests and with "<-" otherwise. */
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "(" + getRequestorId()
+ + (isRequest()? "->": "<-") + getReplierId() + ")";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
new file mode 100644
index 0000000..3d0f093
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationInProgressException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class ReconfigurationInProgressException extends RaftException {
+ public ReconfigurationInProgressException(String message) { // only a detail message is accepted; no cause
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
new file mode 100644
index 0000000..69a2e51
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReconfigurationTimeoutException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class ReconfigurationTimeoutException extends RaftException {
+ public ReconfigurationTimeoutException(String message) { // only a detail message is accepted; no cause
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
new file mode 100644
index 0000000..84449d4
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.util.Arrays;
+
+public class SetConfigurationRequest extends RaftClientRequest {
+ private final RaftPeer[] peers;
+ /** Creates a request whose message is null and which carries the given peers (stored without copying). */
+ public SetConfigurationRequest(String requestorId, String replierId,
+ long seqNum, RaftPeer[] peers) {
+ super(requestorId, replierId, seqNum, null);
+ this.peers = peers;
+ }
+ /** @return the peer array given at construction (not a copy); null if null was passed. */
+ public RaftPeer[] getPeersInNewConf() {
+ return peers;
+ }
+ /** Appends the peers; Arrays.toString prints "[a, b]" like a list does and tolerates a null array. */
+ @Override
+ public String toString() {
+ return super.toString() + ", peers:" + Arrays.toString(getPeersInNewConf());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
new file mode 100644
index 0000000..099133d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+public class StateMachineException extends RaftException {
+ public StateMachineException(String serverId, Exception cause) { // cause must be non-null: its class name starts the message
+ super(cause.getClass().getName() + " from Server " + serverId, cause);
+ }
+ /** Creates the exception with the given detail message and no cause. */
+ public StateMachineException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
new file mode 100644
index 0000000..e8e267e
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.apache.ratis.io.nativeio.NativeIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * A FileOutputStream that has the property that it will only show
+ * up at its destination once it has been entirely written and flushed
+ * to disk. While being written, it will use a .tmp suffix.
+ *
+ * When the output stream is closed, it is flushed, fsynced, and
+ * will be moved into place, overwriting any file that already
+ * exists at that location.
+ *
+ * <b>NOTE</b>: on Windows platforms, it will not atomically
+ * replace the target file - instead the target file is deleted
+ * before this one is moved into place.
+ */
+public class AtomicFileOutputStream extends FilterOutputStream {
+
+ public static final String TMP_EXTENSION = ".tmp";
+
+ public static final Logger LOG = LoggerFactory.getLogger(AtomicFileOutputStream.class);
+
+ private final File origFile;
+ private final File tmpFile;
+ /** Opens the temporary sibling of f (f's name plus TMP_EXTENSION, same parent) for writing. */
+ public AtomicFileOutputStream(File f) throws FileNotFoundException {
+ // Code unfortunately must be duplicated below since we can't assign anything
+ // before calling super
+ super(new FileOutputStream(new File(f.getParentFile(), f.getName() + TMP_EXTENSION)));
+ origFile = f.getAbsoluteFile();
+ tmpFile = new File(f.getParentFile(), f.getName() + TMP_EXTENSION).getAbsoluteFile();
+ }
+ /** Flushes, syncs and closes the temporary file, then moves it over the destination; on failure the temporary file is deleted. */
+ @Override
+ public void close() throws IOException {
+ boolean triedToClose = false, success = false;
+ try {
+ flush();
+ ((FileOutputStream)out).getChannel().force(true);
+ // From here on super.close() has been attempted, so the finally block must not close the stream again.
+ triedToClose = true;
+ super.close();
+ success = true;
+ } finally {
+ if (success) {
+ boolean renamed = tmpFile.renameTo(origFile);
+ if (!renamed) {
+ // On windows, renameTo does not replace.
+ if (origFile.exists() && !origFile.delete()) {
+ throw new IOException("Could not delete original file " + origFile);
+ }
+ try {
+ NativeIO.renameTo(tmpFile, origFile);
+ } catch (NativeIOException e) {
+ throw new IOException("Could not rename temporary file " + tmpFile
+ + " to " + origFile + " due to failure in native rename. "
+ + e.toString(), e); // chain the native failure so its stack trace is not lost
+ }
+ }
+ } else {
+ if (!triedToClose) {
+ // If we failed when flushing, try to close it to not leak an FD
+ RaftUtils.cleanup(LOG, out);
+ }
+ // close wasn't successful, try to delete the tmp file
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file " + tmpFile);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close the atomic file, but do not "commit" the temporary file
+ * on top of the destination. This should be used if there is a failure
+ * in writing.
+ */
+ public void abort() {
+ try {
+ super.close();
+ } catch (IOException ioe) {
+ LOG.warn("Unable to abort file " + tmpFile, ioe);
+ }
+ if (!tmpFile.delete()) {
+ LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
new file mode 100644
index 0000000..489b5cd
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AutoCloseableLock.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * An {@link AutoCloseable} handle on a {@link Lock} that has already been
+ * acquired: calling {@link #close()} releases the lock.
+ */
+public class AutoCloseableLock implements AutoCloseable {
+  /** The lock released by {@link #close()}. */
+  private final Lock lock;
+  /** Becomes true on the first {@link #close()} call. */
+  private final AtomicBoolean released = new AtomicBoolean();
+
+  private AutoCloseableLock(Lock lock) {
+    this.lock = lock;
+  }
+
+  /**
+   * Lock the given lock and return an {@link AutoCloseableLock} wrapping it.
+   * The lock is unlocked by calling {@link #close()} on the returned object,
+   * most conveniently through a {@code try}-with-resources statement:
+   *
+   * <pre> {@code
+   * try(AutoCloseableLock acl = AutoCloseableLock.acquire(lock)) {
+   *   ...
+   * }}</pre>
+   *
+   * @param lock the lock to acquire
+   * @return a handle whose {@link #close()} unlocks {@code lock}
+   */
+  public static AutoCloseableLock acquire(final Lock lock) {
+    lock.lock();
+    final AutoCloseableLock handle = new AutoCloseableLock(lock);
+    return handle;
+  }
+
+  /**
+   * Unlock the wrapped lock.
+   * Only the first invocation unlocks; later invocations do nothing.
+   */
+  @Override
+  public void close() {
+    final boolean alreadyReleased = released.getAndSet(true);
+    if (alreadyReleased) {
+      return;
+    }
+    lock.unlock();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
new file mode 100644
index 0000000..4badc66
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedFunction.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.function.Function;
+
+/**
+ * Function with a throws-clause.
+ *
+ * @param <INPUT> the type of the argument accepted by {@link #apply(Object)}
+ * @param <OUTPUT> the type of the result returned by {@link #apply(Object)}
+ * @param <THROWABLE> the type of throwable declared by {@link #apply(Object)}
+ */
+@FunctionalInterface
+public interface CheckedFunction<INPUT, OUTPUT, THROWABLE extends Throwable> {
+ /**
+ * The same as {@link Function#apply(Object)}
+ * except that this method is declared with a throws-clause.
+ *
+ * @param input the function argument
+ * @return the function result
+ * @throws THROWABLE the throwable type declared by the implementation
+ */
+ OUTPUT apply(INPUT input) throws THROWABLE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
new file mode 100644
index 0000000..b6e90b9
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedRunnable.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * Runnable with a throws-clause.
+ *
+ * @param <THROWABLE> the type of throwable declared by {@link #run()}
+ */
+@FunctionalInterface
+public interface CheckedRunnable<THROWABLE extends Throwable> {
+ /**
+ * The same as {@link Runnable#run()}
+ * except that this method is declared with a throws-clause.
+ *
+ * @throws THROWABLE the throwable type declared by the implementation
+ */
+ void run() throws THROWABLE;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
new file mode 100644
index 0000000..60b5daf
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CodeInjectionForTesting.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A registry of named injection points used to run test-supplied code.
+ * Code is registered under a name with {@link #put(String, Code)}
+ * and is run with {@link #execute(String, String, String, Object...)}.
+ */
+public class CodeInjectionForTesting {
+  public static final Logger LOG = LoggerFactory.getLogger(CodeInjectionForTesting.class);
+
+  /** Code to be injected. */
+  public interface Code {
+    Logger LOG = CodeInjectionForTesting.LOG;
+
+    /**
+     * Execute the injected code for testing.
+     * @param localId the id of the local peer
+     * @param remoteId the id of the remote peer if handling a request
+     * @param args other possible args
+     * @return if the injected code is executed
+     */
+    boolean execute(String localId, String remoteId, Object... args);
+  }
+
+  /** The registered code, keyed by injection point name. */
+  private static final Map<String, Code> REGISTRY = new ConcurrentHashMap<>();
+
+  /**
+   * Register the given code under the given injection point,
+   * replacing whatever was registered under that name before.
+   */
+  public static void put(String injectionPoint, Code code) {
+    LOG.debug("put: {}, {}", injectionPoint, code);
+    REGISTRY.put(injectionPoint, code);
+  }
+
+  /**
+   * Run the code registered under the given injection point, if there is any.
+   *
+   * @return false if nothing is registered under {@code injectionPoint};
+   *         otherwise, the value returned by the registered code.
+   */
+  public static boolean execute(String injectionPoint, String localId,
+      String remoteId, Object... args) {
+    final Code injected = REGISTRY.get(injectionPoint);
+    if (injected != null) {
+      // Guarded since formatting the args array is wasted work otherwise.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("execute: {}, {}, localId={}, remoteId={}, args={}",
+            injectionPoint, injected, localId, remoteId, Arrays.toString(args));
+      }
+      return injected.execute(localId, remoteId, args);
+    }
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
new file mode 100644
index 0000000..1ef95ae
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Daemon.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/** A {@link Thread} that is marked as a daemon thread on construction. */
+public class Daemon extends Thread {
+  {
+    // Instance initializer: applies to every constructor below.
+    setDaemon(true);
+  }
+
+  /** Construct a daemon thread. */
+  public Daemon() {
+    super();
+  }
+
+  /**
+   * Construct a daemon thread with the given runnable.
+   * The thread is named after {@code runnable.toString()}.
+   * As with {@link Thread#Thread(Runnable)}, a null runnable is accepted;
+   * in that case the default thread name is kept.
+   *
+   * @param runnable the object whose {@code run} method is invoked
+   *                 when this thread is started; may be null.
+   */
+  public Daemon(Runnable runnable) {
+    super(runnable);
+    if (runnable != null) {
+      this.setName(runnable.toString());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
new file mode 100644
index 0000000..4404344
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ExitUtils.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+
+/** Facilitates hooking process termination for tests and debugging. */
+public class ExitUtils {
+ public static class ExitException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public final int status;
+
+ public ExitException(int status, String message, Throwable throwable) {
+ super(message, throwable);
+ this.status = status;
+ }
+ }
+
+ private static volatile boolean systemExitDisabled = false;
+ private static volatile ExitException firstExitException;
+
+ /**
+ * @return the first {@link ExitException} thrown, or null if none thrown yet.
+ */
+ public static ExitException getFirstExitException() {
+ return firstExitException;
+ }
+
+ /**
+ * Reset the tracking of process termination.
+ * This is useful when some tests expect an exit but the others do not.
+ */
+ public static void resetFirstExitException() {
+ firstExitException = null;
+ }
+
+ /** @return true if {@link #terminate(int, String, Throwable, Logger)} has been invoked. */
+ public static boolean isTerminated() {
+ // Either this member is set or System.exit is actually invoked.
+ return firstExitException != null;
+ }
+
+ /** Disable the use of {@link System#exit(int)} for testing. */
+ public static void disableSystemExit() {
+ systemExitDisabled = true;
+ }
+
+ /**
+ * Terminate the current process. Note that terminate is the *only* method
+ * that should be used to terminate the daemon processes.
+ *
+ * @param status Exit status
+ * @param message message used to create the {@code ExitException}
+ * @throws ExitException if System.exit is disabled for test purposes
+ */
+ public static void terminate(
+ int status, String message, Throwable throwable, Logger log)
+ throws ExitException {
+ if (log != null) {
+ final String s = "Terminating with exit status " + status + ": " + message;
+ if (status == 0) {
+ log.info(s, throwable);
+ } else {
+ log.error(s, throwable);
+ }
+ }
+
+ if (!systemExitDisabled) {
+ System.exit(status);
+ }
+
+ final ExitException ee = new ExitException(status, message, throwable);
+ if (firstExitException == null) {
+ firstExitException = ee;
+ }
+ throw ee;
+ }
+
+ public static void terminate(int status, String message, Logger log) {
+ terminate(status, message, null, log);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7e71a2e0/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
new file mode 100644
index 0000000..5dc509d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.io.nativeio.NativeIO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+
+/** Static helper methods for files, directories and file URIs. */
+public class FileUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+  /**
+   * Truncate the given file to the given size.
+   *
+   * @param f the file to truncate; it is opened for writing in append mode
+   * @param target the size, in bytes, passed to the channel truncate call
+   * @throws IOException if the file cannot be opened or truncated
+   */
+  public static void truncateFile(File f, long target) throws IOException {
+    try (FileOutputStream out = new FileOutputStream(f, true)) {
+      out.getChannel().truncate(target);
+    }
+  }
+
+  /**
+   * Delete the given file.
+   *
+   * @param f the file to delete
+   * @throws IOException if the deletion fails; a warning is logged
+   *         before the exception is rethrown.
+   */
+  public static void deleteFile(File f) throws IOException {
+    try {
+      Files.delete(f.toPath());
+    } catch (IOException e) {
+      LOG.warn("Could not delete " + f);
+      throw e;
+    }
+  }
+
+  /**
+   * Delete a directory and all its contents. If
+   * we return false, the directory may be partially-deleted.
+   * (1) If dir is symlink to a file, the symlink is deleted. The file pointed
+   *     to by the symlink is not deleted.
+   * (2) If dir is symlink to a directory, symlink is deleted. The directory
+   *     pointed to by symlink is not deleted.
+   * (3) If dir is a normal file, it is deleted.
+   * (4) If dir is a normal directory, then dir and all its contents recursively
+   *     are deleted.
+   *
+   * @param dir the file or directory to delete
+   * @return true if dir no longer exists when this method returns
+   */
+  public static boolean fullyDelete(final File dir) {
+    if (deleteImpl(dir, false)) {
+      // dir is (a) normal file, (b) symlink to a file, (c) empty directory or
+      // (d) symlink to a directory
+      return true;
+    }
+    // handle nonempty directory deletion
+    return fullyDeleteContents(dir) && deleteImpl(dir, true);
+  }
+
+  /**
+   * Try to delete the given file or directory without recursing into it.
+   *
+   * @param f the file or directory to delete
+   * @param doLog whether to log a warning when f still exists afterwards
+   * @return true if f was deleted or does not exist; false if f is null
+   *         or still exists.
+   */
+  private static boolean deleteImpl(final File f, final boolean doLog) {
+    if (f == null) {
+      LOG.warn("null file argument.");
+      return false;
+    }
+    final boolean wasDeleted = f.delete();
+    if (wasDeleted) {
+      return true;
+    }
+    final boolean ex = f.exists();
+    if (doLog && ex) {
+      LOG.warn("Failed to delete file or dir ["
+          + f.getAbsolutePath() + "]: it still exists.");
+    }
+    return !ex;
+  }
+
+  /**
+   * Delete the contents of a directory, not the directory itself. If
+   * we return false, the directory may be partially-deleted.
+   * If dir is a symlink to a directory, all the contents of the actual
+   * directory pointed to by dir will be deleted.
+   */
+  private static boolean fullyDeleteContents(final File dir) {
+    boolean deletionSucceeded = true;
+    final File[] contents = dir.listFiles();
+    if (contents != null) {
+      for (File content : contents) {
+        if (content.isFile()) {
+          if (!deleteImpl(content, true)) {
+            deletionSucceeded = false;
+          }
+        } else {
+          // Either directory or symlink to another directory.
+          // Try deleting the directory as this might be a symlink
+          if (deleteImpl(content, false)) {
+            // this was indeed a symlink or an empty directory
+            continue;
+          }
+          // if not an empty directory or symlink let
+          // fullyDelete handle it.
+          if (!fullyDelete(content)) {
+            deletionSucceeded = false;
+            // continue deletion of other files/dirs under dir
+          }
+        }
+      }
+    }
+    return deletionSucceeded;
+  }
+
+  /**
+   * Interprets the passed string as a URI. In case of error it
+   * assumes the specified string is a file.
+   *
+   * @param s the string to interpret
+   * @return the resulting URI
+   * @throws IOException if s is treated as a file and cannot be converted
+   */
+  public static URI stringAsURI(String s) throws IOException {
+    URI u = null;
+    // try to make a URI
+    try {
+      u = new URI(s);
+    } catch (URISyntaxException e){
+      LOG.error("Syntax error in URI " + s
+          + ". Please check hdfs configuration.", e);
+    }
+
+    // if URI is null or scheme is undefined, then assume it's file://
+    if(u == null || u.getScheme() == null){
+      LOG.warn("Path " + s + " should be specified as a URI "
+          + "in configuration files. Please update configuration.");
+      u = fileAsURI(new File(s));
+    }
+    return u;
+  }
+
+  /**
+   * Converts the passed File to a URI. This method trims the trailing slash if
+   * one is appended because the underlying file is in fact a directory that
+   * exists. The path "/" itself is never trimmed.
+   *
+   * @param f the file to convert
+   * @return the resulting URI
+   * @throws IOException if the canonical file cannot be resolved
+   *         or the trimmed string is not a valid URI
+   */
+  public static URI fileAsURI(File f) throws IOException {
+    URI u = f.getCanonicalFile().toURI();
+
+    // trim the trailing slash, if it's present.
+    // The root path "/" must be kept: trimming it would leave only the scheme
+    // ("file:"), which is not a valid URI.
+    final String path = u.getPath();
+    if (path.length() > 1 && path.endsWith("/")) {
+      String uriAsString = u.toString();
+      try {
+        u = new URI(uriAsString.substring(0, uriAsString.length() - 1));
+      } catch (URISyntaxException e) {
+        throw new IOException(e);
+      }
+    }
+    return u;
+  }
+
+  /**
+   * A wrapper for {@link File#listFiles()}. This java.io API returns null
+   * when a dir is not a directory or for any I/O error. Instead of having
+   * null check everywhere File#listFiles() is used, we will add utility API
+   * to get around this problem. For the majority of cases where we prefer
+   * an IOException to be thrown.
+   * @param dir directory for which listing should be performed
+   * @return list of files or empty list
+   * @exception IOException for invalid directory or for a bad disk.
+   */
+  public static File[] listFiles(File dir) throws IOException {
+    File[] files = dir.listFiles();
+    if(files == null) {
+      throw new IOException("Invalid directory or I/O error occurred for dir: "
+          + dir.toString());
+    }
+    return files;
+  }
+
+  /**
+   * Platform independent implementation for {@link File#canWrite()}
+   * @param f input file
+   * @return On Unix, same as {@link File#canWrite()}
+   *         On Windows, true if process has write access on the path
+   */
+  public static boolean canWrite(File f) {
+    if (RaftUtils.WINDOWS) {
+      try {
+        return NativeIO.Windows.access(f.getCanonicalPath(),
+            NativeIO.Windows.AccessRight.ACCESS_WRITE);
+      } catch (IOException e) {
+        // Failures while checking access are reported as "not writable".
+        return false;
+      }
+    } else {
+      return f.canWrite();
+    }
+  }
+}