You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/03 02:53:50 UTC

[1/2] hbase git commit: HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.

Repository: hbase
Updated Branches:
  refs/heads/master 0ff9dabe6 -> e79a007dd


http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index d17dde2..f3c5bf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -153,9 +154,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException {
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoop, channelClass);
+        blockSize, eventLoop, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 59f6713..d1e72f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -86,9 +88,13 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
   @SuppressWarnings("deprecation")
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException {
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
     this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize,
       null);
+    // TODO Be sure to add a check for hsync if this branch includes HBASE-19024
+    if (!(CommonFSUtils.hasCapability(output, "hflush"))) {
+      throw new StreamLacksCapabilityException("hflush");
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 5748e6d..81fcaf2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.util;
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterators;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.shaded.com.google.common.primitives.Ints;
 
 import edu.umd.cs.findbugs.annotations.CheckForNull;
@@ -36,8 +35,6 @@ import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -60,17 +57,14 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.ClusterId;
@@ -101,22 +95,18 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
-
-import static org.apache.hadoop.hbase.HConstants.HBASE_DIR;
-
 /**
  * Utility methods for interacting with the underlying file system.
  */
 @InterfaceAudience.Private
-public abstract class FSUtils {
+public abstract class FSUtils extends CommonFSUtils {
   private static final Log LOG = LogFactory.getLog(FSUtils.class);
 
-  /** Full access permissions (starting point for a umask) */
-  public static final String FULL_RWX_PERMISSIONS = "777";
   private static final String THREAD_POOLSIZE = "hbase.client.localityCheck.threadPoolSize";
   private static final int DEFAULT_THREAD_POOLSIZE = 2;
 
   /** Set to true on Windows platforms */
+  @VisibleForTesting // currently only used in testing. TODO refactor into a test class
   public static final boolean WINDOWS = System.getProperty("os.name").startsWith("Windows");
 
   protected FSUtils() {
@@ -124,152 +114,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Sets storage policy for given path according to config setting.
-   * If the passed path is a directory, we'll set the storage policy for all files
-   * created in the future in said directory. Note that this change in storage
-   * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
-   * If we're running on a version of HDFS that doesn't support the given storage policy
-   * (or storage policies at all), then we'll issue a log message and continue.
-   *
-   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
-   *
-   * @param fs We only do anything if an instance of DistributedFileSystem
-   * @param conf used to look up storage policy with given key; not modified.
-   * @param path the Path whose storage policy is to be set
-   * @param policyKey Key to use pulling a policy from Configuration:
-   * e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
-   * @param defaultPolicy usually should be the policy NONE to delegate to HDFS
-   */
-  public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
-      final Path path, final String policyKey, final String defaultPolicy) {
-    String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
-    if (storagePolicy.equals(defaultPolicy)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
-      }
-      return;
-    }
-    setStoragePolicy(fs, path, storagePolicy);
-  }
-
-  private static final Map<FileSystem, Boolean> warningMap =
-      new ConcurrentHashMap<FileSystem, Boolean>();
-
-  /**
-   * Sets storage policy for given path.
-   * If the passed path is a directory, we'll set the storage policy for all files
-   * created in the future in said directory. Note that this change in storage
-   * policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
-   * If we're running on a version of HDFS that doesn't support the given storage policy
-   * (or storage policies at all), then we'll issue a log message and continue.
-   *
-   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
-   *
-   * @param fs We only do anything if an instance of DistributedFileSystem
-   * @param path the Path whose storage policy is to be set
-   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
-   * org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
-   * 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
-   */
-  public static void setStoragePolicy(final FileSystem fs, final Path path,
-      final String storagePolicy) {
-    if (storagePolicy == null) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed a null storagePolicy, exiting early.");
-      }
-      return;
-    }
-    final String trimmedStoragePolicy = storagePolicy.trim();
-    if (trimmedStoragePolicy.isEmpty()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed an empty storagePolicy, exiting early.");
-      }
-      return;
-    }
-    boolean distributed = false;
-    try {
-      distributed = isDistributedFileSystem(fs);
-    } catch (IOException ioe) {
-      if (!warningMap.containsKey(fs)) {
-        warningMap.put(fs, true);
-        LOG.warn("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
-            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
-            + " on path=" + path);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't "
-            + "support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy
-            + " on path=" + path);
-      }
-      return;
-    }
-    if (distributed) {
-      invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
-    }
-  }
-
-  /*
-   * All args have been checked and are good. Run the setStoragePolicy invocation.
-   */
-  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
-      final String storagePolicy) {
-    Method m = null;
-    try {
-      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
-        new Class<?>[] { Path.class, String.class });
-      m.setAccessible(true);
-    } catch (NoSuchMethodException e) {
-      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584 not available";
-      if (!warningMap.containsKey(fs)) {
-        warningMap.put(fs, true);
-        LOG.warn(msg, e);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug(msg, e);
-      }
-      m = null;
-    } catch (SecurityException e) {
-      final String msg = "No access to setStoragePolicy on FileSystem; HDFS-6584 not available";
-      if (!warningMap.containsKey(fs)) {
-        warningMap.put(fs, true);
-        LOG.warn(msg, e);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug(msg, e);
-      }
-      m = null; // could happen on setAccessible()
-    }
-    if (m != null) {
-      try {
-        m.invoke(fs, path, storagePolicy);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
-        }
-      } catch (Exception e) {
-        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
-        // misuse than a runtime problem with HDFS.
-        if (!warningMap.containsKey(fs)) {
-          warningMap.put(fs, true);
-          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
-        } else if (LOG.isDebugEnabled()) {
-          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
-        }
-        // check for lack of HDFS-7228
-        if (e instanceof InvocationTargetException) {
-          final Throwable exception = e.getCause();
-          if (exception instanceof RemoteException &&
-              HadoopIllegalArgumentException.class.getName().equals(
-                ((RemoteException)exception).getClassName())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
-                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
-                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
-                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
    * @return True is <code>fs</code> is instance of DistributedFileSystem
    * @throws IOException
    */
@@ -284,32 +128,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Compare of path component. Does not consider schema; i.e. if schemas
-   * different but <code>path</code> starts with <code>rootPath</code>,
-   * then the function returns true
-   * @param rootPath
-   * @param path
-   * @return True if <code>path</code> starts with <code>rootPath</code>
-   */
-  public static boolean isStartingWithPath(final Path rootPath, final String path) {
-    String uriRootPath = rootPath.toUri().getPath();
-    String tailUriPath = (new Path(path)).toUri().getPath();
-    return tailUriPath.startsWith(uriRootPath);
-  }
-
-  /**
-   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
-   * '/a/b/c' part. Does not consider schema; i.e. if schemas different but path or subpath matches,
-   * the two will equate.
-   * @param pathToSearch Path we will be trying to match.
-   * @param pathTail
-   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
-   */
-  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
-    return isMatchingTail(pathToSearch, new Path(pathTail));
-  }
-
-  /**
    * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
    * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true.  Does not consider
    * schema; i.e. if schemas different but path or subpath matches, the two will equate.
@@ -353,18 +171,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Delete if exists.
-   * @param fs filesystem object
-   * @param dir directory to delete
-   * @return True if deleted <code>dir</code>
-   * @throws IOException e
-   */
-  public static boolean deleteDirectory(final FileSystem fs, final Path dir)
-  throws IOException {
-    return fs.exists(dir) && fs.delete(dir, true);
-  }
-
-  /**
    * Delete the region directory if exists.
    * @param conf
    * @param hri
@@ -379,89 +185,7 @@ public abstract class FSUtils {
       new Path(getTableDir(rootDir, hri.getTable()), hri.getEncodedName()));
   }
 
-  /**
-   * Return the number of bytes that large input files should be optimally
-   * be split into to minimize i/o time.
-   *
-   * use reflection to search for getDefaultBlockSize(Path f)
-   * if the method doesn't exist, fall back to using getDefaultBlockSize()
-   *
-   * @param fs filesystem object
-   * @return the default block size for the path's filesystem
-   * @throws IOException e
-   */
-  public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
-    Method m = null;
-    Class<? extends FileSystem> cls = fs.getClass();
-    try {
-      m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
-    } catch (NoSuchMethodException e) {
-      LOG.info("FileSystem doesn't support getDefaultBlockSize");
-    } catch (SecurityException e) {
-      LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
-      m = null; // could happen on setAccessible()
-    }
-    if (m == null) {
-      return fs.getDefaultBlockSize(path);
-    } else {
-      try {
-        Object ret = m.invoke(fs, path);
-        return ((Long)ret).longValue();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  /*
-   * Get the default replication.
-   *
-   * use reflection to search for getDefaultReplication(Path f)
-   * if the method doesn't exist, fall back to using getDefaultReplication()
-   *
-   * @param fs filesystem object
-   * @param f path of file
-   * @return default replication for the path's filesystem
-   * @throws IOException e
-   */
-  public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
-    Method m = null;
-    Class<? extends FileSystem> cls = fs.getClass();
-    try {
-      m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
-    } catch (NoSuchMethodException e) {
-      LOG.info("FileSystem doesn't support getDefaultReplication");
-    } catch (SecurityException e) {
-      LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
-      m = null; // could happen on setAccessible()
-    }
-    if (m == null) {
-      return fs.getDefaultReplication(path);
-    } else {
-      try {
-        Object ret = m.invoke(fs, path);
-        return ((Number)ret).shortValue();
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  /**
-   * Returns the default buffer size to use during writes.
-   *
-   * The size of the buffer should probably be a multiple of hardware
-   * page size (4096 on Intel x86), and it determines how much data is
-   * buffered during read and write operations.
-   *
-   * @param fs filesystem object
-   * @return default buffer size to use during writes
-   */
-  public static int getDefaultBufferSize(final FileSystem fs) {
-    return fs.getConf().getInt("io.file.buffer.size", 4096);
-  }
-
-  /**
+  /**
    * Create the specified file on the filesystem. By default, this will:
    * <ol>
    * <li>overwrite the file if it exists</li>
@@ -515,71 +239,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Create the specified file on the filesystem. By default, this will:
-   * <ol>
-   * <li>apply the umask in the configuration (if it is enabled)</li>
-   * <li>use the fs configured buffer size (or 4096 if not set)</li>
-   * <li>use the default replication</li>
-   * <li>use the default block size</li>
-   * <li>not track progress</li>
-   * </ol>
-   *
-   * @param fs {@link FileSystem} on which to write the file
-   * @param path {@link Path} to the file to write
-   * @param perm
-   * @param overwrite Whether or not the created file should be overwritten.
-   * @return output stream to the created file
-   * @throws IOException if the file cannot be created
-   */
-  public static FSDataOutputStream create(FileSystem fs, Path path,
-      FsPermission perm, boolean overwrite) throws IOException {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
-    }
-    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
-        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
-  }
-
-  /**
-   * Get the file permissions specified in the configuration, if they are
-   * enabled.
-   *
-   * @param fs filesystem that the file will be created on.
-   * @param conf configuration to read for determining if permissions are
-   *          enabled and which to use
-   * @param permssionConfKey property key in the configuration to use when
-   *          finding the permission
-   * @return the permission to use when creating a new file on the fs. If
-   *         special permissions are not specified in the configuration, then
-   *         the default permissions on the the fs will be returned.
-   */
-  public static FsPermission getFilePermissions(final FileSystem fs,
-      final Configuration conf, final String permssionConfKey) {
-    boolean enablePermissions = conf.getBoolean(
-        HConstants.ENABLE_DATA_FILE_UMASK, false);
-
-    if (enablePermissions) {
-      try {
-        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
-        // make sure that we have a mask, if not, go default.
-        String mask = conf.get(permssionConfKey);
-        if (mask == null)
-          return FsPermission.getFileDefault();
-        // appy the umask
-        FsPermission umask = new FsPermission(mask);
-        return perm.applyUMask(umask);
-      } catch (IllegalArgumentException e) {
-        LOG.warn(
-            "Incorrect umask attempted to be created: "
-                + conf.get(permssionConfKey)
-                + ", using default file permissions.", e);
-        return FsPermission.getFileDefault();
-      }
-    }
-    return FsPermission.getFileDefault();
-  }
-
-  /**
    * Checks to see if the specified file system is available
    *
    * @param fs filesystem
@@ -1023,46 +682,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Verifies root directory path is a valid URI with a scheme
-   *
-   * @param root root directory path
-   * @return Passed <code>root</code> argument.
-   * @throws IOException if not a valid URI with a scheme
-   */
-  public static Path validateRootPath(Path root) throws IOException {
-    try {
-      URI rootURI = new URI(root.toString());
-      String scheme = rootURI.getScheme();
-      if (scheme == null) {
-        throw new IOException("Root directory does not have a scheme");
-      }
-      return root;
-    } catch (URISyntaxException e) {
-      IOException io = new IOException("Root directory path is not a valid " +
-        "URI -- check your " + HBASE_DIR + " configuration");
-      io.initCause(e);
-      throw io;
-    }
-  }
-
-  /**
-   * Checks for the presence of the WAL log root path (using the provided conf object) in the given path. If
-   * it exists, this method removes it and returns the String representation of remaining relative path.
-   * @param path
-   * @param conf
-   * @return String representation of the remaining relative path
-   * @throws IOException
-   */
-  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
-    Path root = getWALRootDir(conf);
-    String pathStr = path.toString();
-    // check that the path is absolute... it has the root path in it.
-    if (!pathStr.startsWith(root.toString())) return pathStr;
-    // if not, return as it is.
-    return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
-  }
-
-  /**
    * If DFS, check safe mode and if so, wait until we clear it.
    * @param conf configuration
    * @param wait Sleep between retries
@@ -1086,81 +705,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Return the 'path' component of a Path.  In Hadoop, Path is an URI.  This
-   * method returns the 'path' component of a Path's URI: e.g. If a Path is
-   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
-   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
-   * This method is useful if you want to print out a Path without qualifying
-   * Filesystem instance.
-   * @param p Filesystem Path whose 'path' component we are to return.
-   * @return Path portion of the Filesystem
-   */
-  public static String getPath(Path p) {
-    return p.toUri().getPath();
-  }
-
-  /**
-   * @param c configuration
-   * @return {@link Path} to hbase root directory: i.e. {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR} from
-   * configuration as a qualified Path.
-   * @throws IOException e
-   */
-  public static Path getRootDir(final Configuration c) throws IOException {
-    Path p = new Path(c.get(HBASE_DIR));
-    FileSystem fs = p.getFileSystem(c);
-    return p.makeQualified(fs);
-  }
-
-  public static void setRootDir(final Configuration c, final Path root) throws IOException {
-    c.set(HBASE_DIR, root.toString());
-  }
-
-  public static void setFsDefault(final Configuration c, final Path root) throws IOException {
-    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
-  }
-
-  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
-    Path p = getRootDir(c);
-    return p.getFileSystem(c);
-  }
-
-  /**
-   * @param c configuration
-   * @return {@link Path} to hbase log root directory: i.e. {@value org.apache.hadoop.hbase.fs.HFileSystem#HBASE_WAL_DIR} from
-   * configuration as a qualified Path. Defaults to {@value org.apache.hadoop.hbase.HConstants#HBASE_DIR}
-   * @throws IOException e
-   */
-  public static Path getWALRootDir(final Configuration c) throws IOException {
-    Path p = new Path(c.get(HFileSystem.HBASE_WAL_DIR, c.get(HBASE_DIR)));
-    if (!isValidWALRootDir(p, c)) {
-      return FSUtils.getRootDir(c);
-    }
-    FileSystem fs = p.getFileSystem(c);
-    return p.makeQualified(fs);
-  }
-
-  @VisibleForTesting
-  public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
-    c.set(HFileSystem.HBASE_WAL_DIR, root.toString());
-  }
-
-  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
-    Path p = getWALRootDir(c);
-    return p.getFileSystem(c);
-  }
-
-  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
-    Path rootDir = FSUtils.getRootDir(c);
-    if (walDir != rootDir) {
-      if (walDir.toString().startsWith(rootDir.toString() + "/")) {
-        throw new IllegalStateException("Illegal WAL directory specified. " +
-            "WAL directories are not permitted to be under the root directory if set.");
-      }
-    }
-    return true;
-  }
-
-  /**
    * Checks if meta region exists
    *
    * @param fs file system
@@ -1298,44 +842,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
-   * path rootdir
-   *
-   * @param rootdir qualified path of HBase root directory
-   * @param tableName name of table
-   * @return {@link org.apache.hadoop.fs.Path} for table
-   */
-  public static Path getTableDir(Path rootdir, final TableName tableName) {
-    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
-        tableName.getQualifierAsString());
-  }
-
-  /**
-   * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
-   * the table directory under
-   * path rootdir
-   *
-   * @param tablePath path of table
-   * @return {@link org.apache.hadoop.fs.Path} for table
-   */
-  public static TableName getTableName(Path tablePath) {
-    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
-  }
-
-  /**
-   * Returns the {@link org.apache.hadoop.fs.Path} object representing
-   * the namespace directory under path rootdir
-   *
-   * @param rootdir qualified path of HBase root directory
-   * @param namespace namespace name
-   * @return {@link org.apache.hadoop.fs.Path} for table
-   */
-  public static Path getNamespaceDir(Path rootdir, final String namespace) {
-    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
-        new Path(namespace)));
-  }
-
-  /**
    * A {@link PathFilter} that returns only regular files.
    */
   static class FileFilter extends AbstractFileStatusFilter {
@@ -1432,17 +938,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * @param conf
-   * @return True if this filesystem whose scheme is 'hdfs'.
-   * @throws IOException
-   */
-  public static boolean isHDFS(final Configuration conf) throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    String scheme = fs.getUri().getScheme();
-    return scheme.equalsIgnoreCase("hdfs");
-  }
-
-  /**
    * Recover file lease. Used when a file might be suspect
    * to be had been left open by another process.
    * @param fs FileSystem handle
@@ -1484,15 +979,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Checks if the given path is the one with 'recovered.edits' dir.
-   * @param path
-   * @return True if we recovered edits
-   */
-  public static boolean isRecoveredEdits(Path path) {
-    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
-  }
-
-  /**
    * Filter for all dirs that don't start with '.'
    */
   public static class RegionDirFilter extends AbstractFileStatusFilter {
@@ -1668,18 +1154,6 @@ public abstract class FSUtils {
     }
   }
 
