You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/11/30 07:22:44 UTC

hbase git commit: HBASE-19346 Use EventLoopGroup to create AsyncFSOutput

Repository: hbase
Updated Branches:
  refs/heads/master 91e75b2a2 -> 9434d52c1


HBASE-19346 Use EventLoopGroup to create AsyncFSOutput


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9434d52c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9434d52c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9434d52c

Branch: refs/heads/master
Commit: 9434d52c190386f15188e0473ce005e96bf78413
Parents: 91e75b2
Author: zhangduo <zh...@apache.org>
Authored: Tue Nov 28 17:56:13 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Nov 30 15:22:23 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/asyncfs/AsyncFSOutput.java  |   2 +
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java   |  59 ++-
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   | 419 +++++++++----------
 .../FanOutOneBlockAsyncDFSOutputHelper.java     |  80 ++--
 .../hbase/io/asyncfs/SendBufSizePredictor.java  |  57 +++
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |  23 +-
 .../TestFanOutOneBlockAsyncDFSOutput.java       |  68 +--
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java  |   2 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |  64 ++-
 .../io/asyncfs/TestSendBufSizePredictor.java    |  44 ++
 10 files changed, 456 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
index 68adca9..bfe66de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutput.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 
 /**
  * Interface for asynchronous filesystem output stream.
+ * <p>
+ * The implementation is not required to be thread safe.
  */
 @InterfaceAudience.Private
 public interface AsyncFSOutput extends Closeable {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 1f5462f..6a7e4fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -17,12 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
@@ -35,12 +29,17 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 
 /**
  * Helper class for creating AsyncFSOutput.
@@ -56,12 +55,12 @@ public final class AsyncFSOutputHelper {
    * implementation for other {@link FileSystem} which wraps around a {@link FSDataOutputStream}.
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
-      boolean createParent, short replication, long blockSize, EventLoop eventLoop,
+      boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass)
-          throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+      throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoop, channelClass);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
     }
     final FSDataOutputStream fsOut;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
@@ -75,23 +74,19 @@ public final class AsyncFSOutputHelper {
     // ensure that we can provide the level of data safety we're configured
     // to provide.
     if (!(CommonFSUtils.hasCapability(fsOut, "hflush") &&
-        CommonFSUtils.hasCapability(fsOut, "hsync"))) {
+      CommonFSUtils.hasCapability(fsOut, "hsync"))) {
       throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
     }
     final ExecutorService flushExecutor =
-        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
-            .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
+      Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
     return new AsyncFSOutput() {
 
       private final ByteArrayOutputStream out = new ByteArrayOutputStream();
 
       @Override
-      public void write(final byte[] b, final int off, final int len) {
-        if (eventLoop.inEventLoop()) {
-          out.write(b, off, len);
-        } else {
-          eventLoop.submit(() -> out.write(b, off, len)).syncUninterruptibly();
-        }
+      public void write(byte[] b, int off, int len) {
+        out.write(b, off, len);
       }
 
       @Override
@@ -100,6 +95,16 @@ public final class AsyncFSOutputHelper {
       }
 
       @Override
+      public void writeInt(int i) {
+        out.writeInt(i);
+      }
+
+      @Override
+      public void write(ByteBuffer bb) {
+        out.write(bb, bb.position(), bb.remaining());
+      }
+
+      @Override
       public void recoverAndClose(CancelableProgressable reporter) throws IOException {
         fsOut.close();
       }
@@ -116,7 +121,7 @@ public final class AsyncFSOutputHelper {
             out.reset();
           }
         } catch (IOException e) {
-          eventLoop.execute(() -> future.completeExceptionally(e));
+          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
           return;
         }
         try {
@@ -126,9 +131,9 @@ public final class AsyncFSOutputHelper {
             fsOut.hflush();
           }
           long pos = fsOut.getPos();
-          eventLoop.execute(() -> future.complete(pos));
+          eventLoopGroup.next().execute(() -> future.complete(pos));
         } catch (IOException e) {
-          eventLoop.execute(() -> future.completeExceptionally(e));
+          eventLoopGroup.next().execute(() -> future.completeExceptionally(e));
         }
       }
 
@@ -164,16 +169,6 @@ public final class AsyncFSOutputHelper {
       public int buffered() {
         return out.size();
       }
-
-      @Override
-      public void writeInt(int i) {
-        out.writeInt(i);
-      }
-
-      @Override
-      public void write(ByteBuffer bb) {
-        out.write(bb, bb.position(), bb.remaining());
-      }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 3daf15b..91086d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -17,48 +17,33 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.WRITER_IDLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 
-
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.PromiseCombiner;
-
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
-import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,14 +51,28 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler.Sharable;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelId;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
 
 /**
  * An asynchronous HDFS output stream implementation which fans out data to datanode and only
@@ -81,21 +80,16 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
  * <p>
  * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly
  * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the
- * method. And we place it here under util package because we want to make it independent of WAL
+ * method. And we place it here under io package because we want to make it independent of WAL
  * implementation thus easier to move it to HDFS project finally.
  * <p>
- * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only
- * need one thread here. But be careful, we do some blocking operations in {@link #close()} and
- * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside
- * {@link EventLoop}. And for {@link #write(byte[])} {@link #write(byte[], int, int)},
- * {@link #buffered()} and {@link #flush(boolean)}, if you call them outside {@link EventLoop},
- * there will be an extra context-switch.
+ * Note that, although we support pipelined flush, i.e., write new data and then flush before the
+ * previous flush succeeds, the implementation is not thread safe, so you should not call its
+ * methods concurrently.
  * <p>
  * Advantages compare to DFSOutputStream:
  * <ol>
  * <li>The fan out mechanism. This will reduce the latency.</li>
- * <li>The asynchronous WAL could also run in the same EventLoop, we could just call write and flush
- * inside the EventLoop thread, so generally we only have one thread to do all the things.</li>
  * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer
  * ASAP.</li>
  * <li>We could benefit from netty's ByteBuf management mechanism.</li>
@@ -124,11 +118,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final long fileId;
 
-  private final LocatedBlock locatedBlock;
+  private final ExtendedBlock block;
 
-  private final Encryptor encryptor;
+  private final DatanodeInfo[] locations;
 
-  private final EventLoop eventLoop;
+  private final Encryptor encryptor;
 
   private final List<Channel> datanodeList;
 
@@ -140,65 +134,81 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private static final class Callback {
 
-    private final Promise<Void> promise;
+    private final CompletableFuture<Long> future;
 
     private final long ackedLength;
 
-    private final Set<Channel> unfinishedReplicas;
+    // should be backed by a thread safe collection
+    private final Set<ChannelId> unfinishedReplicas;
 
-    public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) {
-      this.promise = promise;
+    public Callback(CompletableFuture<Long> future, long ackedLength,
+        Collection<Channel> replicas) {
+      this.future = future;
       this.ackedLength = ackedLength;
       if (replicas.isEmpty()) {
         this.unfinishedReplicas = Collections.emptySet();
       } else {
         this.unfinishedReplicas =
-            Collections.newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size()));
-        this.unfinishedReplicas.addAll(replicas);
+          Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
+        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
       }
     }
   }
 
-  private final Deque<Callback> waitingAckQueue = new ArrayDeque<>();
+  private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
+
+  private volatile long ackedBlockLength = 0L;
 
   // this could be different from acked block length because a packet can not start at the middle of
   // a chunk.
   private long nextPacketOffsetInBlock = 0L;
 
+  // the length of the trailing partial chunk, this is because the packet start offset must be
+  // aligned with the length of checksum chunk so we need to resend the same data.
+  private int trailingPartialChunkLength = 0;
+
   private long nextPacketSeqno = 0L;
 
   private ByteBuf buf;
-  // buf's initial capacity - 4KB
-  private int capacity = 4 * 1024;
 
-  // LIMIT is 128MB
-  private static final int LIMIT = 128 * 1024 * 1024;
+  private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
 
+  // State for connections to DN
   private enum State {
     STREAMING, CLOSING, BROKEN, CLOSED
   }
 
-  private State state;
+  private volatile State state;
 
+  // all lock-free to make it run faster
   private void completed(Channel channel) {
-    if (waitingAckQueue.isEmpty()) {
-      return;
-    }
-    for (Callback c : waitingAckQueue) {
-      if (c.unfinishedReplicas.remove(channel)) {
+    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
+      Callback c = iter.next();
+      // if the current unfinished replicas does not contain us then it means that we have already
+      // acked this one, let's iterate to find the one we have not acked yet.
+      if (c.unfinishedReplicas.remove(channel.id())) {
         if (c.unfinishedReplicas.isEmpty()) {
-          c.promise.trySuccess(null);
-          // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas
-          // is empty, so this could only happen at the head of waitingAckQueue, so we just call
-          // removeFirst here.
-          waitingAckQueue.removeFirst();
-          // also wake up flush requests which have the same length.
-          for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) {
-            if (cb.ackedLength == c.ackedLength) {
-              cb.promise.trySuccess(null);
-              waitingAckQueue.removeFirst();
-            } else {
-              break;
+          // we need to remove first before completing the future. It is possible that after we
+          // complete the future the upper layer will call close immediately before we remove the
+          // entry from waitingAckQueue and lead to an IllegalStateException. And also set the
+          // ackedBlockLength first otherwise we may use a wrong length to commit the block. This
+          // may lead to multiple remove and assign but is OK. The semantic of iter.remove is
+          // removing the entry returned by calling previous next, so if the entry has already been
+          // removed then it is a no-op, and for the assign, the values are the same so no problem.
+          iter.remove();
+          ackedBlockLength = c.ackedLength;
+          // the future.complete check is to confirm that we are the only one who grabbed the work,
+          // otherwise just give up and return.
+          if (c.future.complete(c.ackedLength)) {
+            // also wake up flush requests which have the same length.
+            while (iter.hasNext()) {
+              Callback maybeDummyCb = iter.next();
+              if (maybeDummyCb.ackedLength == c.ackedLength) {
+                iter.remove();
+                maybeDummyCb.future.complete(c.ackedLength);
+              } else {
+                break;
+              }
             }
           }
         }
@@ -207,13 +217,16 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     }
   }
 
-  private void failed(Channel channel, Supplier<Throwable> errorSupplier) {
+  // this usually does not happen which means it is not on the critical path so make it synchronized
+  // so that the implementation will not burn up our brain as there are multiple state changes and
+  // checks.
+  private synchronized void failed(Channel channel, Supplier<Throwable> errorSupplier) {
     if (state == State.BROKEN || state == State.CLOSED) {
       return;
     }
     if (state == State.CLOSING) {
       Callback c = waitingAckQueue.peekFirst();
-      if (c == null || !c.unfinishedReplicas.contains(channel)) {
+      if (c == null || !c.unfinishedReplicas.contains(channel.id())) {
         // nothing, the endBlock request has already finished.
         return;
       }
@@ -221,8 +234,21 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     // disable further write, and fail all pending ack.
     state = State.BROKEN;
     Throwable error = errorSupplier.get();
-    waitingAckQueue.stream().forEach(c -> c.promise.tryFailure(error));
-    waitingAckQueue.clear();
+    for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
+      Callback c = iter.next();
+      // find the first sync request which we have not acked yet and fail all the request after it.
+      if (!c.unfinishedReplicas.contains(channel.id())) {
+        continue;
+      }
+      for (;;) {
+        c.future.completeExceptionally(error);
+        if (!iter.hasNext()) {
+          break;
+        }
+        c = iter.next();
+      }
+      break;
+    }
     datanodeList.forEach(ch -> ch.close());
   }
 
@@ -239,13 +265,13 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     protected void channelRead0(ChannelHandlerContext ctx, PipelineAckProto ack) throws Exception {
       Status reply = getStatus(ack);
       if (reply != Status.SUCCESS) {
-        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block "
-            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+        failed(ctx.channel(), () -> new IOException("Bad response " + reply + " for block " +
+          block + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (PipelineAck.isRestartOOBStatus(reply)) {
-        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block "
-            + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()));
+        failed(ctx.channel(), () -> new IOException("Restart response " + reply + " for block " +
+          block + " from datanode " + ctx.channel().remoteAddress()));
         return;
       }
       if (ack.getSeqno() == HEART_BEAT_SEQNO) {
@@ -256,6 +282,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      if (state == State.CLOSED) {
+        return;
+      }
       failed(ctx.channel(),
         () -> new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"));
     }
@@ -299,8 +328,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, Encryptor encryptor, EventLoop eventLoop,
-      List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) {
+      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
+      DataChecksum summer, ByteBufAllocator alloc) {
     this.conf = conf;
     this.fsUtils = fsUtils;
     this.dfs = dfs;
@@ -309,81 +338,53 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.fileId = fileId;
     this.clientName = clientName;
     this.src = src;
-    this.locatedBlock = locatedBlock;
+    this.block = locatedBlock.getBlock();
+    this.locations = locatedBlock.getLocations();
     this.encryptor = encryptor;
-    this.eventLoop = eventLoop;
     this.datanodeList = datanodeList;
     this.summer = summer;
     this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
     this.alloc = alloc;
-    this.buf = alloc.directBuffer(capacity);
+    this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
 
-  private void writeInt0(int i) {
+  @Override
+  public void writeInt(int i) {
     buf.ensureWritable(4);
     buf.writeInt(i);
   }
 
   @Override
-  public void writeInt(int i) {
-    if (eventLoop.inEventLoop()) {
-      writeInt0(i);
-    } else {
-      eventLoop.submit(() -> writeInt0(i));
-    }
-  }
-
-  private void write0(ByteBuffer bb) {
+  public void write(ByteBuffer bb) {
     buf.ensureWritable(bb.remaining());
     buf.writeBytes(bb);
   }
 
   @Override
-  public void write(ByteBuffer bb) {
-    if (eventLoop.inEventLoop()) {
-      write0(bb);
-    } else {
-      eventLoop.submit(() -> write0(bb));
-    }
-  }
-
-  @Override
   public void write(byte[] b) {
     write(b, 0, b.length);
   }
 
-  private void write0(byte[] b, int off, int len) {
-    buf.ensureWritable(len);
-    buf.writeBytes(b, off, len);
-  }
-
   @Override
   public void write(byte[] b, int off, int len) {
-    if (eventLoop.inEventLoop()) {
-      write0(b, off, len);
-    } else {
-      eventLoop.submit(() -> write0(b, off, len)).syncUninterruptibly();
-    }
+    buf.ensureWritable(len);
+    buf.writeBytes(b, off, len);
   }
 
   @Override
   public int buffered() {
-    if (eventLoop.inEventLoop()) {
-      return buf.readableBytes();
-    } else {
-      return eventLoop.submit(() -> buf.readableBytes()).syncUninterruptibly().getNow().intValue();
-    }
+    return buf.readableBytes();
   }
 
   @Override
   public DatanodeInfo[] getPipeline() {
-    return locatedBlock.getLocations();
+    return locations;
   }
 
-  private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock,
-      boolean syncBlock) {
+  private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
+      long nextPacketOffsetInBlock, boolean syncBlock) {
     int dataLen = dataBuf.readableBytes();
     int chunkLen = summer.getBytesPerChecksum();
     int trailingPartialChunkLen = dataLen % chunkLen;
@@ -398,24 +399,24 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     ByteBuf headerBuf = alloc.buffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-
-    long ackedLength = nextPacketOffsetInBlock + dataLen;
-    Promise<Void> promise = eventLoop.<Void> newPromise().addListener(future -> {
-      if (future.isSuccess()) {
-        locatedBlock.getBlock().setNumBytes(ackedLength);
-      }
-    });
-    waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
-    for (Channel ch : datanodeList) {
-      ch.write(headerBuf.duplicate().retain());
-      ch.write(checksumBuf.duplicate().retain());
-      ch.writeAndFlush(dataBuf.duplicate().retain());
+    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
+    waitingAckQueue.addLast(c);
+    // recheck again after we pushed the callback to queue
+    if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
+      future.completeExceptionally(new IOException("stream already broken"));
+      // it's the one we have just pushed or just a no-op
+      waitingAckQueue.removeFirst();
+      return;
     }
+    datanodeList.forEach(ch -> {
+      ch.write(headerBuf.retainedDuplicate());
+      ch.write(checksumBuf.retainedDuplicate());
+      ch.writeAndFlush(dataBuf.retainedDuplicate());
+    });
     checksumBuf.release();
     headerBuf.release();
     dataBuf.release();
     nextPacketSeqno++;
-    return promise;
   }
 
   private void flush0(CompletableFuture<Long> future, boolean syncBlock) {
@@ -424,11 +425,43 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       return;
     }
     int dataLen = buf.readableBytes();
+    if (dataLen == trailingPartialChunkLength) {
+      // no new data
+      long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+      Callback lastFlush = waitingAckQueue.peekLast();
+      if (lastFlush != null) {
+        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
+        waitingAckQueue.addLast(c);
+        // recheck here if we have already removed the previous callback from the queue
+        if (waitingAckQueue.peekFirst() == c) {
+          // all previous callbacks have been removed
+          // notice that this does not mean we will always win here because the background thread
+          // may have already started to mark the future here as completed in the completed or
+          // failed methods but haven't removed it from the queue yet. That's also why the
+          // removeFirst call below may be a no-op.
+          if (state != State.STREAMING) {
+            future.completeExceptionally(new IOException("stream already broken"));
+          } else {
+            future.complete(lengthAfterFlush);
+          }
+          // it's the one we have just pushed or just a no-op
+          waitingAckQueue.removeFirst();
+        }
+      } else {
+        // we must have acked all the data so the ackedBlockLength must be the same as
+        // lengthAfterFlush
+        future.complete(lengthAfterFlush);
+      }
+      return;
+    }
+
     if (encryptor != null) {
       ByteBuf encryptBuf = alloc.directBuffer(dataLen);
+      buf.readBytes(encryptBuf, trailingPartialChunkLength);
+      int toEncryptLength = dataLen - trailingPartialChunkLength;
       try {
-        encryptor.encrypt(buf.nioBuffer(buf.readerIndex(), dataLen),
-          encryptBuf.nioBuffer(0, dataLen));
+        encryptor.encrypt(buf.nioBuffer(trailingPartialChunkLength, toEncryptLength),
+          encryptBuf.nioBuffer(trailingPartialChunkLength, toEncryptLength));
       } catch (IOException e) {
         encryptBuf.release();
         future.completeExceptionally(e);
@@ -438,56 +471,35 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       buf.release();
       buf = encryptBuf;
     }
-    long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
-    if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
-      // no new data, just return
-      future.complete(locatedBlock.getBlock().getNumBytes());
-      return;
-    }
-    Callback c = waitingAckQueue.peekLast();
-    if (c != null && lengthAfterFlush == c.ackedLength) {
-      // just append it to the tail of waiting ack queue,, do not issue new hflush request.
-      waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(f -> {
-        if (f.isSuccess()) {
-          future.complete(lengthAfterFlush);
-        } else {
-          future.completeExceptionally(f.cause());
-        }
-      }), lengthAfterFlush, Collections.<Channel> emptyList()));
-      return;
-    }
-    Promise<Void> promise;
+
     if (dataLen > maxDataLen) {
       // We need to write out the data by multiple packets as the max packet allowed is 16M.
-      PromiseCombiner combiner = new PromiseCombiner();
       long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
-      for (int remaining = dataLen; remaining > 0;) {
-        int toWriteDataLen = Math.min(remaining, maxDataLen);
-        combiner.add((Future<Void>) flushBuffer(buf.readRetainedSlice(toWriteDataLen),
-          nextSubPacketOffsetInBlock, syncBlock));
-        nextSubPacketOffsetInBlock += toWriteDataLen;
-        remaining -= toWriteDataLen;
+      for (int remaining = dataLen;;) {
+        if (remaining < maxDataLen) {
+          flushBuffer(future, buf.readRetainedSlice(remaining), nextSubPacketOffsetInBlock,
+            syncBlock);
+          break;
+        } else {
+          flushBuffer(new CompletableFuture<>(), buf.readRetainedSlice(maxDataLen),
+            nextSubPacketOffsetInBlock, syncBlock);
+          remaining -= maxDataLen;
+          nextSubPacketOffsetInBlock += maxDataLen;
+        }
       }
-      promise = eventLoop.newPromise();
-      combiner.finish(promise);
     } else {
-      promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
+      flushBuffer(future, buf.retain(), nextPacketOffsetInBlock, syncBlock);
     }
-    promise.addListener(f -> {
-      if (f.isSuccess()) {
-        future.complete(lengthAfterFlush);
-      } else {
-        future.completeExceptionally(f.cause());
-      }
-    });
-    int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
-    ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
-    if (trailingPartialChunkLen != 0) {
-      buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
+    trailingPartialChunkLength = dataLen % summer.getBytesPerChecksum();
+    ByteBuf newBuf = alloc.directBuffer(sendBufSizePRedictor.guess(dataLen))
+        .ensureWritable(trailingPartialChunkLength);
+    if (trailingPartialChunkLength != 0) {
+      buf.readerIndex(dataLen - trailingPartialChunkLength).readBytes(newBuf,
+        trailingPartialChunkLength);
     }
     buf.release();
     this.buf = newBuf;
-    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
+    nextPacketOffsetInBlock += dataLen - trailingPartialChunkLength;
   }
 
   /**
@@ -497,34 +509,38 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   public CompletableFuture<Long> flush(boolean syncBlock) {
     CompletableFuture<Long> future = new CompletableFuture<>();
-    if (eventLoop.inEventLoop()) {
-      flush0(future, syncBlock);
-    } else {
-      eventLoop.execute(() -> flush0(future, syncBlock));
-    }
+    flush0(future, syncBlock);
     return future;
   }
 
-  private void endBlock(Promise<Void> promise, long size) {
+  private void endBlock() throws IOException {
+    Preconditions.checkState(waitingAckQueue.isEmpty(),
+      "should call flush first before calling close");
     if (state != State.STREAMING) {
-      promise.tryFailure(new IOException("stream already broken"));
-      return;
-    }
-    if (!waitingAckQueue.isEmpty()) {
-      promise.tryFailure(new IllegalStateException("should call flush first before calling close"));
-      return;
+      throw new IOException("stream already broken");
     }
     state = State.CLOSING;
-    PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false);
+    long finalizedLength = ackedBlockLength;
+    PacketHeader header = new PacketHeader(4, finalizedLength, nextPacketSeqno, true, 0, false);
     buf.release();
     buf = null;
     int headerLen = header.getSerializedSize();
     ByteBuf headerBuf = alloc.directBuffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-    waitingAckQueue.add(new Callback(promise, size, datanodeList));
-    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.duplicate().retain()));
+    CompletableFuture<Long> future = new CompletableFuture<>();
+    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
+    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
     headerBuf.release();
+    try {
+      future.get();
+    } catch (InterruptedException e) {
+      throw (IOException) new InterruptedIOException().initCause(e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      Throwables.propagateIfPossible(cause, IOException.class);
+      throw new IOException(cause);
+    }
   }
 
   /**
@@ -532,7 +548,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   @Override
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
-    assert !eventLoop.inEventLoop();
+    datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
     fsUtils.recoverFileLease(dfs, new Path(src), conf,
@@ -545,32 +561,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    */
   @Override
   public void close() throws IOException {
-    assert !eventLoop.inEventLoop();
-    Promise<Void> promise = eventLoop.newPromise();
-    eventLoop.execute(() -> endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()));
-    promise.addListener(f -> datanodeList.forEach(ch -> ch.close())).syncUninterruptibly();
+    endBlock();
+    state = State.CLOSED;
+    datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
-    completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId);
-  }
-
-  @VisibleForTesting
-  int guess(int bytesWritten) {
-    // if the bytesWritten is greater than the current capacity
-    // always increase the capacity in powers of 2.
-    if (bytesWritten > this.capacity) {
-      // Ensure we don't cross the LIMIT
-      if ((this.capacity << 1) <= LIMIT) {
-        // increase the capacity in the range of power of 2
-        this.capacity = this.capacity << 1;
-      }
-    } else {
-      // if we see that the bytesWritten is lesser we could again decrease
-      // the capacity by dividing it by 2 if the bytesWritten is satisfied by
-      // that reduction
-      if ((this.capacity >> 1) >= bytesWritten) {
-        this.capacity = this.capacity >> 1;
-      }
-    }
-    return this.capacity;
+    block.setNumBytes(ackedBlockLength);
+    completeFile(client, namenode, src, clientName, block, fileId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index d3dc957..61aa97c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -17,43 +17,19 @@
  */
 package org.apache.hadoop.hbase.io.asyncfs;
 
-import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
 import com.google.protobuf.CodedOutputStream;
 
-import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
-import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
-import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -73,7 +49,6 @@ import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -109,6 +84,32 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hadoop.hbase.shaded.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelInitializer;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelPipeline;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hadoop.hbase.shaded.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.FutureListener;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Promise;
 
 /**
  * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
@@ -664,10 +665,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     });
   }
 
-  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
-      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
-      BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop,
-      Class<? extends Channel> channelClass) {
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf,
+      DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd,
+      long latestGS, BlockConstructionStage stage, DataChecksum summer,
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =
@@ -690,10 +691,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     for (int i = 0; i < datanodeInfos.length; i++) {
       DatanodeInfo dnInfo = datanodeInfos[i];
       Enum<?> storageType = storageTypes[i];
-      Promise<Channel> promise = eventLoop.newPromise();
+      Promise<Channel> promise = eventLoopGroup.next().newPromise();
       futureList.add(promise);
       String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
-      new Bootstrap().group(eventLoop).channel(channelClass)
+      new Bootstrap().group(eventLoopGroup).channel(channelClass)
           .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
 
             @Override
@@ -732,7 +733,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     Configuration conf = dfs.getConf();
     FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
     DFSClient client = dfs.getClient();
@@ -761,7 +762,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         stat.getFileId(), null);
       List<Channel> datanodeList = new ArrayList<>();
       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoop, channelClass);
+        PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
       for (Future<Channel> future : futureList) {
         // fail the creation if there are connection failures since we are fail-fast. The upper
         // layer should retry itself if needed.
@@ -770,7 +771,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       Encryptor encryptor = createEncryptor(conf, stat, client);
       FanOutOneBlockAsyncDFSOutput output =
           new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
-              stat.getFileId(), locatedBlock, encryptor, eventLoop, datanodeList, summer, ALLOC);
+              stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
       succ = true;
       return output;
     } finally {
@@ -796,19 +797,18 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   /**
    * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it
-   * inside {@link EventLoop}.
-   * @param eventLoop all connections to datanode will use the same event loop.
+   * inside an {@link EventLoop}.
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoop, channelClass);
+          blockSize, eventLoopGroup, channelClass);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java
new file mode 100644
index 0000000..2f65244
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/SendBufSizePredictor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Used to predict the next send buffer size.
+ */
+@InterfaceAudience.Private
+class SendBufSizePredictor {
+
+  // LIMIT is 128MB
+  private static final int LIMIT = 128 * 1024 * 1024;
+
+  // buf's initial capacity - 4KB
+  private int capacity = 4 * 1024;
+
+  int initialSize() {
+    return capacity;
+  }
+
+  int guess(int bytesWritten) {
+    // if bytesWritten is greater than the current capacity, try to
+    // double the capacity (it only ever changes by a factor of 2).
+    if (bytesWritten > this.capacity) {
+      // Ensure we don't cross the LIMIT
+      if ((this.capacity << 1) <= LIMIT) {
+        // increase the capacity in the range of power of 2
+        this.capacity = this.capacity << 1;
+      }
+    } else {
+      // if bytesWritten fits in the current capacity, we can shrink the
+      // capacity again by halving it, but only when the halved capacity is
+      // still large enough to hold bytesWritten
+      if ((this.capacity >> 1) >= bytesWritten) {
+        this.capacity = this.capacity >> 1;
+      }
+    }
+    return this.capacity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 4304137..bf3b2ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -17,15 +17,6 @@
  */
 package org.apache.hadoop.hbase.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -33,13 +24,21 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * A WAL provider that use {@link AsyncFSWAL}.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 4377196..48c1cbf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -24,17 +24,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -56,6 +54,12 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
 @Category({ MiscTests.class, MediumTests.class })
 public class TestFanOutOneBlockAsyncDFSOutput {
 
@@ -97,9 +101,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     // will fail.
     for (;;) {
       try {
-        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-          new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
-          EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
+        FanOutOneBlockAsyncDFSOutput out =
+          FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+            true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
         out.close();
         break;
       } catch (IOException e) {
@@ -111,17 +115,32 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
       final FanOutOneBlockAsyncDFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
-    final byte[] b = new byte[10];
-    ThreadLocalRandom.current().nextBytes(b);
-    out.write(b, 0, b.length);
-    assertEquals(b.length, out.flush(false).get().longValue());
+    List<CompletableFuture<Long>> futures = new ArrayList<>();
+    byte[] b = new byte[10];
+    Random rand = new Random(12345);
+    // test pipelined flush
+    for (int i = 0; i < 10; i++) {
+      rand.nextBytes(b);
+      out.write(b);
+      futures.add(out.flush(false));
+      futures.add(out.flush(false));
+    }
+    for (int i = 0; i < 10; i++) {
+      assertEquals((i + 1) * b.length, futures.get(2 * i).join().longValue());
+      assertEquals((i + 1) * b.length, futures.get(2 * i + 1).join().longValue());
+    }
     out.close();
-    assertEquals(b.length, dfs.getFileStatus(f).getLen());
+    assertEquals(b.length * 10, dfs.getFileStatus(f).getLen());
     byte[] actual = new byte[b.length];
+    rand.setSeed(12345);
     try (FSDataInputStream in = dfs.open(f)) {
-      in.readFully(actual);
+      for (int i = 0; i < 10; i++) {
+        in.readFully(actual);
+        rand.nextBytes(b);
+        assertArrayEquals(b, actual);
+      }
+      assertEquals(-1, in.read());
     }
-    assertArrayEquals(b, actual);
   }
 
   @Test
@@ -134,21 +153,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   }
 
   @Test
-  public void testMaxByteBufAllocated() throws Exception {
-    Path f = new Path("/" + name.getMethodName());
-    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    out.guess(5 * 1024);
-    assertEquals(8 * 1024, out.guess(5 * 1024));
-    assertEquals(16 * 1024, out.guess(10 * 1024));
-    // it wont reduce directly to 4KB
-    assertEquals(8 * 1024, out.guess(4 * 1024));
-    // This time it will reduece
-    assertEquals(4 * 1024, out.guess(4 * 1024));
-  }
-
-  @Test
   public void testRecover() throws IOException, InterruptedException, ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
@@ -216,7 +220,7 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index b0d689c..0453f1c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -61,7 +61,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next(), CHANNEL_CLASS);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index f1ecd3a..0e2b6d4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -31,12 +31,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIP
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
 
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -75,6 +69,12 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
+
 @RunWith(Parameterized.class)
 @Category({ MiscTests.class, LargeTests.class })
 public class TestSaslFanOutOneBlockAsyncDFSOutput {
@@ -90,7 +90,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   private static int READ_TIMEOUT_MS = 200000;
 
   private static final File KEYTAB_FILE =
-      new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+    new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
 
   private static MiniKdc KDC;
 
@@ -104,8 +104,6 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static String TEST_KEY_NAME = "test_key";
 
-  private static boolean TEST_TRANSPARENT_ENCRYPTION = true;
-
   @Rule
   public TestName name = new TestName();
 
@@ -118,20 +116,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   @Parameter(2)
   public String cipherSuite;
 
-  @Parameter(3)
-  public boolean useTransparentEncryption;
-
-  @Parameters(
-      name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}")
+  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
   public static Iterable<Object[]> data() {
     List<Object[]> params = new ArrayList<>();
     for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
       for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
         for (String cipherSuite : Arrays.asList("", CipherSuite.AES_CTR_NOPADDING.getName())) {
-          for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
-            params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
-                useTransparentEncryption });
-          }
+          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
         }
       }
     }
@@ -159,7 +150,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static void setUpKeyProvider(Configuration conf) throws Exception {
     URI keyProviderUri =
-        new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
+      new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
     conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
     KeyProvider keyProvider = KeyProviderFactory.get(keyProviderUri, conf);
     keyProvider.createKey(TEST_KEY_NAME, KeyProvider.options(conf));
@@ -197,13 +188,12 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private Path testDirOnTestFs;
 
+  private Path entryptionTestDirOnTestFs;
+
   private void createEncryptionZone() throws Exception {
-    if (!TEST_TRANSPARENT_ENCRYPTION) {
-      return;
-    }
     Method method =
-        DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
-    method.invoke(FS, testDirOnTestFs, TEST_KEY_NAME);
+      DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
+    method.invoke(FS, entryptionTestDirOnTestFs, TEST_KEY_NAME);
   }
 
   @Before
@@ -225,13 +215,13 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
       TEST_UTIL.getConfiguration().set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuite);
     }
 
