You are viewing a plain text version of this content. The canonical link for it was a hyperlink and is not preserved in this plain text copy.
Posted to commits@hbase.apache.org by zh...@apache.org on 2020/03/26 09:05:42 UTC

[hbase] branch master updated: HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ee9b8  HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)
f3ee9b8 is described below

commit f3ee9b8aa37dd30d34ff54cd39fb9b4b6d22e683
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Mar 26 17:05:31 2020 +0800

    HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)
    
    Signed-off-by: stack <st...@apache.org>
    Signed-off-by: Nick Dimiduk <nd...@apache.org>
---
 .../apache/hadoop/hbase/util/CommonFSUtils.java    | 217 ++++-----------------
 .../hadoop/hbase/util/TestCommonFSUtils.java       |  41 +---
 .../procedure2/store/wal/WALProcedureStore.java    |  10 +-
 .../store/wal/TestWALProcedureStore.java           |   2 +-
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java      |  18 +-
 .../hbase/regionserver/wal/ProtobufLogWriter.java  |  10 +-
 .../hbase/regionserver/wal/TestHBaseWalOnEC.java   |   8 +-
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  |  11 +-
 8 files changed, 68 insertions(+), 249 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
index ea0cb2b..ea655a2 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/CommonFSUtils.java
@@ -27,11 +27,11 @@ import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -490,26 +490,19 @@ public abstract class CommonFSUtils {
     }
     String trimmedStoragePolicy = storagePolicy.trim();
     if (trimmedStoragePolicy.isEmpty()) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed an empty storagePolicy, exiting early.");
-      }
+      LOG.trace("We were passed an empty storagePolicy, exiting early.");
       return;
     } else {
       trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
     }
     if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
-          trimmedStoragePolicy);
-      }
+      LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
       return;
     }
     try {
       invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
     } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Failed to invoke set storage policy API on FS", e);
-      }
+      LOG.trace("Failed to invoke set storage policy API on FS", e);
       if (throwException) {
         throw e;
       }
@@ -525,10 +518,7 @@ public abstract class CommonFSUtils {
 
     try {
       fs.setStoragePolicy(path, storagePolicy);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
-      }
+      LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
     } catch (Exception e) {
       toThrow = e;
       // This swallows FNFE, should we be throwing it? seems more likely to indicate dev
@@ -541,19 +531,9 @@ public abstract class CommonFSUtils {
         LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
       }
 
-      // check for lack of HDFS-7228
-      if (e instanceof RemoteException &&
-          HadoopIllegalArgumentException.class.getName().equals(
-            ((RemoteException)e).getClassName())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
-            "isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
-            "trying to use SSD related policies then you're likely missing HDFS-7228. For " +
-            "more information see the 'ArchivalStorage' docs for your Hadoop release.");
-        }
       // Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
       // that throws UnsupportedOperationException