-
-  /**
-   * @param conf
-   * @return Returns the filesystem of the hbase rootdir.
-   * @throws IOException
-   */
-  public static FileSystem getCurrentFileSystem(Configuration conf)
-  throws IOException {
-    return getRootDir(conf).getFileSystem(conf);
-  }
-
-
   /**
    * Runs through the HBase rootdir/tablename and creates a reverse lookup map for
    * table StoreFile names to the full Path.
@@ -1978,101 +1452,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
-   * This accommodates differences between hadoop versions, where hadoop 1
-   * does not throw a FileNotFoundException, and return an empty FileStatus[]
-   * while Hadoop 2 will throw FileNotFoundException.
-   *
-   * Where possible, prefer {@link #listStatusWithStatusFilter(FileSystem,
-   * Path, FileStatusFilter)} instead.
-   *
-   * @param fs file system
-   * @param dir directory
-   * @param filter path filter
-   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
-   */
-  public static FileStatus [] listStatus(final FileSystem fs,
-      final Path dir, final PathFilter filter) throws IOException {
-    FileStatus [] status = null;
-    try {
-      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
-    } catch (FileNotFoundException fnfe) {
-      // if directory doesn't exist, return null
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(dir + " doesn't exist");
-      }
-    }
-    if (status == null || status.length < 1) return null;
-    return status;
-  }
-
-  /**
-   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal
-   * This would accommodates differences between hadoop versions
-   *
-   * @param fs file system
-   * @param dir directory
-   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
-   */
-  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
-    return listStatus(fs, dir, null);
-  }
-
-  /**
-   * Calls fs.listFiles() to get FileStatus and BlockLocations together for reducing rpc call
-   *
-   * @param fs file system
-   * @param dir directory
-   * @return LocatedFileStatus list
-   */
-  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
-      final Path dir) throws IOException {
-    List<LocatedFileStatus> status = null;
-    try {
-      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
-          .listFiles(dir, false);
-      while (locatedFileStatusRemoteIterator.hasNext()) {
-        if (status == null) {
-          status = Lists.newArrayList();
-        }
-        status.add(locatedFileStatusRemoteIterator.next());
-      }
-    } catch (FileNotFoundException fnfe) {
-      // if directory doesn't exist, return null
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(dir + " doesn't exist");
-      }
-    }
-    return status;
-  }
-
-  /**
-   * Calls fs.delete() and returns the value returned by the fs.delete()
-   *
-   * @param fs
-   * @param path
-   * @param recursive
-   * @return the value returned by the fs.delete()
-   * @throws IOException
-   */
-  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
-      throws IOException {
-    return fs.delete(path, recursive);
-  }
-
-  /**
-   * Calls fs.exists(). Checks if the specified path exists
-   *
-   * @param fs
-   * @param path
-   * @return the value returned by fs.exists()
-   * @throws IOException
-   */
-  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
-    return fs.exists(path);
-  }
-
-  /**
    * Throw an exception if an action is not permitted by a user on a file.
    *
    * @param ugi
@@ -2109,46 +1488,6 @@ public abstract class FSUtils {
   }
 
   /**
-   * Log the current state of the filesystem from a certain root directory
-   * @param fs filesystem to investigate
-   * @param root root file/directory to start logging from
-   * @param LOG log to output information
-   * @throws IOException if an unexpected exception occurs
-   */
-  public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
-      throws IOException {
-    LOG.debug("Current file system:");
-    logFSTree(LOG, fs, root, "|-");
-  }
-
-  /**
-   * Recursive helper to log the state of the FS
-   *
-   * @see #logFileSystemState(FileSystem, Path, Log)
-   */
-  private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
-      throws IOException {
-    FileStatus[] files = FSUtils.listStatus(fs, root, null);
-    if (files == null) return;
-
-    for (FileStatus file : files) {
-      if (file.isDirectory()) {
-        LOG.debug(prefix + file.getPath().getName() + "/");
-        logFSTree(LOG, fs, file.getPath(), prefix + "---");
-      } else {
-        LOG.debug(prefix + file.getPath().getName());
-      }
-    }
-  }
-
-  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
-      throws IOException {
-    // set the modify time for TimeToLive Cleaner
-    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
-    return fs.rename(src, dest);
-  }
-
-  /**
    * This function is to scan the root path of the file system to get the
    * degree of locality for each region on each of the servers having at least
    * one block of that region.
@@ -2397,4 +1736,5 @@ public abstract class FSUtils {
       return null;
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 8880ca5..4304137 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 
 /**
@@ -52,7 +52,13 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
 
   // Only public so classes back in regionserver.wal can access
   public interface AsyncWriter extends WALProvider.AsyncWriter {
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
+    /**
+     * @throws IOException if something goes wrong initializing an output stream
+     * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
+     *         meet the needs of the given Writer implementation.
+     */
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   private EventLoopGroup eventLoopGroup;
@@ -60,7 +66,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   private Class<? extends Channel> channelClass;
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
-    return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
         getWALDirectoryName(factory.factoryId),
         getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
         META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
