You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/15 01:50:37 UTC

hbase git commit: HBASE-19513 Fix the wrapped AsyncFSOutput implementation

Repository: hbase
Updated Branches:
  refs/heads/master 6ab8ce982 -> 661491b56


HBASE-19513 Fix the wrapped AsyncFSOutput implementation


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/661491b5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/661491b5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/661491b5

Branch: refs/heads/master
Commit: 661491b56b250a8faa0f375f83815c0aa69bac24
Parents: 6ab8ce9
Author: zhangduo <zh...@apache.org>
Authored: Thu Dec 14 21:04:10 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Dec 15 09:39:36 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java  |   5 +
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   | 116 +----------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |   8 +-
 .../hbase/io/asyncfs/WrapperAsyncFSOutput.java  | 127 +++++++++++++++++++
 .../hbase/regionserver/wal/AsyncFSWAL.java      |   2 +-
 .../TestFanOutOneBlockAsyncDFSOutput.java       |  18 +--
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java  |  27 +---
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |   2 +-
 .../io/asyncfs/TestSendBufSizePredictor.java    |   5 +-
 9 files changed, 163 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index bfe66de..3c520b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -62,6 +62,11 @@ public interface AsyncFSOutput extends Closeable {
   int buffered();
 
   /**
+   * Whether the stream is broken.
+   */
+  boolean isBroken();
+
+  /**
    * Return current pipeline. Empty array if no pipeline.
    */
   DatanodeInfo[] getPipeline();

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 583759b..43ddfb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -18,26 +18,15 @@
 package org.apache.hadoop.hbase.io.asyncfs;
 
 import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
-import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 
@@ -62,114 +51,23 @@ public final class AsyncFSOutputHelper {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
-    final FSDataOutputStream fsOut;
+    final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
       CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
     if (createParent) {
-      fsOut = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.create(f, overwrite, bufferSize, replication, blockSize, null);
     } else {
-      fsOut = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
+      out = fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
     }
     // After we create the stream but before we attempt to use it at all
     // ensure that we can provide the level of data safety we're configured
     // to provide.
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
-        !(CommonFSUtils.hasCapability(fsOut, "hflush") &&
-          CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+      !(CommonFSUtils.hasCapability(out, "hflush") &&
+        CommonFSUtils.hasCapability(out, "hsync"))) {
+      out.close();
       throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
     }
-    final ExecutorService flushExecutor =
-      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-          .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
-    return new AsyncFSOutput() {
-
-      private final ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-      @Override
-      public void write(byte[] b, int off, int len) {
-        out.write(b, off, len);
-      }
-
-      @Override
-      public void write(byte[] b) {
-        write(b, 0, b.length);
-      }
-
-      @Override
-      public void writeInt(int i) {
-        out.writeInt(i);
-      }
-
-      @Override
-      public void write(ByteBuffer bb) {
-        out.write(bb, bb.position(), bb.remaining());
-      }
-
-      @Override
-      public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-        fsOut.close();
-      }
-
-      @Override
-      public DatanodeInfo[] getPipeline() {
-        return new DatanodeInfo[0];
-      }
-
-      private void flush0(CompletableFuture<Long> future, boolean sync) {
-        try {
-          synchronized (out) {
-            fsOut.write(out.getBuffer(), 0, out.size());
-            out.reset();
-          }
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-          return;
-        }
-        try {
-          if (sync) {
-            fsOut.hsync();
-          } else {
-            fsOut.hflush();
-          }
-          long pos = fsOut.getPos();
-          eventLoopGroup.next().execute(() -> future.complete(pos));
-        } catch (IOException e) {
-          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
-        }
-      }
-
-      @Override
-      public CompletableFuture<Long> flush(boolean sync) {
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        flushExecutor.execute(() -> flush0(future, sync));
-        return future;
-      }
-
-      @Override
-      public void close() throws IOException {
-        try {
-          flushExecutor.submit(() -> {
-            synchronized (out) {
-              fsOut.write(out.getBuffer(), 0, out.size());
-              out.reset();
-            }
-            return null;
-          }).get();
-        } catch (InterruptedException e) {
-          throw new InterruptedIOException();
-        } catch (ExecutionException e) {
-          Throwables.propagateIfPossible(e.getCause(), IOException.class);
-          throw new IOException(e.getCause());
-        } finally {
-          flushExecutor.shutdown();
-        }
-        fsOut.close();
-      }
-
-      @Override
-      public int buffered() {
-        return out.size();
-      }
-    };
+    return new WrapperAsyncFSOutput(f, out);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index b874aa7..6d376f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -380,8 +380,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   @Override
   public DatanodeInfo[] getPipeline() {
-    State state = this.state;
-    return state == State.STREAMING || state == State.CLOSING ? locations : new DatanodeInfo[0];
+    return locations;
   }
 
   private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
@@ -569,4 +568,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
+
+  @Override
+  public boolean isBroken() {
+    return state == State.BROKEN;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
new file mode 100644
index 0000000..219e865
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/WrapperAsyncFSOutput.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * An {@link AsyncFSOutput} wraps a {@link FSDataOutputStream}.
+ */
+@InterfaceAudience.Private
+public class WrapperAsyncFSOutput implements AsyncFSOutput {
+
+  private final FSDataOutputStream out;
+
+  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+  private final ExecutorService executor;
+
+  public WrapperAsyncFSOutput(Path file, FSDataOutputStream out) {
+    this.out = out;
+    this.executor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("AsyncFSOutputFlusher-" + file.toString().replace("%", "%%")).build());
+  }
+
+  @Override
+  public void write(byte[] b) {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) {
+    buffer.write(b, off, len);
+  }
+
+  @Override
+  public void writeInt(int i) {
+    buffer.writeInt(i);
+  }
+
+  @Override
+  public void write(ByteBuffer bb) {
+    buffer.write(bb, bb.position(), bb.remaining());
+  }
+
+  @Override
+  public int buffered() {
+    return buffer.size();
+  }
+
+  @Override
+  public DatanodeInfo[] getPipeline() {
+    return new DatanodeInfo[0];
+  }
+
+  private void flush0(CompletableFuture<Long> future, ByteArrayOutputStream buffer, boolean sync) {
+    try {
+      if (buffer.size() > 0) {
+        out.write(buffer.getBuffer(), 0, buffer.size());
+        if (sync) {
+          out.hsync();
+        } else {
+          out.hflush();
+        }
+      }
+      future.complete(out.getPos());
+    } catch (IOException e) {
+      future.completeExceptionally(e);
+      return;
+    }
+  }
+
+  @Override
+  public CompletableFuture<Long> flush(boolean sync) {
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    ByteArrayOutputStream buffer = this.buffer;
+    this.buffer = new ByteArrayOutputStream();
+    executor.execute(() -> flush0(future, buffer, sync));
+    return future;
+  }
+
+  @Override
+  public void recoverAndClose(CancelableProgressable reporter) throws IOException {
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public void close() throws IOException {
+    Preconditions.checkState(buffer.size() == 0, "should call flush first before calling close");
+    executor.shutdown();
+    out.close();
+  }
+
+  @Override
+  public boolean isBroken() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 4ccfdf3..08b2765 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -751,6 +751,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     // not like FSHLog, AsyncFSOutput will fail immediately if there are errors writing to DNs, so
     // typically there is no 'low replication' state, only a 'broken' state.
     AsyncFSOutput output = this.fsOut;
-    return output != null && output.getPipeline().length == 0;
+    return output != null && output.isBroken();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 48c1cbf..6c19037 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -102,8 +103,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     for (;;) {
       try {
         FanOutOneBlockAsyncDFSOutput out =
-          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
+            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+              true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -112,8 +113,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     }
   }
 
-  static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
-      final FanOutOneBlockAsyncDFSOutput out)
+  static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
     byte[] b = new byte[10];
@@ -130,10 +130,10 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
     }
     out.close();
-    assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
+    assertEquals(b.length * 10, fs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
     rand.setSeed(12345);
-    try (FSDataInputStream in = dfs.open(f)) {
+    try (FSDataInputStream in = fs.open(f)) {
       for (int i = 0; i < 10; i++) {
         in.readFully(actual);
         rand.nextBytes(b);
@@ -149,7 +149,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
       false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   @Test
@@ -193,7 +193,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
-    writeAndVerify(eventLoop, FS, f, out);
+    writeAndVerify(FS, f, out);
   }
 
   /**
@@ -220,7 +220,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index 0453f1c..026c5c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -17,19 +17,9 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
@@ -40,6 +30,11 @@ import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
 @Category({ MiscTests.class, SmallTests.class })
 public class TestLocalAsyncOutput {
 
@@ -62,16 +57,6 @@ public class TestLocalAsyncOutput {
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
       fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
-    byte[] b = new byte[10];
-    ThreadLocalRandom.current().nextBytes(b);
-    out.write(b);
-    assertEquals(b.length, out.flush(true).get().longValue());
-    out.close();
-    assertEquals(b.length, fs.getFileStatus(f).getLen());
-    byte[] actual = new byte[b.length];
-    try (FSDataInputStream in = fs.open(f)) {
-      in.readFully(actual);
-    }
-    assertArrayEquals(b, actual);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 0e2b6d4..217ba60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -241,7 +241,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, file, out);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/661491b5/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
index a5f536f..6b5b837 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
@@ -17,16 +17,13 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-/**
- *
- */
 @Category({ MiscTests.class, SmallTests.class })
 public class TestSendBufSizePredictor {