-      } else if (e instanceof UnsupportedOperationException) {
+      if (e instanceof UnsupportedOperationException) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("The underlying FileSystem implementation doesn't support " +
               "setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
@@ -759,200 +739,75 @@ public abstract class CommonFSUtils {
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
 
-  private static class DfsBuilderUtility {
-    static Class<?> dfsClass = null;
-    static Method createMethod;
-    static Method overwriteMethod;
-    static Method bufferSizeMethod;
-    static Method blockSizeMethod;
-    static Method recursiveMethod;
-    static Method replicateMethod;
-    static Method replicationMethod;
-    static Method buildMethod;
-    static boolean allMethodsPresent = false;
+  private static final class DfsBuilderUtility {
+    private static final Class<?> BUILDER;
+    private static final Method REPLICATE;
 
     static {
-      String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
-      String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
+      String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
       Class<?> builderClass = null;
-
-      try {
-        dfsClass = Class.forName(dfsName);
-      } catch (ClassNotFoundException e) {
-        LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
-      }
       try {
         builderClass = Class.forName(builderName);
       } catch (ClassNotFoundException e) {
-        LOG.debug("{} not available, will not use builder API for file creation.", builderName);
+        LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
       }
-
-      if (dfsClass != null && builderClass != null) {
+      Method replicateMethod = null;
+      if (builderClass != null) {
         try {
-          createMethod = dfsClass.getMethod("createFile", Path.class);
-          overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
-          bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
-          blockSizeMethod = builderClass.getMethod("blockSize", long.class);
-          recursiveMethod = builderClass.getMethod("recursive");
           replicateMethod = builderClass.getMethod("replicate");
-          replicationMethod = builderClass.getMethod("replication", short.class);
-          buildMethod = builderClass.getMethod("build");
-
-          allMethodsPresent = true;
           LOG.debug("Using builder API via reflection for DFS file creation.");
         } catch (NoSuchMethodException e) {
-          LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
-              e.getMessage());
-        }
-      }
-    }
-
-    /**
-     * Attempt to use builder API via reflection to create a file with the given parameters and
-     * replication enabled.
-     */
-    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
-        int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
-      if (allMethodsPresent && dfsClass.isInstance(fs)) {
-        try {
-          Object builder;
-
-          builder = createMethod.invoke(fs, path);
-          builder = overwriteMethod.invoke(builder, overwritable);
-          builder = bufferSizeMethod.invoke(builder, bufferSize);
-          builder = blockSizeMethod.invoke(builder, blockSize);
-          if (isRecursive) {
-            builder = recursiveMethod.invoke(builder);
-          }
-          builder = replicateMethod.invoke(builder);
-          builder = replicationMethod.invoke(builder, replication);
-          return (FSDataOutputStream) buildMethod.invoke(builder);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-          // Should have caught this failure during initialization, so log full trace here
-          LOG.warn("Couldn't use reflection with builder API", e);
+          LOG.debug("Could not find replicate method on builder; will not set replicate when" +
+            " creating output stream", e);
         }
       }
-
-      if (isRecursive) {
-        return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
-      }
-      return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
+      BUILDER = builderClass;
+      REPLICATE = replicateMethod;
     }
 
     /**
-     * Attempt to use builder API via reflection to create a file with the given parameters and
-     * replication enabled.
+     * Attempt to use builder API via reflection to call the replicate method on the given builder.
      */
-    static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
-        throws IOException {
-      if (allMethodsPresent && dfsClass.isInstance(fs)) {
+    static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
+      if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
         try {
-          Object builder;
-
-          builder = createMethod.invoke(fs, path);
-          builder = overwriteMethod.invoke(builder, overwritable);
-          builder = replicateMethod.invoke(builder);
-          return (FSDataOutputStream) buildMethod.invoke(builder);
+          REPLICATE.invoke(builder);
         } catch (IllegalAccessException | InvocationTargetException e) {
           // Should have caught this failure during initialization, so log full trace here
           LOG.warn("Couldn't use reflection with builder API", e);
         }
       }
-
-      return fs.create(path, overwritable);
     }
   }
 
   /**
    * Attempt to use builder API via reflection to create a file with the given parameters and
    * replication enabled.
-   * <p>
+   * <p/>
    * Will not attempt to enable replication when passed an HFileSystem.
    */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
-      throws IOException {
-    return DfsBuilderUtility.createHelper(fs, path, overwritable);
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
+    throws IOException {
+    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
+    DfsBuilderUtility.replicate(builder);
+    return builder.build();
   }
 
   /**
    * Attempt to use builder API via reflection to create a file with the given parameters and
    * replication enabled.
-   * <p>
+   * <p/>
    * Will not attempt to enable replication when passed an HFileSystem.
    */
-  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
-      int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
-    return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
-        blockSize, isRecursive);
-  }
-
-  // Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
-  // not until we attempt to reference it.
-  private static class StreamCapabilities {
-    public static final boolean PRESENT;
-    public static final Class<?> CLASS;
-    public static final Method METHOD;
-    static {
-      boolean tmp = false;
-      Class<?> clazz = null;
-      Method method = null;
-      try {
-        clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
-        method = clazz.getMethod("hasCapability", String.class);
-        tmp = true;
-      } catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
-        LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
-                 "HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
-                 "support hflush/hsync. If you are running on top of HDFS this probably just " +
-                 "means you have an older version and this can be ignored. If you are running on " +
-                 "top of an alternate FileSystem implementation you should manually verify that " +
-                 "hflush and hsync are implemented; otherwise you risk data loss and hard to " +
-                 "diagnose errors when our assumptions are violated.");
-        LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
-            exception);
-      } finally {
-        PRESENT = tmp;
-        CLASS = clazz;
-        METHOD = method;
-      }
+  public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
+    int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
+    FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
+      .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
+    if (isRecursive) {
+      builder.recursive();
     }
-  }
-
-  /**
-   * If our FileSystem version includes the StreamCapabilities class, check if the given stream has
-   * a particular capability.
-   * @param stream capabilities are per-stream instance, so check this one specifically. must not be
-   *          null
-   * @param capability what to look for, per Hadoop Common's FileSystem docs
-   * @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
-   *         implement it. return result of asking the stream otherwise.
-   * @throws NullPointerException if {@code stream} is {@code null}
-   */
-  public static boolean hasCapability(FSDataOutputStream stream, String capability) {
-    // be consistent whether or not StreamCapabilities is present
-    Objects.requireNonNull(stream, "stream cannot be null");
-    // If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
-    // otherwise old versions of Hadoop will break.
-    boolean result = true;
-    if (StreamCapabilities.PRESENT) {
-      // if StreamCapabilities is present, but the stream doesn't implement it
-      // or we run into a problem invoking the method,
-      // we treat that as equivalent to not declaring anything
-      result = false;
-      if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
-        try {
-          result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
-        } catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
-            exception) {
-          LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
-              "our understanding of how it's supposed to work. Please file a JIRA and include " +
-              "the following stack trace. In the mean time we're interpreting this behavior " +
-              "difference as a lack of capability support, which will probably cause a failure.",
-              exception);
-        }
-      }
-    }
-    return result;
+    DfsBuilderUtility.replicate(builder);
+    return builder.build();
   }
 
   /**
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
index e2dbd41..e6a427e 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCommonFSUtils.java
@@ -19,9 +19,8 @@ package org.apache.hadoop.hbase.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import java.io.ByteArrayOutputStream;
+
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -35,8 +34,6 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Test {@link CommonFSUtils}.
@@ -48,8 +45,6 @@ public class TestCommonFSUtils {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestCommonFSUtils.class);
 
-  private static final Logger LOG = LoggerFactory.getLogger(TestCommonFSUtils.class);
-
   private HBaseCommonTestingUtility htu;
   private Configuration conf;
 
@@ -131,38 +126,4 @@ public class TestCommonFSUtils {
     Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
     assertEquals("test/testlog", CommonFSUtils.removeWALRootPath(logFile, conf));
   }
-
-  @Test(expected=NullPointerException.class)
-  public void streamCapabilitiesDoesNotAllowNullStream() {
-    CommonFSUtils.hasCapability(null, "hopefully any string");
-  }
-
-  private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
-  static {
-    boolean tmp = false;
-    try {
-      Class.forName("org.apache.hadoop.fs.StreamCapabilities");
-      tmp = true;
-      LOG.debug("Test thought StreamCapabilities class was present.");
-    } catch (ClassNotFoundException exception) {
-      LOG.debug("Test didn't think StreamCapabilities class was present.");
-    } finally {
-      STREAM_CAPABILITIES_IS_PRESENT = tmp;
-    }
-  }
-
-  @Test
-  public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
-    FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-        CommonFSUtils.hasCapability(stream, "hsync"));
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-        CommonFSUtils.hasCapability(stream, "hflush"));
-    assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
-        "class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
-        CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
-            "implement."));
-  }
 }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index b99ed35..b0301c6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -35,11 +35,13 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
@@ -1084,8 +1086,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
     // to provide.
-    final String durability = useHsync ? "hsync" : "hflush";
-    if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
+    final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
+    if (enforceStreamCapability && !newStream.hasCapability(durability)) {
       throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
           " for proper operation during component failures, but the underlying filesystem does " +
           "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
@@ -1151,12 +1153,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
           log.addToSize(trailerSize);
         }
       }
-    } catch (IOException e) {
+    } catch (IOException | FSError e) {
       LOG.warn("Unable to write the trailer", e);
     }
     try {
       stream.close();
-    } catch (IOException e) {
+    } catch (IOException | FSError e) {
       LOG.error("Unable to close the stream", e);
     }
     stream = null;
diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
index c34a210..c8335ee 100644
--- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
+++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java
@@ -655,7 +655,7 @@ public class TestWALProcedureStore {
   }
 
   @Test
-  public void testLogFileAleadExists() throws IOException {
+  public void testLogFileAlreadyExists() throws IOException {
     final boolean[] tested = {false};
     WALProcedureStore mStore = Mockito.spy(procStore);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index d1645f8..452da1c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -18,15 +18,17 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.IOException;
-
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
 
@@ -63,11 +65,15 @@ public final class AsyncFSOutputHelper {
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
     // to provide.
-    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
-      !(CommonFSUtils.hasCapability(out, "hflush") &&
-        CommonFSUtils.hasCapability(out, "hsync"))) {
-      out.close();
-      throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
+    if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
+      if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
+        Closeables.close(out, true);
+        throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
+      }
+      if (!out.hasCapability(StreamCapabilities.HSYNC)) {
+        Closeables.close(out, true);
+        throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
+      }
     }
     return new WrapperAsyncFSOutput(f, out);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 5c8e0d2..ff08da8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
@@ -90,18 +91,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
     return this.output;
   }
 
-  @SuppressWarnings("deprecation")
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
       short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
     this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
         blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
-      if (!CommonFSUtils.hasCapability(output, "hflush")) {
-        throw new StreamLacksCapabilityException("hflush");
+      if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
+        throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
       }
-      if (!CommonFSUtils.hasCapability(output, "hsync")) {
-        throw new StreamLacksCapabilityException("hsync");
+      if (!output.hasCapability(StreamCapabilities.HSYNC)) {
+        throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
index a7f1624..96c5729 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHBaseWalOnEC.java
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
-
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -55,8 +55,6 @@ public class TestHBaseWalOnEC {
 
   private static final HBaseTestingUtility util = new HBaseTestingUtility();
 
-  private static final String HFLUSH = "hflush";
-
   @BeforeClass
   public static void setup() throws Exception {
     try {
@@ -75,7 +73,7 @@ public class TestHBaseWalOnEC {
       try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
         // If this comes back as having hflush then some test setup assumption is wrong.
         // Fail the test so that a developer has to look and triage
-        assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
+        assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
       }
     } catch (NoSuchMethodException e) {
       // We're not testing anything interesting if EC is not available, so skip the rest of the test
@@ -95,7 +93,7 @@ public class TestHBaseWalOnEC {
   public void testStreamCreate() throws IOException {
     try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
         new Path("/testStreamCreate"), true)) {
-      assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
+      assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 1296c47..d425557 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -661,15 +662,11 @@ public class TestFSUtils {
     MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
     try (FileSystem filesystem = cluster.getFileSystem()) {
       FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
-      assertTrue(FSUtils.hasCapability(stream, "hsync"));
-      assertTrue(FSUtils.hasCapability(stream, "hflush"));
-      assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
-          "StreamCapabilities class is not defined.",
-          STREAM_CAPABILITIES_IS_PRESENT,
-          FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
+      assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
+      assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
+      assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
     } finally {
       cluster.shutdown();
     }
   }
-
 }