@@ -96,7 +102,15 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {
-      LOG.debug("Error instantiating log writer.", e);
+      if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
+        LOG.error("The RegionServer async write ahead log provider " +
+            "relies on the ability to call " + e.getMessage() + " for proper operation during " +
+            "component failures, but the current FileSystem does not support doing so. Please " +
+            "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
+            "it points to a FileSystem mount that has suitable capabilities for output streams.");
+      } else {
+        LOG.debug("Error instantiating log writer.", e);
+      }
       Throwables.propagateIfPossible(e, IOException.class);
       throw new IOException("cannot get log writer", e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 459485c..b72e668 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
 // imports for things that haven't moved from regionserver.wal yet.
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 
 /**
  * A WAL provider that use {@link FSHLog}.
@@ -44,7 +44,13 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
 
   // Only public so classes back in regionserver.wal can access
   public interface Writer extends WALProvider.Writer {
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException;
+    /**
+     * @throws IOException if something goes wrong initializing an output stream
+     * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
+     *         meet the needs of the given Writer implementation.
+     */
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   /**
@@ -61,7 +67,15 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) { 
-      LOG.debug("Error instantiating log writer.", e);            
+      if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
+        LOG.error("The RegionServer write ahead log provider for FileSystem implementations " +
+            "relies on the ability to call " + e.getMessage() + " for proper operation during " +
+            "component failures, but the current FileSystem does not support doing so. Please " +
+            "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
+            "it points to a FileSystem mount that has suitable capabilities for output streams.");
+      } else {
+        LOG.debug("Error instantiating log writer.", e);
+      }
       if (writer != null) {
         try{
           writer.close();
@@ -75,7 +89,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
 
   @Override
   protected FSHLog createWAL() throws IOException {
-    return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
         getWALDirectoryName(factory.factoryId),
         getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
         META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 1fb5d37..b0d689c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -55,7 +56,8 @@ public class TestLocalAsyncOutput {
   }
 
   @Test
-  public void test() throws IOException, InterruptedException, ExecutionException {
+  public void test() throws IOException, InterruptedException, ExecutionException,
+      FSUtils.StreamLacksCapabilityException {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
index 4e11778..9ea068a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/MockMasterServices.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
 import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -184,10 +185,8 @@ public class MockMasterServices extends MockNoopMasterServices {
       throws IOException {
     final Configuration conf = getConfiguration();
     final Path logDir = new Path(fileSystemManager.getRootDir(),
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
+        WALProcedureStore.MASTER_PROCEDURE_LOGDIR);
 
-    //procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
-    //    new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     this.procedureStore = new NoopProcedureStore();
     this.procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
index 86f0abc..e0452c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureWalLease.java
@@ -115,8 +115,8 @@ public class TestMasterProcedureWalLease {
     Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
     Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
     final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(),
-        firstMaster.getMasterFileSystem().getFileSystem(),
         ((WALProcedureStore)masterStore).getWALDir(),
+        null,
         new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
     // Abort Latch for the test store
     final CountDownLatch backupStore3Abort = new CountDownLatch(1);
@@ -195,8 +195,8 @@ public class TestMasterProcedureWalLease {
     Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration();
     Mockito.doReturn(true).when(backupMaster3).isActiveMaster();
     final WALProcedureStore procStore2 = new WALProcedureStore(firstMaster.getConfiguration(),
-        firstMaster.getMasterFileSystem().getFileSystem(),
         ((WALProcedureStore)procStore).getWALDir(),
+        null,
         new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3));
 
     // start a second store which should fence the first one out

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
index 0c5ee1f..7932d00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestWALProcedureStoreOnHDFS.java
@@ -76,7 +76,7 @@ public class TestWALProcedureStoreOnHDFS {
     MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
 
     Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
-    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), dfs.getFileSystem(), logDir);
+    store = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), logDir);
     store.registerListener(stopProcedureListener);
     store.start(8);
     store.recoverLease();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index d5e8c1c..b736fae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -58,9 +58,9 @@ import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -139,8 +139,8 @@ public abstract class AbstractTestFSWAL {
     // test to see whether the coprocessor is loaded or not.
     AbstractFSWAL<?> wal = null;
     try {
-      wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-        CONF, null, true, null, null);
+      wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
       WALCoprocessorHost host = wal.getCoprocessorHost();
       Coprocessor c = host.findCoprocessor(SampleRegionWALCoprocessor.class);
       assertNotNull(c);
@@ -187,8 +187,8 @@ public abstract class AbstractTestFSWAL {
     AbstractFSWAL<?> wal1 = null;
     AbstractFSWAL<?> walMeta = null;
     try {
-      wal1 = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-        CONF, null, true, null, null);
+      wal1 = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
       LOG.debug("Log obtained is: " + wal1);
       Comparator<Path> comp = wal1.LOG_NAME_COMPARATOR;
       Path p1 = wal1.computeFilename(11);
@@ -197,9 +197,9 @@ public abstract class AbstractTestFSWAL {
       assertTrue(comp.compare(p1, p1) == 0);
       // comparing with different filenum.
       assertTrue(comp.compare(p1, p2) < 0);
-      walMeta =
-          newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), HConstants.HREGION_OLDLOGDIR_NAME,
-            CONF, null, true, null, AbstractFSWALProvider.META_WAL_PROVIDER_ID);
+      walMeta = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+          HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null,
+          AbstractFSWALProvider.META_WAL_PROVIDER_ID);
       Comparator<Path> compMeta = walMeta.LOG_NAME_COMPARATOR;
 
       Path p1WithMeta = walMeta.computeFilename(11);
@@ -245,7 +245,7 @@ public abstract class AbstractTestFSWAL {
     LOG.debug("testFindMemStoresEligibleForFlush");
     Configuration conf1 = HBaseConfiguration.create(CONF);
     conf1.setInt("hbase.regionserver.maxlogs", 1);
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(conf1), DIR.toString(),
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(conf1), DIR.toString(),
       HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null);
     HTableDescriptor t1 =
         new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
@@ -332,9 +332,10 @@ public abstract class AbstractTestFSWAL {
   }
 
   @Test(expected = IOException.class)
-  public void testFailedToCreateWALIfParentRenamed() throws IOException {
+  public void testFailedToCreateWALIfParentRenamed() throws IOException,
+      CommonFSUtils.StreamLacksCapabilityException {
     final String name = "testFailedToCreateWALIfParentRenamed";
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), name,
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), name,
       HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
     long filenum = System.currentTimeMillis();
     Path path = wal.computeFilename(filenum);
@@ -373,17 +374,17 @@ public abstract class AbstractTestFSWAL {
       scopes.put(fam, 0);
     }
     // subclass and doctor a method.
-    AbstractFSWAL<?> wal = newSlowWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
-      null, true, null, null, new Runnable() {
+    AbstractFSWAL<?> wal = newSlowWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(),
+        testName, CONF, null, true, null, null, new Runnable() {
 
-        @Override
-        public void run() {
-          if (goslow.get()) {
-            Threads.sleep(100);
-            LOG.debug("Sleeping before appending 100ms");
+          @Override
+          public void run() {
+            if (goslow.get()) {
+              Threads.sleep(100);
+              LOG.debug("Sleeping before appending 100ms");
+            }
           }
-        }
-      });
+        });
     HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
       TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
     EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
@@ -434,8 +435,8 @@ public abstract class AbstractTestFSWAL {
   @Test
   public void testSyncNoAppend() throws IOException {
     String testName = currentTest.getMethodName();
-    AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getWALRootDir(CONF), DIR.toString(), testName, CONF,
-      null, true, null, null);
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getWALRootDir(CONF), DIR.toString(), testName,
+        CONF, null, true, null, null);
     try {
       wal.sync();
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index f18ad77..283b85d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -1029,7 +1030,8 @@ public abstract class AbstractTestWALReplay {
   /**
    * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
    */
-  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
+  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException,
+      StreamLacksCapabilityException {
     final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
     final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
     final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
@@ -1071,12 +1073,12 @@ public abstract class AbstractTestWALReplay {
   }
 
   @Test
-  public void testNameConflictWhenSplit0() throws IOException {
+  public void testNameConflictWhenSplit0() throws IOException, StreamLacksCapabilityException {
     testNameConflictWhenSplit(true);
   }
 
   @Test
-  public void testNameConflictWhenSplit1() throws IOException {
+  public void testNameConflictWhenSplit1() throws IOException, StreamLacksCapabilityException {
     testNameConflictWhenSplit(false);
   }
 
@@ -1231,7 +1233,8 @@ public abstract class AbstractTestWALReplay {
     return htd;
   }
 
-  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException,
+      StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
     writer.init(fs, file, conf, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 548c023..055c28d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.Random;
 import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,7 +43,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -57,6 +58,7 @@ import org.junit.experimental.categories.Category;
  */
 @Category({MiscTests.class, MediumTests.class})
 public class TestFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestFSUtils.class);
 
   private HBaseTestingUtility htu;
   private FileSystem fs;
@@ -69,53 +71,6 @@ public class TestFSUtils {
     conf = htu.getConfiguration();
   }
 
-  /**
-   * Test path compare and prefix checking.
-   * @throws IOException
-   */
-  @Test
-  public void testMatchingTail() throws IOException {
-    Path rootdir = htu.getDataTestDir();
-    assertTrue(rootdir.depth() > 1);
-    Path partPath = new Path("a", "b");
-    Path fullPath = new Path(rootdir, partPath);
-    Path fullyQualifiedPath = fs.makeQualified(fullPath);
-    assertFalse(FSUtils.isMatchingTail(fullPath, partPath));
-    assertFalse(FSUtils.isMatchingTail(fullPath, partPath.toString()));
-    assertTrue(FSUtils.isStartingWithPath(rootdir, fullPath.toString()));
-    assertTrue(FSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
-    assertFalse(FSUtils.isStartingWithPath(rootdir, partPath.toString()));
-    assertFalse(FSUtils.isMatchingTail(fullyQualifiedPath, partPath));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
-    assertTrue(FSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
-    assertTrue(FSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
-    assertFalse(FSUtils.isMatchingTail(fullPath, new Path("x")));
-    assertFalse(FSUtils.isMatchingTail(new Path("x"), fullPath));
-  }
-
-  @Test
-  public void testVersion() throws DeserializationException, IOException {
-    final Path rootdir = htu.getDataTestDir();
-    assertNull(FSUtils.getVersion(fs, rootdir));
-    // Write out old format version file.  See if we can read it in and convert.
-    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
-    FSDataOutputStream s = fs.create(versionFile);
-    final String version = HConstants.FILE_SYSTEM_VERSION;
-    s.writeUTF(version);
-    s.close();
-    assertTrue(fs.exists(versionFile));
-    FileStatus [] status = fs.listStatus(versionFile);
-    assertNotNull(status);
-    assertTrue(status.length > 0);
-    String newVersion = FSUtils.getVersion(fs, rootdir);
-    assertEquals(version.length(), newVersion.length());
-    assertEquals(version, newVersion);
-    // File will have been converted. Exercise the pb format
-    assertEquals(version, FSUtils.getVersion(fs, rootdir));
-    FSUtils.checkVersion(fs, rootdir, true);
-  }
-
   @Test public void testIsHDFS() throws Exception {
     assertFalse(FSUtils.isHDFS(conf));
     MiniDFSCluster cluster = null;
@@ -239,7 +194,32 @@ public class TestFSUtils {
   }
 
   @Test
+  public void testVersion() throws DeserializationException, IOException {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
+    assertNull(FSUtils.getVersion(fs, rootdir));
+    // Write out old format version file.  See if we can read it in and convert.
+    Path versionFile = new Path(rootdir, HConstants.VERSION_FILE_NAME);
+    FSDataOutputStream s = fs.create(versionFile);
+    final String version = HConstants.FILE_SYSTEM_VERSION;
+    s.writeUTF(version);
+    s.close();
+    assertTrue(fs.exists(versionFile));
+    FileStatus [] status = fs.listStatus(versionFile);
+    assertNotNull(status);
+    assertTrue(status.length > 0);
+    String newVersion = FSUtils.getVersion(fs, rootdir);
+    assertEquals(version.length(), newVersion.length());
+    assertEquals(version, newVersion);
+    // File will have been converted. Exercise the pb format
+    assertEquals(version, FSUtils.getVersion(fs, rootdir));
+    FSUtils.checkVersion(fs, rootdir, true);
+  }
+
+  @Test
   public void testPermMask() throws Exception {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
     // default fs permission
     FsPermission defaultFsPerm = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
@@ -277,6 +257,8 @@ public class TestFSUtils {
 
   @Test
   public void testDeleteAndExists() throws Exception {
+    final Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
     conf.setBoolean(HConstants.ENABLE_DATA_FILE_UMASK, true);
     FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
     // then that the correct file is created
@@ -302,6 +284,7 @@ public class TestFSUtils {
     }
   }
 
+
   @Test
   public void testRenameAndSetModifyTime() throws Exception {
     MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
@@ -338,6 +321,24 @@ public class TestFSUtils {
     }
   }
 
+  @Test
+  public void testSetStoragePolicyDefault() throws Exception {
+    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
+  }
+
+  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
+  @Test
+  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
+    verifyFileInDirWithStoragePolicy("ALL_SSD");
+  }
+
+  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
+  @Test
+  public void testSetStoragePolicyInvalid() throws Exception {
+    verifyFileInDirWithStoragePolicy("1772");
+  }
+
+  // Here instead of TestCommonFSUtils because we need a minicluster
   private void verifyFileInDirWithStoragePolicy(final String policy) throws Exception {
     conf.set(HConstants.WAL_STORAGE_POLICY, policy);
 
@@ -362,63 +363,6 @@ public class TestFSUtils {
     }
   }
 
-  @Test
-  public void testSetStoragePolicyDefault() throws Exception {
-    verifyFileInDirWithStoragePolicy(HConstants.DEFAULT_WAL_STORAGE_POLICY);
-  }
-
-  /* might log a warning, but still work. (always warning on Hadoop < 2.6.0) */
-  @Test
-  public void testSetStoragePolicyValidButMaybeNotPresent() throws Exception {
-    verifyFileInDirWithStoragePolicy("ALL_SSD");
-  }
-
-  /* should log a warning, but still work. (different warning on Hadoop < 2.6.0) */
-  @Test
-  public void testSetStoragePolicyInvalid() throws Exception {
-    verifyFileInDirWithStoragePolicy("1772");
-  }
-
-  @Test
-  public void testSetWALRootDir() throws Exception {
-    Path p = new Path("file:///hbase/root");
-    FSUtils.setWALRootDir(conf, p);
-    assertEquals(p.toString(), conf.get(HFileSystem.HBASE_WAL_DIR));
-  }
-
-  @Test
-  public void testGetWALRootDir() throws IOException {
-    Path root = new Path("file:///hbase/root");
-    Path walRoot = new Path("file:///hbase/logroot");
-    FSUtils.setRootDir(conf, root);
-    assertEquals(FSUtils.getRootDir(conf), root);
-    assertEquals(FSUtils.getWALRootDir(conf), root);
-    FSUtils.setWALRootDir(conf, walRoot);
-    assertEquals(FSUtils.getWALRootDir(conf), walRoot);
-  }
-
-  @Test(expected=IllegalStateException.class)
-  public void testGetWALRootDirIllegalWALDir() throws IOException {
-    Path root = new Path("file:///hbase/root");
-    Path invalidWALDir = new Path("file:///hbase/root/logroot");
-    FSUtils.setRootDir(conf, root);
-    FSUtils.setWALRootDir(conf, invalidWALDir);
-    FSUtils.getWALRootDir(conf);
-  }
-
-  @Test
-  public void testRemoveWALRootPath() throws Exception {
-    FSUtils.setRootDir(conf, new Path("file:///user/hbase"));
-    Path testFile = new Path(FSUtils.getRootDir(conf), "test/testfile");
-    Path tmpFile = new Path("file:///test/testfile");
-    assertEquals(FSUtils.removeWALRootPath(testFile, conf), "test/testfile");
-    assertEquals(FSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
-    FSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
-    assertEquals(FSUtils.removeWALRootPath(testFile, conf), testFile.toString());
-    Path logFile = new Path(FSUtils.getWALRootDir(conf), "test/testlog");
-    assertEquals(FSUtils.removeWALRootPath(logFile, conf), "test/testlog");
-  }
-
   /**
    * Ugly test that ensures we can get at the hedged read counters in dfsclient.
    * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
@@ -565,4 +509,37 @@ public class TestFSUtils {
     assertTrue(fileSys.delete(name, true));
     assertTrue(!fileSys.exists(name));
   }
+
+
+  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
+  static {
+    boolean tmp = false;
+    try {
+      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+      tmp = true;
+      LOG.debug("Test thought StreamCapabilities class was present.");
+    } catch (ClassNotFoundException exception) {
+      LOG.debug("Test didn't think StreamCapabilities class was present.");
+    } finally {
+      STREAM_CAPABILITIES_IS_PRESENT = tmp;
+    }
+  }
+
+  // Here instead of TestCommonFSUtils because we need a minicluster
+  @Test
+  public void checkStreamCapabilitiesOnHdfsDataOutputStream() throws Exception {
+    MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
+    try (FileSystem filesystem = cluster.getFileSystem()) {
+      FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
+      assertTrue(FSUtils.hasCapability(stream, "hsync"));
+      assertTrue(FSUtils.hasCapability(stream, "hflush"));
+      assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
+          "StreamCapabilities class is not defined.",
+          STREAM_CAPABILITIES_IS_PRESENT,
+          FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index 944a4f1..f578c11 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -38,7 +38,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
-import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -100,7 +100,7 @@ public class IOTestProvider implements WALProvider {
       providerId = DEFAULT_PROVIDER_ID;
     }
     final String logPrefix = factory.factoryId + WAL_FILE_NAME_DELIMITER + providerId;
-    log = new IOTestWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
+    log = new IOTestWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf),
         AbstractFSWALProvider.getWALDirectoryName(factory.factoryId),
         HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, logPrefix,
         META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
@@ -184,7 +184,12 @@ public class IOTestProvider implements WALProvider {
       if (!initialized || doFileRolls) {
         LOG.info("creating new writer instance.");
         final ProtobufLogWriter writer = new IOTestWriter();
-        writer.init(fs, path, conf, false);
+        try {
+          writer.init(fs, path, conf, false);
+        } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
+          throw new IOException("Can't create writer instance because underlying FileSystem " +
+              "doesn't support needed stream capabilities.", exception);
+        }
         if (!initialized) {
           LOG.info("storing initial writer instance in case file rolling isn't allowed.");
           noRollsWriter = writer;
@@ -207,7 +212,8 @@ public class IOTestProvider implements WALProvider {
     private boolean doSyncs;
 
     @Override
-    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException {
+    public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
+        throws IOException, CommonFSUtils.StreamLacksCapabilityException {
       Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;


[2/2] hbase git commit: HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.

Posted by bu...@apache.org.
HBASE-18784 if available, query underlying outputstream capabilities where we need hflush/hsync.

* pull things that don't rely on HDFS in hbase-server/FSUtils into hbase-common/CommonFSUtils
* refactor setStoragePolicy so that it can move into hbase-common/CommonFSUtils, as a side effect update it for Hadoop 2.8,3.0+
* refactor WALProcedureStore so that it handles its own FS interactions
* add a reflection-based lookup of stream capabilities
* call said lookup in places where we make WALs to make sure hflush/hsync is available.
* javadoc / checkstyle cleanup on changes as flagged by yetus

Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e79a007d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e79a007d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e79a007d

Branch: refs/heads/master
Commit: e79a007dd9810b33cd508986037e17d45b55a705
Parents: 0ff9dab
Author: Sean Busbey <bu...@apache.org>
Authored: Thu Oct 12 10:59:43 2017 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Thu Nov 2 21:29:20 2017 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hbase/util/CommonFSUtils.java | 890 +++++++++++++++++++
 .../hadoop/hbase/util/TestCommonFSUtils.java    | 164 ++++
 .../procedure2/store/wal/WALProcedureStore.java |  70 +-
 .../procedure2/ProcedureTestingUtility.java     |  12 +-
 .../hbase/procedure2/TestChildProcedures.java   |   2 +-
 .../hbase/procedure2/TestProcedureEvents.java   |   2 +-
 .../procedure2/TestProcedureExecution.java      |   2 +-
 .../hbase/procedure2/TestProcedureMetrics.java  |   2 +-
 .../hbase/procedure2/TestProcedureNonce.java    |   2 +-
 .../hbase/procedure2/TestProcedureRecovery.java |   2 +-
 .../procedure2/TestProcedureReplayOrder.java    |   2 +-
 .../procedure2/TestStateMachineProcedure.java   |   2 +-
 .../hbase/procedure2/TestYieldProcedures.java   |   2 +-
 ...ProcedureWALLoaderPerformanceEvaluation.java |   2 +-
 .../wal/ProcedureWALPerformanceEvaluation.java  |   9 +-
 .../store/wal/TestStressWALProcedureStore.java  |   2 +-
 .../store/wal/TestWALProcedureStore.java        |   4 +-
 .../org/apache/hadoop/hbase/fs/HFileSystem.java |   3 -
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |  11 +-
 .../org/apache/hadoop/hbase/master/HMaster.java |  18 +-
 .../hadoop/hbase/master/MasterFileSystem.java   |  10 +-
 .../procedure/MasterProcedureConstants.java     |   3 -
 .../hbase/regionserver/wal/AbstractFSWAL.java   |  29 +-
 .../wal/AbstractProtobufLogWriter.java          |   5 +-
 .../wal/AsyncProtobufLogWriter.java             |   5 +-
 .../regionserver/wal/ProtobufLogWriter.java     |   8 +-
 .../org/apache/hadoop/hbase/util/FSUtils.java   | 668 +-------------
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |  22 +-
 .../apache/hadoop/hbase/wal/FSHLogProvider.java |  22 +-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java  |   4 +-
 .../master/assignment/MockMasterServices.java   |   5 +-
 .../procedure/TestMasterProcedureWalLease.java  |   4 +-
 .../procedure/TestWALProcedureStoreOnHDFS.java  |   2 +-
 .../regionserver/wal/AbstractTestFSWAL.java     |  45 +-
 .../regionserver/wal/AbstractTestWALReplay.java |  11 +-
 .../apache/hadoop/hbase/util/TestFSUtils.java   | 187 ++--
 .../apache/hadoop/hbase/wal/IOTestProvider.java |  14 +-
 37 files changed, 1344 insertions(+), 903 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
new file mode 100644
index 0000000..bdf148e
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -0,0 +1,890 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Utility methods for interacting with the underlying file system.
+ */
+@InterfaceAudience.Private
+public abstract class CommonFSUtils {
+  private static final Log LOG = LogFactory.getLog(CommonFSUtils.class);
+
+  /** Parameter name for HBase WAL directory */
+  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
+
+  /** Full access permissions (starting point for a umask) */
+  public static final String FULL_RWX_PERMISSIONS = "777";
+
+  protected CommonFSUtils() {
+    super();
+  }
+
+  /**
+   * Compares the path components only. Does not consider the scheme; i.e. if the
+   * schemes differ but <code>path</code> starts with <code>rootPath</code>,
+   * then the function returns true.
+   * @param rootPath value to check for
+   * @param path subject to check
+   * @return True if <code>path</code> starts with <code>rootPath</code>
+   */
+  public static boolean isStartingWithPath(final Path rootPath, final String path) {
+    String uriRootPath = rootPath.toUri().getPath();
+    String tailUriPath = (new Path(path)).toUri().getPath();
+    return tailUriPath.startsWith(uriRootPath);
+  }
+
+  /**
+   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
+   * '/a/b/c' part. Does not consider the scheme; i.e. if the schemes differ but the path or
+   * subpath matches, the two will equate.
+   * @param pathToSearch Path we will be trying to match against.
+   * @param pathTail what to match
+   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
+   */
+  public static boolean isMatchingTail(final Path pathToSearch, String pathTail) {
+    return isMatchingTail(pathToSearch, new Path(pathTail));
+  }
+
+  /**
+   * Compare path component of the Path URI; e.g. if hdfs://a/b/c and /a/b/c, it will compare the
+   * '/a/b/c' part. If you passed in 'hdfs://a/b/c and b/c, it would return true.  Does not consider
+   * schema; i.e. if schemas different but path or subpath matches, the two will equate.
+   * @param pathToSearch Path we will be trying to match against
+   * @param pathTail what to match
+   * @return True if <code>pathTail</code> is tail on the path of <code>pathToSearch</code>
+   */
+  public static boolean isMatchingTail(final Path pathToSearch, final Path pathTail) {
+    if (pathToSearch.depth() != pathTail.depth()) {
+      return false;
+    }
+    Path tailPath = pathTail;
+    String tailName;
+    Path toSearch = pathToSearch;
+    String toSearchName;
+    boolean result = false;
+    do {
+      tailName = tailPath.getName();
+      if (tailName == null || tailName.length() <= 0) {
+        result = true;
+        break;
+      }
+      toSearchName = toSearch.getName();
+      if (toSearchName == null || toSearchName.length() <= 0) {
+        break;
+      }
+      // Move up a parent on each path for next go around.  Path doesn't let us go off the end.
+      tailPath = tailPath.getParent();
+      toSearch = toSearch.getParent();
+    } while(tailName.equals(toSearchName));
+    return result;
+  }
+
+  /**
+   * Delete if exists.
+   * @param fs filesystem object
+   * @param dir directory to delete
+   * @return True if deleted <code>dir</code>
+   * @throws IOException e
+   */
+  public static boolean deleteDirectory(final FileSystem fs, final Path dir)
+  throws IOException {
+    return fs.exists(dir) && fs.delete(dir, true);
+  }
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize i/o time.
+   *
+   * use reflection to search for getDefaultBlockSize(Path f)
+   * if the method doesn't exist, fall back to using getDefaultBlockSize()
+   *
+   * @param fs filesystem object
+   * @return the default block size for the path's filesystem
+   * @throws IOException e
+   */
+  public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
+    Method m = null;
+    Class<? extends FileSystem> cls = fs.getClass();
+    try {
+      m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
+    } catch (NoSuchMethodException e) {
+      LOG.info("FileSystem doesn't support getDefaultBlockSize");
+    } catch (SecurityException e) {
+      LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
+      m = null; // could happen on setAccessible()
+    }
+    if (m == null) {
+      return fs.getDefaultBlockSize(path);
+    } else {
+      try {
+        Object ret = m.invoke(fs, path);
+        return ((Long)ret).longValue();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Get the default replication.
+   *
+   * use reflection to search for getDefaultReplication(Path f)
+   * if the method doesn't exist, fall back to using getDefaultReplication()
+   *
+   * @param fs filesystem object
+   * @param path path of file
+   * @return default replication for the path's filesystem
+   * @throws IOException e
+   */
+  public static short getDefaultReplication(final FileSystem fs, final Path path)
+      throws IOException {
+    Method m = null;
+    Class<? extends FileSystem> cls = fs.getClass();
+    try {
+      m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
+    } catch (NoSuchMethodException e) {
+      LOG.info("FileSystem doesn't support getDefaultReplication");
+    } catch (SecurityException e) {
+      LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
+      m = null; // could happen on setAccessible()
+    }
+    if (m == null) {
+      return fs.getDefaultReplication(path);
+    } else {
+      try {
+        Object ret = m.invoke(fs, path);
+        return ((Number)ret).shortValue();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  /**
+   * Returns the default buffer size to use during writes.
+   *
+   * The size of the buffer should probably be a multiple of hardware
+   * page size (4096 on Intel x86), and it determines how much data is
+   * buffered during read and write operations.
+   *
+   * @param fs filesystem object
+   * @return default buffer size to use during writes
+   */
+  public static int getDefaultBufferSize(final FileSystem fs) {
+    return fs.getConf().getInt("io.file.buffer.size", 4096);
+  }
+
+  /**
+   * Create the specified file on the filesystem. By default, this will:
+   * <ol>
+   * <li>apply the umask in the configuration (if it is enabled)</li>
+   * <li>use the fs configured buffer size (or 4096 if not set)</li>
+   * <li>use the default replication</li>
+   * <li>use the default block size</li>
+   * <li>not track progress</li>
+   * </ol>
+   *
+   * @param fs {@link FileSystem} on which to write the file
+   * @param path {@link Path} to the file to write
+   * @param perm initial permissions
+   * @param overwrite Whether or not the created file should be overwritten.
+   * @return output stream to the created file
+   * @throws IOException if the file cannot be created
+   */
+  public static FSDataOutputStream create(FileSystem fs, Path path,
+      FsPermission perm, boolean overwrite) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
+    }
+    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
+        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
+  }
+
+  /**
+   * Get the file permissions specified in the configuration, if they are
+   * enabled.
+   *
+   * @param fs filesystem that the file will be created on.
+   * @param conf configuration to read for determining if permissions are
+   *          enabled and which to use
+   * @param permssionConfKey property key in the configuration to use when
+   *          finding the permission
+   * @return the permission to use when creating a new file on the fs. If
+   *         special permissions are not specified in the configuration, then
+   *         the default permissions on the fs will be returned.
+   */
+  public static FsPermission getFilePermissions(final FileSystem fs,
+      final Configuration conf, final String permssionConfKey) {
+    boolean enablePermissions = conf.getBoolean(
+        HConstants.ENABLE_DATA_FILE_UMASK, false);
+
+    if (enablePermissions) {
+      try {
+        FsPermission perm = new FsPermission(FULL_RWX_PERMISSIONS);
+        // make sure that we have a mask, if not, go default.
+        String mask = conf.get(permssionConfKey);
+        if (mask == null) {
+          return FsPermission.getFileDefault();
+        }
+        // apply the umask
+        FsPermission umask = new FsPermission(mask);
+        return perm.applyUMask(umask);
+      } catch (IllegalArgumentException e) {
+        LOG.warn(
+            "Incorrect umask attempted to be created: "
+                + conf.get(permssionConfKey)
+                + ", using default file permissions.", e);
+        return FsPermission.getFileDefault();
+      }
+    }
+    return FsPermission.getFileDefault();
+  }
+
+  /**
+   * Verifies root directory path is a valid URI with a scheme
+   *
+   * @param root root directory path
+   * @return Passed <code>root</code> argument.
+   * @throws IOException if not a valid URI with a scheme
+   */
+  public static Path validateRootPath(Path root) throws IOException {
+    try {
+      URI rootURI = new URI(root.toString());
+      String scheme = rootURI.getScheme();
+      if (scheme == null) {
+        throw new IOException("Root directory does not have a scheme");
+      }
+      return root;
+    } catch (URISyntaxException e) {
+      IOException io = new IOException("Root directory path is not a valid " +
+        "URI -- check your " + HConstants.HBASE_DIR + " configuration");
+      io.initCause(e);
+      throw io;
+    }
+  }
+
+  /**
+   * Checks for the presence of the WAL log root path (using the provided conf object) in the given
+   * path. If it exists, this method removes it and returns the String representation of remaining
+   * relative path.
+   * @param path must not be null
+   * @param conf must not be null
+   * @return String representation of the remaining relative path
+   * @throws IOException from underlying filesystem
+   */
+  public static String removeWALRootPath(Path path, final Configuration conf) throws IOException {
+    Path root = getWALRootDir(conf);
+    String pathStr = path.toString();
+    // if the path does not start with the WAL root path, return it as it is.
+    if (!pathStr.startsWith(root.toString())) {
+      return pathStr;
+    }
+    // otherwise strip the WAL root path prefix.
+    return pathStr.substring(root.toString().length() + 1);// remove the "/" too.
+  }
+
+  /**
+   * Return the 'path' component of a Path.  In Hadoop, Path is an URI.  This
+   * method returns the 'path' component of a Path's URI: e.g. If a Path is
+   * <code>hdfs://example.org:9000/hbase_trunk/TestTable/compaction.dir</code>,
+   * this method returns <code>/hbase_trunk/TestTable/compaction.dir</code>.
+   * This method is useful if you want to print out a Path without qualifying
+   * Filesystem instance.
+   * @param p Filesystem Path whose 'path' component we are to return.
+   * @return Path portion of the Filesystem
+   */
+  public static String getPath(Path p) {
+    return p.toUri().getPath();
+  }
+
+  /**
+   * @param c configuration
+   * @return {@link Path} to hbase root directory from
+   *     configuration as a qualified Path.
+   * @throws IOException e
+   */
+  public static Path getRootDir(final Configuration c) throws IOException {
+    Path p = new Path(c.get(HConstants.HBASE_DIR));
+    FileSystem fs = p.getFileSystem(c);
+    return p.makeQualified(fs);
+  }
+
+  public static void setRootDir(final Configuration c, final Path root) throws IOException {
+    c.set(HConstants.HBASE_DIR, root.toString());
+  }
+
+  public static void setFsDefault(final Configuration c, final Path root) throws IOException {
+    c.set("fs.defaultFS", root.toString());    // for hadoop 0.21+
+  }
+
+  public static FileSystem getRootDirFileSystem(final Configuration c) throws IOException {
+    Path p = getRootDir(c);
+    return p.getFileSystem(c);
+  }
+
+  /**
+   * @param c configuration
+   * @return {@link Path} to hbase log root directory: e.g. {@value #HBASE_WAL_DIR} from
+   *     configuration as a qualified Path. Defaults to HBase root dir.
+   * @throws IOException e
+   */
+  public static Path getWALRootDir(final Configuration c) throws IOException {
+    Path p = new Path(c.get(HBASE_WAL_DIR, c.get(HConstants.HBASE_DIR)));
+    if (!isValidWALRootDir(p, c)) {
+      return getRootDir(c);
+    }
+    FileSystem fs = p.getFileSystem(c);
+    return p.makeQualified(fs);
+  }
+
+  @VisibleForTesting
+  public static void setWALRootDir(final Configuration c, final Path root) throws IOException {
+    c.set(HBASE_WAL_DIR, root.toString());
+  }
+
+  public static FileSystem getWALFileSystem(final Configuration c) throws IOException {
+    Path p = getWALRootDir(c);
+    return p.getFileSystem(c);
+  }
+
+  private static boolean isValidWALRootDir(Path walDir, final Configuration c) throws IOException {
+    Path rootDir = getRootDir(c);
+    if (walDir != rootDir) {
+      if (walDir.toString().startsWith(rootDir.toString() + "/")) {
+        throw new IllegalStateException("Illegal WAL directory specified. " +
+            "WAL directories are not permitted to be under the root directory if set.");
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the {@link org.apache.hadoop.fs.Path} object representing the table directory under
+   * path rootdir
+   *
+   * @param rootdir qualified path of HBase root directory
+   * @param tableName name of table
+   * @return {@link org.apache.hadoop.fs.Path} for table
+   */
+  public static Path getTableDir(Path rootdir, final TableName tableName) {
+    return new Path(getNamespaceDir(rootdir, tableName.getNamespaceAsString()),
+        tableName.getQualifierAsString());
+  }
+
+  /**
+   * Returns the {@link org.apache.hadoop.hbase.TableName} object representing
+   * the table directory under
+   * path rootdir
+   *
+   * @param tablePath path of table
+   * @return {@link org.apache.hadoop.hbase.TableName} for table
+   */
+  public static TableName getTableName(Path tablePath) {
+    return TableName.valueOf(tablePath.getParent().getName(), tablePath.getName());
+  }
+
+  /**
+   * Returns the {@link org.apache.hadoop.fs.Path} object representing
+   * the namespace directory under path rootdir
+   *
+   * @param rootdir qualified path of HBase root directory
+   * @param namespace namespace name
+   * @return {@link org.apache.hadoop.fs.Path} for namespace
+   */
+  public static Path getNamespaceDir(Path rootdir, final String namespace) {
+    return new Path(rootdir, new Path(HConstants.BASE_NAMESPACE_DIR,
+        new Path(namespace)));
+  }
+
+  /**
+   * Sets storage policy for given path according to config setting.
+   * If the passed path is a directory, we'll set the storage policy for all files
+   * created in the future in said directory. Note that this change in storage
+   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
+   * If we're running on a FileSystem implementation that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   *
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   *
+   * @param fs We only do anything if it implements a setStoragePolicy method
+   * @param conf used to look up storage policy with given key; not modified.
+   * @param path the Path whose storage policy is to be set
+   * @param policyKey Key to use pulling a policy from Configuration:
+   *   e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
+   * @param defaultPolicy if the configured policy is equal to this policy name, we will skip
+   *   telling the FileSystem to set a storage policy.
+   */
+  public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
+      final Path path, final String policyKey, final String defaultPolicy) {
+    String storagePolicy = conf.get(policyKey, defaultPolicy).toUpperCase(Locale.ROOT);
+    if (storagePolicy.equals(defaultPolicy)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("default policy of " + defaultPolicy + " requested, exiting early.");
+      }
+      return;
+    }
+    setStoragePolicy(fs, path, storagePolicy);
+  }
+
+  // this mapping means that under a federated FileSystem implementation, we'll
+  // only log the first failure from any of the underlying FileSystems at WARN and all others
+  // will be at DEBUG.
+  private static final Map<FileSystem, Boolean> warningMap =
+      new ConcurrentHashMap<FileSystem, Boolean>();
+
+  /**
+   * Sets storage policy for given path.
+   * If the passed path is a directory, we'll set the storage policy for all files
+   * created in the future in said directory. Note that this change in storage
+   * policy takes place at the FileSystem level; it will persist beyond this RS's lifecycle.
+   * If we're running on a version of FileSystem that doesn't support the given storage policy
+   * (or storage policies at all), then we'll issue a log message and continue.
+   *
+   * See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
+   *
+   * @param fs We only do anything if it implements a setStoragePolicy method
+   * @param path the Path whose storage policy is to be set
+   * @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
+   *   org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
+   *   'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
+   */
+  public static void setStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    if (storagePolicy == null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("We were passed a null storagePolicy, exiting early.");
+      }
+      return;
+    }
+    final String trimmedStoragePolicy = storagePolicy.trim();
+    if (trimmedStoragePolicy.isEmpty()) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("We were passed an empty storagePolicy, exiting early.");
+      }
+      return;
+    }
+    invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
+  }
+
+  /*
+   * All args have been checked and are good. Run the setStoragePolicy invocation.
+   */
+  private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
+      final String storagePolicy) {
+    Method m = null;
+    try {
+      m = fs.getClass().getDeclaredMethod("setStoragePolicy",
+        new Class<?>[] { Path.class, String.class });
+      m.setAccessible(true);
+    } catch (NoSuchMethodException e) {
+      final String msg = "FileSystem doesn't support setStoragePolicy; HDFS-6584, HDFS-9345 " +
+          "not available. This is normal and expected on earlier Hadoop versions.";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null;
+    } catch (SecurityException e) {
+      final String msg = "No access to setStoragePolicy on FileSystem from the SecurityManager; " +
+          "HDFS-6584, HDFS-9345 not available. This is unusual and probably warrants an email " +
+          "to the user@hbase mailing list. Please be sure to include a link to your configs, and " +
+          "logs that include this message and period of time before it. Logs around service " +
+          "start up will probably be useful as well.";
+      if (!warningMap.containsKey(fs)) {
+        warningMap.put(fs, true);
+        LOG.warn(msg, e);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug(msg, e);
+      }
+      m = null; // could happen on setAccessible() or getDeclaredMethod()
+    }
+    if (m != null) {
+      try {
+        m.invoke(fs, path, storagePolicy);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Set storagePolicy=" + storagePolicy + " for path=" + path);
+        }
+      } catch (Exception e) {
+        // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
+        // misuse than a runtime problem with HDFS.
+        if (!warningMap.containsKey(fs)) {
+          warningMap.put(fs, true);
+          LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path + ". " +
+              "DEBUG log level might have more details.", e);
+        } else if (LOG.isDebugEnabled()) {
+          LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
+        }
+        // check for lack of HDFS-7228
+        if (e instanceof InvocationTargetException) {
+          final Throwable exception = e.getCause();
+          if (exception instanceof RemoteException &&
+              HadoopIllegalArgumentException.class.getName().equals(
+                ((RemoteException)exception).getClassName())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
+                "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
+                "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
+                "more information see the 'ArchivalStorage' docs for your Hadoop release.");
+            }
+          // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
+          // that throws UnsupportedOperationException
+          } else if (exception instanceof UnsupportedOperationException) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("The underlying FileSystem implementation doesn't support " +
+                  "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
+                  "appears to be present in your version of Hadoop. For more information check " +
+                  "the Hadoop documentation on 'ArchivalStorage', the Hadoop FileSystem " +
+                  "specification docs from HADOOP-11981, and/or related documentation from the " +
+                  "provider of the underlying FileSystem (its name should appear in the " +
+                  "stacktrace that accompanies this message). Note in particular that Hadoop's " +
+                  "local filesystem implementation doesn't support storage policies.", exception);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * @param conf must not be null
+   * @return True if the scheme of the filesystem from FileSystem.get(conf) is 'hdfs' (any case).
+   * @throws IOException from underlying FileSystem
+   */
+  public static boolean isHDFS(final Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    String scheme = fs.getUri().getScheme();
+    return "hdfs".equalsIgnoreCase(scheme); // constant first: a null scheme yields false, not NPE
+  }
+
+  /**
+   * Checks if the given path is the one with 'recovered.edits' dir.
+   * @param path must not be null
+   * @return True if the path string contains the recovered edits directory name
+   */
+  public static boolean isRecoveredEdits(Path path) {
+    return path.toString().contains(HConstants.RECOVERED_EDITS_DIR);
+  }
+
+  /**
+   * @param conf must not be null
+   * @return Returns the filesystem of the hbase rootdir.
+   * @throws IOException from underlying FileSystem
+   */
+  public static FileSystem getCurrentFileSystem(Configuration conf)
+  throws IOException {
+    return getRootDir(conf).getFileSystem(conf);
+  }
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+   * This accommodates differences between hadoop versions, where hadoop 1
+   * does not throw a FileNotFoundException and returns an empty FileStatus[],
+   * while Hadoop 2 will throw FileNotFoundException.
+   *
+   * Where possible, prefer FSUtils#listStatusWithStatusFilter(FileSystem,
+   * Path, FileStatusFilter) instead.
+   *
+   * @param fs file system
+   * @param dir directory
+   * @param filter path filter
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+   */
+  public static FileStatus [] listStatus(final FileSystem fs,
+      final Path dir, final PathFilter filter) throws IOException {
+    FileStatus [] status = null;
+    try {
+      status = filter == null ? fs.listStatus(dir) : fs.listStatus(dir, filter);
+    } catch (FileNotFoundException fnfe) {
+      // if directory doesn't exist, return null
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+    }
+    if (status == null || status.length < 1) {
+      return null;
+    }
+    return status;
+  }
+
+  /**
+   * Calls fs.listStatus() and treats FileNotFoundException as non-fatal.
+   * This accommodates differences between hadoop versions.
+   *
+   * @param fs file system
+   * @param dir directory
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
+   */
+  public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
+    return listStatus(fs, dir, null);
+  }
+
+  /**
+   * Calls fs.listFiles() to get FileStatus and BlockLocations together, reducing RPC calls.
+   *
+   * @param fs file system
+   * @param dir directory
+   * @return LocatedFileStatus list, or null if nothing is listed or dir doesn't exist
+   */
+  public static List<LocatedFileStatus> listLocatedStatus(final FileSystem fs,
+      final Path dir) throws IOException {
+    List<LocatedFileStatus> status = null;
+    try {
+      RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs
+          .listFiles(dir, false);
+      while (locatedFileStatusRemoteIterator.hasNext()) {
+        if (status == null) {
+          status = Lists.newArrayList();
+        }
+        status.add(locatedFileStatusRemoteIterator.next());
+      }
+    } catch (FileNotFoundException fnfe) {
+      // if directory doesn't exist, return null
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(dir + " doesn't exist");
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Calls fs.delete() and returns the value returned by the fs.delete()
+   *
+   * @param fs must not be null
+   * @param path must not be null
+   * @param recursive delete tree rooted at path
+   * @return the value returned by the fs.delete()
+   * @throws IOException from underlying FileSystem
+   */
+  public static boolean delete(final FileSystem fs, final Path path, final boolean recursive)
+      throws IOException {
+    return fs.delete(path, recursive);
+  }
+
+  /**
+   * Calls fs.exists(). Checks if the specified path exists
+   *
+   * @param fs must not be null
+   * @param path must not be null
+   * @return the value returned by fs.exists()
+   * @throws IOException from underlying FileSystem
+   */
+  public static boolean isExists(final FileSystem fs, final Path path) throws IOException {
+    return fs.exists(path);
+  }
+
+  /**
+   * Log the current state of the filesystem from a certain root directory
+   * @param fs filesystem to investigate
+   * @param root root file/directory to start logging from
+   * @param LOG log to output information
+   * @throws IOException if an unexpected exception occurs
+   */
+  public static void logFileSystemState(final FileSystem fs, final Path root, Log LOG)
+      throws IOException {
+    LOG.debug("Current file system:");
+    logFSTree(LOG, fs, root, "|-");
+  }
+
+  /**
+   * Recursive helper to log the state of the FS
+   *
+   * @see #logFileSystemState(FileSystem, Path, Log)
+   */
+  private static void logFSTree(Log LOG, final FileSystem fs, final Path root, String prefix)
+      throws IOException {
+    FileStatus[] files = listStatus(fs, root, null);
+    if (files == null) {
+      return;
+    }
+
+    for (FileStatus file : files) {
+      if (file.isDirectory()) {
+        LOG.debug(prefix + file.getPath().getName() + "/");
+        logFSTree(LOG, fs, file.getPath(), prefix + "---");
+      } else {
+        LOG.debug(prefix + file.getPath().getName());
+      }
+    }
+  }
+
+  public static boolean renameAndSetModifyTime(final FileSystem fs, final Path src, final Path dest)
+      throws IOException {
+    // set the modify time for TimeToLive Cleaner
+    fs.setTimes(src, EnvironmentEdgeManager.currentTime(), -1);
+    return fs.rename(src, dest);
+  }
+
+  /**
+   * Do our short circuit read setup.
+   * Checks buffer size to use and whether to do checksumming in hbase or hdfs.
+   * @param conf must not be null
+   */
+  public static void setupShortCircuitRead(final Configuration conf) {
+    // Check that the user has not set the "dfs.client.read.shortcircuit.skip.checksum" property.
+    boolean shortCircuitSkipChecksum =
+      conf.getBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
+    boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true);
+    if (shortCircuitSkipChecksum) {
+      LOG.warn("Configuration \"dfs.client.read.shortcircuit.skip.checksum\" should not " +
+        "be set to true." + (useHBaseChecksum ? " HBase checksum doesn't require " +
+        "it, see https://issues.apache.org/jira/browse/HBASE-6868." : ""));
+      assert !shortCircuitSkipChecksum; //this will fail if assertions are on
+    }
+    checkShortCircuitReadBufferSize(conf);
+  }
+
+  /**
+   * Check if short circuit read buffer size is set and if not, set it to hbase value.
+   * @param conf must not be null
+   */
+  public static void checkShortCircuitReadBufferSize(final Configuration conf) {
+    final int defaultSize = HConstants.DEFAULT_BLOCKSIZE * 2;
+    final int notSet = -1;
+    // DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY is only defined in h2
+    final String dfsKey = "dfs.client.read.shortcircuit.buffer.size";
+    int size = conf.getInt(dfsKey, notSet);
+    // If a size is set, return -- we will use it.
+    if (size != notSet) {
+      return;
+    }
+    // But short circuit buffer size is normally not set.  Put in place the hbase wanted size.
+    int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
+    conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
+  }
+
+  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
+  // not until we attempt to reference it.
+  private static class StreamCapabilities {
+    public static final boolean PRESENT;
+    public static final Class<?> CLASS;
+    public static final Method METHOD;
+    static {
+      boolean tmp = false;
+      Class<?> clazz = null;
+      Method method = null;
+      try {
+        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+        method = clazz.getMethod("hasCapability", String.class);
+        tmp = true;
+      } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
+        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
+                 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
+                 "support hflush/hsync. If you are running on top of HDFS this probably just " +
+                 "means you have an older version and this can be ignored. If you are running on " +
+                 "top of an alternate FileSystem implementation you should manually verify that " +
+                 "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
+                 "diagnose errors when our assumptions are violated.");
+        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
+            exception);
+      } finally {
+        PRESENT = tmp;
+        CLASS = clazz;
+        METHOD = method;
+      }
+    }
+  }
+
+  /**
+   * If our FileSystem version includes the StreamCapabilities class, check if
+   * the given stream has a particular capability.
+   * @param stream capabilities are per-stream instance, so check this one specifically. must not be
+   *        null
+   * @param capability what to look for, per Hadoop Common's FileSystem docs
+   * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
+   *         implement it. return result of asking the stream otherwise.
+   */
+  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
+    // be consistent whether or not StreamCapabilities is present
+    if (stream == null) {
+      throw new NullPointerException("stream parameter must not be null.");
+    }
+    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
+    // otherwise old versions of Hadoop will break.
+    boolean result = true;
+    if (StreamCapabilities.PRESENT) {
+      // if StreamCapabilities is present, but the stream doesn't implement it
+      // or we run into a problem invoking the method,
+      // we treat that as equivalent to not declaring anything
+      result = false;
+      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
+        try {
+          result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
+        } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
+            exception) {
+          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
+              "our understanding of how it's supposed to work. Please file a JIRA and include " +
+              "the following stack trace. In the mean time we're interpreting this behavior " +
+              "difference as a lack of capability support, which will probably cause a failure.",
+              exception);
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Helper exception for those cases where the place where we need to check a stream capability
+   * is not where we have the needed context to explain the impact and mitigation for a lack.
+   */
+  public static class StreamLacksCapabilityException extends Exception {
+    public StreamLacksCapabilityException(String message, Throwable cause) {
+      super(message, cause);
+    }
+    public StreamLacksCapabilityException(String message) {
+      super(message);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
new file mode 100644
index 0000000..7ff5792
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
@@ -0,0 +1,164 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test {@link CommonFSUtils}.
+ */
+@Category({MiscTests.class, MediumTests.class})
+public class TestCommonFSUtils {
+  private static final Log LOG = LogFactory.getLog(TestCommonFSUtils.class);
+
+  private HBaseCommonTestingUtility htu;
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws IOException {
+    htu = new HBaseCommonTestingUtility();
+    conf = htu.getConfiguration();
+  }
+
+  /**
+   * Test path compare and prefix checking.
+   */
+  @Test
+  public void testMatchingTail() throws IOException {
+    Path rootdir = htu.getDataTestDir();
+    final FileSystem fs = rootdir.getFileSystem(conf);
+    assertTrue(rootdir.depth() > 1);
+    Path partPath = new Path("a", "b");
+    Path fullPath = new Path(rootdir, partPath);
+    Path fullyQualifiedPath = fs.makeQualified(fullPath);
+    assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath));
+    assertFalse(CommonFSUtils.isMatchingTail(fullPath, partPath.toString()));
+    assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullPath.toString()));
+    assertTrue(CommonFSUtils.isStartingWithPath(fullyQualifiedPath, fullPath.toString()));
+    assertFalse(CommonFSUtils.isStartingWithPath(rootdir, partPath.toString()));
+    assertFalse(CommonFSUtils.isMatchingTail(fullyQualifiedPath, partPath));
+    assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath));
+    assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fullPath.toString()));
+    assertTrue(CommonFSUtils.isMatchingTail(fullyQualifiedPath, fs.makeQualified(fullPath)));
+    assertTrue(CommonFSUtils.isStartingWithPath(rootdir, fullyQualifiedPath.toString()));
+    assertFalse(CommonFSUtils.isMatchingTail(fullPath, new Path("x")));
+    assertFalse(CommonFSUtils.isMatchingTail(new Path("x"), fullPath));
+  }
+
+  /** Writes dataSize zero bytes to file, closing the stream even if the write fails. */
+  private void WriteDataToHDFS(FileSystem fs, Path file, int dataSize) throws Exception {
+    try (FSDataOutputStream out = fs.create(file)) {
+      byte [] data = new byte[dataSize];
+      out.write(data, 0, dataSize);
+    }
+  }
+
+  @Test
+  public void testSetWALRootDir() throws Exception {
+    Path p = new Path("file:///hbase/root");
+    CommonFSUtils.setWALRootDir(conf, p);
+    assertEquals(p.toString(), conf.get(CommonFSUtils.HBASE_WAL_DIR));
+  }
+
+  @Test
+  public void testGetWALRootDir() throws IOException {
+    Path root = new Path("file:///hbase/root");
+    Path walRoot = new Path("file:///hbase/logroot");
+    CommonFSUtils.setRootDir(conf, root);
+    assertEquals(CommonFSUtils.getRootDir(conf), root);
+    assertEquals(CommonFSUtils.getWALRootDir(conf), root);
+    CommonFSUtils.setWALRootDir(conf, walRoot);
+    assertEquals(CommonFSUtils.getWALRootDir(conf), walRoot);
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void testGetWALRootDirIllegalWALDir() throws IOException {
+    Path root = new Path("file:///hbase/root");
+    Path invalidWALDir = new Path("file:///hbase/root/logroot");
+    CommonFSUtils.setRootDir(conf, root);
+    CommonFSUtils.setWALRootDir(conf, invalidWALDir);
+    CommonFSUtils.getWALRootDir(conf);
+  }
+
+  @Test
+  public void testRemoveWALRootPath() throws Exception {
+    CommonFSUtils.setRootDir(conf, new Path("file:///user/hbase"));
+    Path testFile = new Path(CommonFSUtils.getRootDir(conf), "test/testfile");
+    Path tmpFile = new Path("file:///test/testfile");
+    assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), "test/testfile");
+    assertEquals(CommonFSUtils.removeWALRootPath(tmpFile, conf), tmpFile.toString());
+    CommonFSUtils.setWALRootDir(conf, new Path("file:///user/hbaseLogDir"));
+    assertEquals(CommonFSUtils.removeWALRootPath(testFile, conf), testFile.toString());
+    Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
+    assertEquals(CommonFSUtils.removeWALRootPath(logFile, conf), "test/testlog");
+  }
+
+  @Test(expected=NullPointerException.class)
+  public void streamCapabilitiesDoesNotAllowNullStream() {
+    CommonFSUtils.hasCapability(null, "hopefully any string");
+  }
+
+  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
+  static {
+    boolean tmp = false;
+    try {
+      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
+      tmp = true;
+      LOG.debug("Test thought StreamCapabilities class was present.");
+    } catch (ClassNotFoundException exception) {
+      LOG.debug("Test didn't think StreamCapabilities class was present.");
+    } finally {
+      STREAM_CAPABILITIES_IS_PRESENT = tmp;
+    }
+  }
+
+  @Test
+  public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
+    FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
+    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+        CommonFSUtils.hasCapability(stream, "hsync"));
+    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+        CommonFSUtils.hasCapability(stream, "hflush"));
+    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
+        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
+        CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
+            "implement."));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 974bc13..f49833c 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -46,18 +46,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
 import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALHeader;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
  * WAL implementation of the ProcedureStore.