-    TEST_UTIL.startMiniDFSCluster(1);
+    TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
     testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
     FS.mkdirs(testDirOnTestFs);
-    if (useTransparentEncryption) {
-      createEncryptionZone();
-    }
+    entryptionTestDirOnTestFs = new Path("/" + testDirOnTestFs.getName() + "_enc");
+    FS.mkdirs(entryptionTestDirOnTestFs);
+    createEncryptionZone();
   }
 
   @After
@@ -243,12 +233,20 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     return new Path(testDirOnTestFs, "test");
   }
 
+  private Path getEncryptionTestFile() {
+    return new Path(entryptionTestDirOnTestFs, "test");
+  }
+
+  private void test(Path file) throws IOException, InterruptedException, ExecutionException {
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, file, out);
+  }
+
   @Test
   public void test() throws IOException, InterruptedException, ExecutionException {
-    Path f = getTestFile();
-    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 1, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-    TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out);
+    test(getTestFile());
+    test(getEncryptionTestFile());
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9434d52c/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
new file mode 100644
index 0000000..a5f536f
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSendBufSizePredictor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the values returned by {@link SendBufSizePredictor#guess}.
+ */
+@Category({ MiscTests.class, SmallTests.class })
+public class TestSendBufSizePredictor {
+
+  @Test
+  public void test() {
+    SendBufSizePredictor p = new SendBufSizePredictor();
+    assertEquals(8 * 1024, p.guess(5 * 1024));
+    assertEquals(8 * 1024, p.guess(5 * 1024));
+    assertEquals(16 * 1024, p.guess(10 * 1024));
+    // it won't reduce directly to 4KB
+    assertEquals(8 * 1024, p.guess(4 * 1024));
+    // This time it will reduce
+    assertEquals(4 * 1024, p.guess(4 * 1024));
+  }
+}