@@ -67,6 +69,9 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
 public class WALProcedureStore extends ProcedureStoreBase {
   private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
   public static final String LOG_PREFIX = "pv2-";
+  /** Used to construct the name of the log directory for master procedures */
+  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
+
 
   public interface LeaseRecovery {
     void recoverFileLease(FileSystem fs, Path path) throws IOException;
@@ -185,18 +190,42 @@ public class WALProcedureStore extends ProcedureStoreBase {
     }
   }
 
-  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
-      final LeaseRecovery leaseRecovery) {
-    this(conf, fs, walDir, null, leaseRecovery);
+  public WALProcedureStore(final Configuration conf, final LeaseRecovery leaseRecovery)
+      throws IOException {
+    this(conf,
+        new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_LOGDIR),
+        new Path(CommonFSUtils.getRootDir(conf), HConstants.HREGION_OLDLOGDIR_NAME), leaseRecovery);
   }
 
-  public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir,
-      final Path walArchiveDir, final LeaseRecovery leaseRecovery) {
-    this.fs = fs;
+  @VisibleForTesting
+  public WALProcedureStore(final Configuration conf, final Path walDir, final Path walArchiveDir,
+      final LeaseRecovery leaseRecovery) throws IOException {
     this.conf = conf;
+    this.leaseRecovery = leaseRecovery;
     this.walDir = walDir;
     this.walArchiveDir = walArchiveDir;
-    this.leaseRecovery = leaseRecovery;
+    this.fs = walDir.getFileSystem(conf);
+
+    // Create the log directory for the procedure store
+    if (!fs.exists(walDir)) {
+      if (!fs.mkdirs(walDir)) {
+        throw new IOException("Unable to mkdir " + walDir);
+      }
+    }
+    // Now that it exists, set the log policy
+    CommonFSUtils.setStoragePolicy(fs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
+      HConstants.DEFAULT_WAL_STORAGE_POLICY);
+
+    // Create archive dir up front. Rename won't work w/o it up on HDFS.
+    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
+      if (this.fs.mkdirs(this.walArchiveDir)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Created Procedure Store WAL archive dir " + this.walArchiveDir);
+        }
+      } else {
+        LOG.warn("Failed create of " + this.walArchiveDir);
+      }
+    }
   }
 
   @Override
@@ -247,16 +276,6 @@ public class WALProcedureStore extends ProcedureStoreBase {
       }
     };
     syncThread.start();
-
-    // Create archive dir up front. Rename won't work w/o it up on HDFS.
-    if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) {
-      if (this.fs.mkdirs(this.walArchiveDir)) {
-        if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " +
-            this.walArchiveDir);
-      } else {
-        LOG.warn("Failed create of " + this.walArchiveDir);
-      }
-    }
   }
 
   @Override
@@ -1005,6 +1024,17 @@ public class WALProcedureStore extends ProcedureStoreBase {
       LOG.warn("failed to create log file with id=" + logId, re);
       return false;
     }
+    // After we create the stream but before we attempt to use it at all
+    // ensure that we can provide the level of data safety we're configured
+    // to provide.
+    final String durability = useHsync ? "hsync" : "hflush";
+    if (!(CommonFSUtils.hasCapability(newStream, durability))) {
+      throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
+          " for proper operation during component failures, but the underlying filesystem does " +
+          "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
+          "' to set the desired level of robustness and ensure the config value of '" +
+          CommonFSUtils.HBASE_WAL_DIR + "' points to a FileSystem mount that can provide it.");
+    }
     try {
       ProcedureWALFormat.writeHeader(newStream, header);
       startPos = newStream.getPos();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index 99d3c28..6e0c02e 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -51,14 +51,14 @@ public class ProcedureTestingUtility {
   private ProcedureTestingUtility() {
   }
 
-  public static ProcedureStore createStore(final Configuration conf, final FileSystem fs,
-      final Path baseDir) throws IOException {
-    return createWalStore(conf, fs, baseDir);
+  public static ProcedureStore createStore(final Configuration conf, final Path dir)
+      throws IOException {
+    return createWalStore(conf, dir);
   }
 
-  public static WALProcedureStore createWalStore(final Configuration conf, final FileSystem fs,
-      final Path walDir) throws IOException {
-    return new WALProcedureStore(conf, fs, walDir, new WALProcedureStore.LeaseRecovery() {
+  public static WALProcedureStore createWalStore(final Configuration conf, final Path dir)
+      throws IOException {
+    return new WALProcedureStore(conf, dir, null, new WALProcedureStore.LeaseRecovery() {
       @Override
       public void recoverFileLease(FileSystem fs, Path path) throws IOException {
         // no-op

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
index 1a4dd86..4c1611a 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java
@@ -60,7 +60,7 @@ public class TestChildProcedures {
 
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
-    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
index ce9795f..bd310fd 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java
@@ -61,7 +61,7 @@ public class TestProcedureEvents {
     logDir = new Path(testDir, "proc-logs");
 
     procEnv = new TestProcEnv();
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procStore.start(1);
     procExecutor.start(1, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
index 1a3f898..ed6d512 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java
@@ -63,7 +63,7 @@ public class TestProcedureExecution {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
index 0a57efa..6246629 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java
@@ -66,7 +66,7 @@ public class TestProcedureMetrics {
 
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
-    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
index ec2e54e..12a8012 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java
@@ -67,7 +67,7 @@ public class TestProcedureNonce {
 
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
-    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
index b0f6cbc..06f8833 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java
@@ -69,7 +69,7 @@ public class TestProcedureRecovery {
 
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcEnv();
-    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procExecutor.testing = new ProcedureExecutor.Testing();
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
index 23ca6ba..12b2184 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java
@@ -68,7 +68,7 @@ public class TestProcedureReplayOrder {
 
     logDir = new Path(testDir, "proc-logs");
     procEnv = new TestProcedureEnv();
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore);
     procStore.start(NUM_THREADS);
     procExecutor.start(1, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
index 8347dbf..cbe50f2 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java
@@ -76,7 +76,7 @@ public class TestStateMachineProcedure {
     fs = testDir.getFileSystem(htu.getConfiguration());
 
     logDir = new Path(testDir, "proc-logs");
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
index 4882168..017992c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java
@@ -65,7 +65,7 @@ public class TestYieldProcedures {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procRunnables = new TestScheduler();
     procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(),
         procStore, procRunnables);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
index 5554a6c..503850d 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALLoaderPerformanceEvaluation.java
@@ -126,7 +126,7 @@ public class ProcedureWALLoaderPerformanceEvaluation extends AbstractHBaseTool {
     Path logDir = new Path(testDir, "proc-logs");
     System.out.println("\n\nLogs directory : " + logDir.toString() + "\n\n");
     fs.delete(logDir, true);
-    store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+    store = ProcedureTestingUtility.createWalStore(conf, logDir);
     store.start(1);
     store.recoverLease();
     store.load(new LoadCounter());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
index 823972f..1a7fc80 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java
@@ -93,9 +93,9 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
     System.out.println("Logs directory : " + logDir.toString());
     fs.delete(logDir, true);
     if ("nosync".equals(syncType)) {
-      store = new NoSyncWalProcedureStore(conf, fs, logDir);
+      store = new NoSyncWalProcedureStore(conf, logDir);
     } else {
-      store = ProcedureTestingUtility.createWalStore(conf, fs, logDir);
+      store = ProcedureTestingUtility.createWalStore(conf, logDir);
     }
     store.start(numThreads);
     store.recoverLease();
@@ -244,9 +244,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool {
   }
 
   private class NoSyncWalProcedureStore extends WALProcedureStore {
-    public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs,
-        final Path logDir) {
-      super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() {
+    public NoSyncWalProcedureStore(final Configuration conf, final Path logDir) throws IOException {
+      super(conf, logDir, null, new WALProcedureStore.LeaseRecovery() {
         @Override
         public void recoverFileLease(FileSystem fs, Path path) throws IOException {
           // no-op

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
index 610688f..98ec114 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestStressWALProcedureStore.java
@@ -75,7 +75,7 @@ public class TestStressWALProcedureStore {
     assertTrue(testDir.depth() > 1);
 
     logDir = new Path(testDir, "proc-logs");
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procStore.start(PROCEDURE_STORE_SLOTS);
     procStore.recoverLease();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index 44c8e12..98b1b7c 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -86,7 +86,7 @@ public class TestWALProcedureStore {
 
     setupConfig(htu.getConfiguration());
     logDir = new Path(testDir, "proc-logs");
-    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), fs, logDir);
+    procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir);
     procStore.start(PROCEDURE_STORE_SLOTS);
     procStore.recoverLease();
     procStore.load(new LoadCounter());
@@ -729,7 +729,7 @@ public class TestWALProcedureStore {
     assertEquals(procs.length + 1, status.length);
 
     // simulate another active master removing the wals
-    procStore = new WALProcedureStore(htu.getConfiguration(), fs, logDir,
+    procStore = new WALProcedureStore(htu.getConfiguration(), logDir, null,
         new WALProcedureStore.LeaseRecovery() {
       private int count = 0;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
index bb34af6..f7eb02b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
@@ -65,9 +65,6 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 public class HFileSystem extends FilterFileSystem {
   public static final Log LOG = LogFactory.getLog(HFileSystem.class);
 
-  /** Parameter name for HBase WAL directory */
-  public static final String HBASE_WAL_DIR = "hbase.wal.dir";
-
   private final FileSystem noChecksumFs;   // read hfile data from storage
   private final boolean useHBaseChecksum;
   private static volatile byte unspecifiedStoragePolicyId = Byte.MIN_VALUE;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 04bf01f..1f5462f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
@@ -56,7 +57,8 @@ public final class AsyncFSOutputHelper {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
       boolean createParent, short replication, long blockSize, EventLoop eventLoop,
-      Class<? extends Channel> channelClass) throws IOException {
+      Class<? extends Channel> channelClass)
+          throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoop, channelClass);
@@ -69,6 +71,13 @@ public final class AsyncFSOutputHelper {
     } else {
       fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
     }
+    // After we create the stream but before we attempt to use it at all
+    // ensure that we can provide the level of data safety we're configured
+    // to provide.
+    if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
+        CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+      throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
+    }
     final ExecutorService flushExecutor =
         Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3ba31da..7a778f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -53,7 +53,6 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.ClusterStatus.Option;
@@ -1201,23 +1200,8 @@ public class HMaster extends HRegionServer implements MasterServices {
   private void startProcedureExecutor() throws IOException {
     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
     final Path rootDir = FSUtils.getRootDir(conf);
-    final Path walDir = new Path(FSUtils.getWALRootDir(this.conf),
-        MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
-    final Path walArchiveDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
 
-    final FileSystem walFs = walDir.getFileSystem(conf);
-
-    // Create the log directory for the procedure store
-    if (!walFs.exists(walDir)) {
-      if (!walFs.mkdirs(walDir)) {
-        throw new IOException("Unable to mkdir " + walDir);
-      }
-    }
-    // Now that it exists, set the log policy
-    FSUtils.setStoragePolicy(walFs, conf, walDir, HConstants.WAL_STORAGE_POLICY,
-      HConstants.DEFAULT_WAL_STORAGE_POLICY);
-
-    procedureStore = new WALProcedureStore(conf, walDir.getFileSystem(conf), walDir, walArchiveDir,
+    procedureStore = new WALProcedureStore(conf,
         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
     MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index 3b268cb..27987f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
 import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -144,10 +144,10 @@ public class MasterFileSystem {
     };
 
     final String[] protectedSubLogDirs = new String[] {
-            HConstants.HREGION_LOGDIR_NAME,
-            HConstants.HREGION_OLDLOGDIR_NAME,
-            HConstants.CORRUPT_DIR_NAME,
-            MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR
+      HConstants.HREGION_LOGDIR_NAME,
+      HConstants.HREGION_OLDLOGDIR_NAME,
+      HConstants.CORRUPT_DIR_NAME,
+      WALProcedureStore.MASTER_PROCEDURE_LOGDIR
     };
     // check if the root directory exists
     checkRootDir(this.rootdir, conf, this.fs);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
index 16647d2..495fab6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java
@@ -24,9 +24,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 public final class MasterProcedureConstants {
   private MasterProcedureConstants() {}
 
-  /** Used to construct the name of the log directory for master procedures */
-  public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
-
   /** Number of threads used by the procedure executor */
   public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads";
   public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16;

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 61c7100..ad54cab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CollectionUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.DrainBarrier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -356,7 +356,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     }
     // Now that it exists, set the storage policy for the entire directory of wal files related to
     // this FSHLog instance
-    FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
+    CommonFSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY,
       HConstants.DEFAULT_WAL_STORAGE_POLICY);
     this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8");
     this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString();
@@ -381,7 +381,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     };
 
     if (failIfWALExists) {
-      final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles);
+      final FileStatus[] walFiles = CommonFSUtils.listStatus(fs, walDir, ourFiles);
       if (null != walFiles && 0 != walFiles.length) {
         throw new IOException("Target WAL already exists within directory " + walDir);
       }
@@ -398,7 +398,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
     // (it costs a little x'ing bocks)
     final long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
-      FSUtils.getDefaultBlockSize(this.fs, this.walDir));
+      CommonFSUtils.getDefaultBlockSize(this.fs, this.walDir));
     this.logrollsize = (long) (blocksize
         * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
 
@@ -652,7 +652,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
       }
     }
     LOG.info("Archiving " + p + " to " + newPath);
-    if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
+    if (!CommonFSUtils.renameAndSetModifyTime(this.fs, p, newPath)) {
       throw new IOException("Unable to rename " + p + " to " + newPath);
     }
     // Tell our listeners that a log has been archived.
@@ -685,12 +685,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     try {
       long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
       int oldNumEntries = this.numEntries.getAndSet(0);
-      final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
+      final String newPathString = (null == newPath ? null : CommonFSUtils.getPath(newPath));
       if (oldPath != null) {
         this.walFile2Props.put(oldPath,
           new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
         this.totalLogSize.addAndGet(oldFileLen);
-        LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
+        LOG.info("Rolled WAL " + CommonFSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
             + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
       } else {
         LOG.info("New WAL " + newPathString);
@@ -767,6 +767,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
           cleanOldLogs();
           regionsToFlush = findRegionsToForceFlush();
         }
+      } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
+        // If the underlying FileSystem can't do what we ask, treat as IO failure so
+        // we'll abort.
+        throw new IOException("Underlying FileSystem can't meet stream requirements. See RS log " +
+            "for details.", exception);
       } finally {
         closeBarrier.endOp();
         assert scope == NullScope.INSTANCE || !scope.isDetached();
@@ -794,7 +799,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
    * @return may be null if there are no files.
    */
   protected FileStatus[] getFiles() throws IOException {
-    return FSUtils.listStatus(fs, walDir, ourFiles);
+    return CommonFSUtils.listStatus(fs, walDir, ourFiles);
   }
 
   @Override
@@ -833,7 +838,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
           }
         }
 
-        if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
+        if (!CommonFSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) {
           throw new IOException("Unable to rename " + file.getPath() + " to " + p);
         }
         // Tell our listeners that a log was archived.
@@ -843,7 +848,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
           }
         }
       }
-      LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir));
+      LOG.debug("Moved " + files.length + " WAL file(s) to " +
+          CommonFSUtils.getPath(this.walArchiveDir));
     }
     LOG.info("Closed WAL: " + toString());
   }
@@ -1022,7 +1028,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
 
   protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;
 
-  protected abstract W createWriterInstance(Path path) throws IOException;
+  protected abstract W createWriterInstance(Path path) throws IOException,
+      CommonFSUtils.StreamLacksCapabilityException;
 
   /**
    * @return old wal file size

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79a007d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 3747f47..256ced6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -153,7 +154,7 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
-      throws IOException {
+      throws IOException, StreamLacksCapabilityException {
     this.conf = conf;
     boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -237,7 +238,7 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException;
+      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
 
   /**
    * return the file length after written.