Posted to commits@hbase.apache.org by ha...@apache.org on 2022/01/06 03:48:02 UTC

[hbase] branch branch-2 updated: HBASE-26347 Support detect and exclude slow DNs in fan-out of WAL (#3800)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 7db2777  HBASE-26347 Support detect and exclude slow DNs in fan-out of WAL (#3800)
7db2777 is described below

commit 7db277737a42d5a68980f35cc8e093782fdde3f4
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Thu Jan 6 11:45:38 2022 +0800

    HBASE-26347 Support detect and exclude slow DNs in fan-out of WAL (#3800)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java      |   5 +-
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  56 ++++--
 .../FanOutOneBlockAsyncDFSOutputHelper.java        |  37 ++--
 .../io/asyncfs/monitor/ExcludeDatanodeManager.java | 114 +++++++++++
 .../io/asyncfs/monitor/StreamSlowMonitor.java      | 217 +++++++++++++++++++++
 .../io/asyncfs/TestExcludeDatanodeManager.java     |  87 +++++++++
 .../asyncfs/TestFanOutOneBlockAsyncDFSOutput.java  |  48 ++++-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java     |   6 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java      |   8 +-
 .../wal/AbstractProtobufLogWriter.java             |  10 +-
 .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java  |  13 +-
 .../regionserver/wal/AsyncProtobufLogWriter.java   |   6 +-
 .../hbase/regionserver/wal/ProtobufLogWriter.java  |   5 +-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java       |  19 +-
 .../apache/hadoop/hbase/wal/FSHLogProvider.java    |   8 +-
 .../org/apache/hadoop/hbase/wal/WALFactory.java    |  10 +-
 .../regionserver/wal/AbstractTestWALReplay.java    |   6 +-
 .../apache/hadoop/hbase/wal/IOTestProvider.java    |   9 +-
 18 files changed, 592 insertions(+), 72 deletions(-)

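In brief: the fan-out WAL output now tracks per-datanode ACK latency through a
StreamSlowMonitor, and datanodes that are repeatedly slow (or fail to connect)
are excluded from new block pipelines for a TTL via ExcludeDatanodeManager. A
sketch of the knobs this patch introduces (key names are taken from the patch;
the values shown are simply the defaults, set explicitly for illustration):

    Configuration conf = HBaseConfiguration.create();
    // At most this many datanodes may be excluded at a time.
    conf.setInt("hbase.regionserver.async.wal.max.exclude.datanode.count", 3);
    // An excluded datanode is readmitted after this many hours.
    conf.setLong("hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour", 6);
    // A datanode must be detected slow this many times before being excluded.
    conf.setInt("hbase.regionserver.async.wal.min.slow.detect.count", 3);
    // Packets up to this size (bytes) are judged by ACK time; larger ones by speed.
    conf.setLong(
      "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min",
      64 * 1024);
    // ACK-time limit (ms) for small packets, and speed floor (KB/s) for large ones.
    conf.setLong("hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis", 6000);
    conf.setDouble("hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs", 20);
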
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 452da1c..5b71319 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -47,11 +48,11 @@ public final class AsyncFSOutputHelper {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
       boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass)
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
       throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
     }
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 457b7c1..dda991b 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -32,7 +32,7 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +45,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -68,6 +70,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler.Sharable;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelId;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundInvoker;
 import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
@@ -121,7 +124,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final Encryptor encryptor;
 
-  private final List<Channel> datanodeList;
+  private final Map<Channel, DatanodeInfo> datanodeInfoMap;
 
   private final DataChecksum summer;
 
@@ -137,17 +140,22 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
     // should be backed by a thread safe collection
     private final Set<ChannelId> unfinishedReplicas;
+    private final long packetDataLen;
+    private final long flushTimestamp;
+    private long lastAckTimestamp = -1;
 
     public Callback(CompletableFuture<Long> future, long ackedLength,
-        Collection<Channel> replicas) {
+      final Collection<Channel> replicas, long packetDataLen) {
       this.future = future;
       this.ackedLength = ackedLength;
+      this.packetDataLen = packetDataLen;
+      this.flushTimestamp = EnvironmentEdgeManager.currentTime();
       if (replicas.isEmpty()) {
         this.unfinishedReplicas = Collections.emptySet();
       } else {
         this.unfinishedReplicas =
           Collections.newSetFromMap(new ConcurrentHashMap<ChannelId, Boolean>(replicas.size()));
-        replicas.stream().map(c -> c.id()).forEachOrdered(unfinishedReplicas::add);
+        replicas.stream().map(Channel::id).forEachOrdered(unfinishedReplicas::add);
       }
     }
   }
@@ -177,6 +185,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private volatile State state;
 
+  private final StreamSlowMonitor streamSlowMonitor;
+
   // all lock-free to make it run faster
   private void completed(Channel channel) {
     for (Iterator<Callback> iter = waitingAckQueue.iterator(); iter.hasNext();) {
@@ -184,6 +194,10 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       // if the current unfinished replicas does not contain us then it means that we have already
       // acked this one, let's iterate to find the one we have not acked yet.
       if (c.unfinishedReplicas.remove(channel.id())) {
+        long current = EnvironmentEdgeManager.currentTime();
+        streamSlowMonitor.checkProcessTimeAndSpeed(datanodeInfoMap.get(channel), c.packetDataLen,
+            current - c.flushTimestamp, c.lastAckTimestamp, c.unfinishedReplicas.size());
+        c.lastAckTimestamp = current;
         if (c.unfinishedReplicas.isEmpty()) {
           // we need to remove first before complete the future. It is possible that after we
           // complete the future the upper layer will call close immediately before we remove the
@@ -246,7 +260,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       }
       break;
     }
-    datanodeList.forEach(ch -> ch.close());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
   }
 
   @Sharable
@@ -314,7 +328,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private void setupReceiver(int timeoutMs) {
     AckHandler ackHandler = new AckHandler(timeoutMs);
-    for (Channel ch : datanodeList) {
+    for (Channel ch : datanodeInfoMap.keySet()) {
       ch.pipeline().addLast(
         new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS),
         new ProtobufVarint32FrameDecoder(),
@@ -325,8 +339,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId,
-      LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> datanodeList,
-      DataChecksum summer, ByteBufAllocator alloc) {
+      LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
+      DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
     this.conf = conf;
     this.dfs = dfs;
     this.client = client;
@@ -337,13 +351,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.block = locatedBlock.getBlock();
     this.locations = locatedBlock.getLocations();
     this.encryptor = encryptor;
-    this.datanodeList = datanodeList;
+    this.datanodeInfoMap = datanodeInfoMap;
     this.summer = summer;
     this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
     this.alloc = alloc;
     this.buf = alloc.directBuffer(sendBufSizePRedictor.initialSize());
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
+    this.streamSlowMonitor = streamSlowMonitor;
   }
 
   @Override
@@ -395,7 +410,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     ByteBuf headerBuf = alloc.buffer(headerLen);
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
-    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeList);
+    Callback c = new Callback(future, nextPacketOffsetInBlock + dataLen,
+        datanodeInfoMap.keySet(), dataLen);
     waitingAckQueue.addLast(c);
     // recheck again after we pushed the callback to queue
     if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
@@ -404,7 +420,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       waitingAckQueue.removeFirst();
       return;
     }
-    datanodeList.forEach(ch -> {
+    // TODO: we should perhaps measure time taken per DN here;
+    //       we could collect statistics per DN, and/or exclude bad nodes in createOutput.
+    datanodeInfoMap.keySet().forEach(ch -> {
       ch.write(headerBuf.retainedDuplicate());
       ch.write(checksumBuf.retainedDuplicate());
       ch.writeAndFlush(dataBuf.retainedDuplicate());
@@ -426,7 +444,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
       long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
       Callback lastFlush = waitingAckQueue.peekLast();
       if (lastFlush != null) {
-        Callback c = new Callback(future, lengthAfterFlush, Collections.emptyList());
+        Callback c = new Callback(future, lengthAfterFlush, Collections.emptySet(), dataLen);
         waitingAckQueue.addLast(c);
         // recheck here if we have already removed the previous callback from the queue
         if (waitingAckQueue.peekFirst() == c) {
@@ -526,8 +544,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
     CompletableFuture<Long> future = new CompletableFuture<>();
-    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeList));
-    datanodeList.forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
+    waitingAckQueue.add(new Callback(future, finalizedLength, datanodeInfoMap.keySet(), 0));
+    datanodeInfoMap.keySet().forEach(ch -> ch.writeAndFlush(headerBuf.retainedDuplicate()));
     headerBuf.release();
     try {
       future.get();
@@ -544,13 +562,14 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   * The close method used when an error occurred. Now we just call recoverFileLease.
    */
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void recoverAndClose(CancelableProgressable reporter) throws IOException {
     if (buf != null) {
       buf.release();
       buf = null;
     }
-    datanodeList.forEach(ch -> ch.close());
-    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
@@ -561,11 +580,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception.
    */
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")
   public void close() throws IOException {
     endBlock();
     state = State.CLOSED;
-    datanodeList.forEach(ch -> ch.close());
-    datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
+    datanodeInfoMap.keySet().forEach(ChannelOutboundInvoker::close);
+    datanodeInfoMap.keySet().forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     block.setNumBytes(ackedBlockLength);
     completeFile(client, namenode, src, clientName, block, fileId);
   }
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 2978eb9..45ff1cb 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -33,9 +33,12 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
@@ -47,6 +50,8 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSOutputStream;
@@ -128,8 +133,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // Timeouts for communicating with DataNode for streaming writes/reads
   public static final int READ_TIMEOUT = 60 * 1000;
 
-  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
-
   private interface LeaseManager {
 
     void begin(DFSClient client, long inodeId);
@@ -511,15 +514,20 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
+      StreamSlowMonitor monitor) throws IOException {
     Configuration conf = dfs.getConf();
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
     ClientProtocol namenode = client.getNamenode();
     int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
       DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
-    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    ExcludeDatanodeManager excludeDatanodeManager = monitor.getExcludeDatanodeManager();
+    Set<DatanodeInfo> toExcludeNodes =
+      new HashSet<>(excludeDatanodeManager.getExcludeDNs().keySet());
     for (int retry = 0;; retry++) {
+      LOG.debug("Creating output stream for {}, exclude list is {}, retry={}", src,
+          toExcludeNodes, retry);
       HdfsFileStatus stat;
       try {
         stat = FILE_CREATOR.create(namenode, src,
@@ -539,24 +547,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       List<Future<Channel>> futureList = null;
       try {
         DataChecksum summer = createChecksum(client);
-        locatedBlock = namenode.addBlock(src, client.getClientName(), null, excludesNodes,
-          stat.getFileId(), null, null);
-        List<Channel> datanodeList = new ArrayList<>();
+        locatedBlock = namenode.addBlock(src, client.getClientName(), null,
+          toExcludeNodes.toArray(new DatanodeInfo[0]), stat.getFileId(), null, null);
+        Map<Channel, DatanodeInfo> datanodes = new IdentityHashMap<>();
         futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
           PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
         for (int i = 0, n = futureList.size(); i < n; i++) {
+          DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
           try {
-            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+            datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
           } catch (Exception e) {
             // exclude the broken DN next time
-            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            toExcludeNodes.add(datanodeInfo);
+            excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "connect error");
             throw e;
           }
         }
         Encryptor encryptor = createEncryptor(conf, stat, client);
         FanOutOneBlockAsyncDFSOutput output =
           new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
-            stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+            stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
         succ = true;
         return output;
       } catch (RemoteException e) {
@@ -607,14 +617,15 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
    */
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
       boolean overwrite, boolean createParent, short replication, long blockSize,
-      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
+      final StreamSlowMonitor monitor) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
 
       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
           throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoopGroup, channelClass);
+          blockSize, eventLoopGroup, channelClass, monitor);
       }
 
       @Override
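
Callers now thread a StreamSlowMonitor down to the block writer, and the
monitor's ExcludeDatanodeManager seeds the exclude list handed to
namenode.addBlock() on every attempt. A minimal caller sketch, assuming an
existing DistributedFileSystem fs, a Configuration conf, and a netty event
loop group, mirroring the tests later in this patch:

    StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "walMonitor");
    FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
      fs, new Path("/wal/wal.0"), true, false, (short) 3, fs.getDefaultBlockSize(),
      eventLoopGroup, NioSocketChannel.class, monitor);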
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java
new file mode 100644
index 0000000..80748ca
--- /dev/null
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs.monitor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
+/**
+ * The class to manage the excluded datanodes of the WALs on the regionserver.
+ */
+@InterfaceAudience.Private
+public class ExcludeDatanodeManager implements ConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
+
+  /**
+   * Configuration key for the maximum count of excluded datanodes.
+   */
+  public static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
+    "hbase.regionserver.async.wal.max.exclude.datanode.count";
+  public static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
+
+  /**
+   * Configuration key for the TTL of excluded datanodes.
+   */
+  public static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
+    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
+  public static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
+
+  private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
+  private final int maxExcludeDNCount;
+  private final Configuration conf;
+  // This is a map of providerId->StreamSlowMonitor
+  private final Map<String, StreamSlowMonitor> streamSlowMonitors =
+    new ConcurrentHashMap<>(1);
+
+  public ExcludeDatanodeManager(Configuration conf) {
+    this.conf = conf;
+    this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
+      DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
+    this.excludeDNsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
+        DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
+      .maximumSize(this.maxExcludeDNCount)
+      .build();
+  }
+
+  /**
+   * Try to add a datanode to the regionserver's exclude cache.
+   * @param datanodeInfo the datanode to be added to the exclude cache
+   * @param cause the reason why the datanode should be excluded
+   * @return true if the datanode was added to the exclude cache, false otherwise
+   */
+  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
+    boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
+    if (!alreadyMarkedSlow) {
+      excludeDNsCache.put(datanodeInfo, EnvironmentEdgeManager.currentTime());
+      LOG.info(
+        "Added datanode {} to the exclude cache due to [{}], current excludeDNsCache size={}",
+        datanodeInfo, cause, excludeDNsCache.size());
+      return true;
+    }
+    LOG.debug("Failed to add datanode {} to the exclude cache due to [{}], "
+        + "current exclude DNs are {}", datanodeInfo, cause, getExcludeDNs().keySet());
+    return false;
+  }
+
+  public StreamSlowMonitor getStreamSlowMonitor(String name) {
+    String key = name == null || name.isEmpty() ? "defaultMonitorName" : name;
+    return streamSlowMonitors
+      .computeIfAbsent(key, k -> new StreamSlowMonitor(conf, key, this));
+  }
+
+  public Map<DatanodeInfo, Long> getExcludeDNs() {
+    return excludeDNsCache.asMap();
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    for (StreamSlowMonitor monitor : streamSlowMonitors.values()) {
+      monitor.onConfigurationChange(conf);
+    }
+    this.excludeDNsCache = CacheBuilder.newBuilder().expireAfterWrite(
+      this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY, DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
+      TimeUnit.HOURS).maximumSize(this.conf
+      .getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY, DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
+      .build();
+  }
+}
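
One ExcludeDatanodeManager is intended to be shared per regionserver: every
StreamSlowMonitor obtained from getStreamSlowMonitor() reports into the same
TTL- and size-bounded exclude cache. A small sketch of the contract, based on
the unit tests in this patch (conf and datanodeInfo assumed to exist):

    ExcludeDatanodeManager manager = new ExcludeDatanodeManager(conf);
    StreamSlowMonitor monitor = manager.getStreamSlowMonitor("wal-monitor");
    // The first report for a datanode wins; repeats are no-ops until the
    // cache entry expires (6 hours by default).
    boolean added = manager.tryAddExcludeDN(datanodeInfo, "connect error");   // true
    boolean again = manager.tryAddExcludeDN(datanodeInfo, "slow packet ack"); // false
    Set<DatanodeInfo> toExclude = new HashSet<>(manager.getExcludeDNs().keySet());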
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java
new file mode 100644
index 0000000..7ee04f8
--- /dev/null
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/StreamSlowMonitor.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs.monitor;
+
+import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_EXCLUDE_DATANODE_TTL;
+import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT;
+import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_EXCLUDE_DATANODE_TTL_KEY;
+import static org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager.WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY;
+
+import java.util.Deque;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+/**
+ * Class for monitoring the WAL file flush performance.
+ * Each active WAL file has a StreamSlowMonitor.
+ */
+@InterfaceAudience.Private
+public class StreamSlowMonitor implements ConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamSlowMonitor.class);
+
+  /**
+   * Configuration key for the minimum number of times a datanode must be detected slow.
+   * If a datanode is detected slow at least this many times, it will be added to the exclude
+   * datanode cache by {@link ExcludeDatanodeManager#tryAddExcludeDN(DatanodeInfo, String)}
+   * of this regionserver.
+   */
+  private static final String WAL_SLOW_DETECT_MIN_COUNT_KEY =
+    "hbase.regionserver.async.wal.min.slow.detect.count";
+  private static final int DEFAULT_WAL_SLOW_DETECT_MIN_COUNT = 3;
+
+  /**
+   * Configuration key for the TTL of the slow-detection records kept for a datanode.
+   */
+  private static final String WAL_SLOW_DETECT_DATA_TTL_KEY =
+    "hbase.regionserver.async.wal.slow.detect.data.ttl.ms";
+  private static final long DEFAULT_WAL_SLOW_DETECT_DATA_TTL = 10 * 60 * 1000; // 10min in ms
+
+  /**
+   * Configuration key for the minimum packet data length used by the speed check.
+   * For packets whose data length is smaller than this value, slowness is judged by
+   * processing time; for larger packets, slowness is judged by flush speed.
+   */
+  private static final String DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY =
+    "hbase.regionserver.async.wal.datanode.slow.check.speed.packet.data.length.min";
+  private static final long DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH =
+    64 * 1024; // 64KB
+
+  /**
+   * Configuration key for the slow packet process time, the duration from send to ACK.
+   * The processing time check applies to packets whose data length is smaller than
+   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}.
+   */
+  public static final String DATANODE_SLOW_PACKET_PROCESS_TIME_KEY =
+    "hbase.regionserver.async.wal.datanode.slow.packet.process.time.millis";
+  private static final long DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME = 6000; // 6s in ms
+
+  /**
+   * Configuration key for the flush speed check of large packets (those larger than
+   * {@link StreamSlowMonitor#DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY}).
+   * e.g. at the default of 20KB/s, a 64KB packet should be processed in less than 3.2s,
+   * which is a stricter bound than the slow packet process time alone.
+   */
+  private static final String DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY =
+    "hbase.regionserver.async.wal.datanode.slow.packet.speed.min.kbs";
+  private static final double DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED = 20; // 20KB/s
+
+  private final String name;
+  // this is a map of datanodeInfo->queued slow PacketAckData
+  private final LoadingCache<DatanodeInfo, Deque<PacketAckData>> datanodeSlowDataQueue;
+  private final ExcludeDatanodeManager excludeDatanodeManager;
+
+  private int minSlowDetectCount;
+  private long slowDataTtl;
+  private long slowPacketAckMs;
+  private double minPacketFlushSpeedKBs;
+  private long minLengthForSpeedCheck;
+
+  public StreamSlowMonitor(Configuration conf, String name,
+      ExcludeDatanodeManager excludeDatanodeManager) {
+    setConf(conf);
+    this.name = name;
+    this.excludeDatanodeManager = excludeDatanodeManager;
+    this.datanodeSlowDataQueue = CacheBuilder.newBuilder()
+      .maximumSize(conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
+        DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT))
+      .expireAfterWrite(conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
+        DEFAULT_WAL_EXCLUDE_DATANODE_TTL), TimeUnit.HOURS)
+      .build(new CacheLoader<DatanodeInfo, Deque<PacketAckData>>() {
+        @Override
+        public Deque<PacketAckData> load(DatanodeInfo key) throws Exception {
+          return new ConcurrentLinkedDeque<>();
+        }
+      });
+    LOG.info("New stream slow monitor {}", this.name);
+  }
+
+  public static StreamSlowMonitor create(Configuration conf, String name) {
+    return new StreamSlowMonitor(conf, name, new ExcludeDatanodeManager(conf));
+  }
+
+  /**
+   * Check if the packet process time shows that the relevant datanode is a slow node.
+   * @param datanodeInfo the datanode that processed the packet
+   * @param packetDataLen the data length of the packet (in bytes)
+   * @param processTimeMs the process time (in ms) of the packet on the datanode
+   * @param lastAckTimestamp the timestamp of the last ACK for this packet from another datanode
+   * @param unfinished the number of replicas that have not yet acked this packet
+   */
+  public void checkProcessTimeAndSpeed(DatanodeInfo datanodeInfo, long packetDataLen,
+      long processTimeMs, long lastAckTimestamp, int unfinished) {
+    long current = EnvironmentEdgeManager.currentTime();
+    // Here are the two conditions used to determine whether a datanode is slow:
+    // 1. For a small packet, we just apply a simple time limit, without considering
+    // the size of the packet.
+    // 2. For a large packet, we calculate the speed and check whether it is too slow.
+    boolean slow = (packetDataLen <= minLengthForSpeedCheck && processTimeMs > slowPacketAckMs) || (
+      packetDataLen > minLengthForSpeedCheck
+        && (double) packetDataLen / processTimeMs < minPacketFlushSpeedKBs);
+    if (slow) {
+      // Check for a large ACK timestamp gap between replicas, to avoid
+      // misjudgments caused by GC stop-the-world pauses on the client.
+      if ((lastAckTimestamp > 0 && current - lastAckTimestamp > slowPacketAckMs / 2) || (
+          lastAckTimestamp <= 0 && unfinished == 0)) {
+        LOG.info("Slow datanode: {}, data length={}, duration={}ms, unfinishedReplicas={}, "
+            + "lastAckTimestamp={}, monitor name: {}", datanodeInfo, packetDataLen, processTimeMs,
+          unfinished, lastAckTimestamp, this.name);
+        if (addSlowAckData(datanodeInfo, packetDataLen, processTimeMs)) {
+          excludeDatanodeManager.tryAddExcludeDN(datanodeInfo, "slow packet ack");
+        }
+      }
+    }
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    setConf(conf);
+  }
+
+  private boolean addSlowAckData(DatanodeInfo datanodeInfo, long dataLength, long processTime) {
+    Deque<PacketAckData> slowDNQueue = datanodeSlowDataQueue.getUnchecked(datanodeInfo);
+    long current = EnvironmentEdgeManager.currentTime();
+    while (!slowDNQueue.isEmpty() && (current - slowDNQueue.getFirst().getTimestamp() > slowDataTtl
+      || slowDNQueue.size() >= minSlowDetectCount)) {
+      slowDNQueue.removeFirst();
+    }
+    slowDNQueue.addLast(new PacketAckData(dataLength, processTime));
+    return slowDNQueue.size() >= minSlowDetectCount;
+  }
+
+  private void setConf(Configuration conf) {
+    this.minSlowDetectCount = conf.getInt(WAL_SLOW_DETECT_MIN_COUNT_KEY,
+        DEFAULT_WAL_SLOW_DETECT_MIN_COUNT);
+    this.slowDataTtl = conf.getLong(WAL_SLOW_DETECT_DATA_TTL_KEY, DEFAULT_WAL_SLOW_DETECT_DATA_TTL);
+    this.slowPacketAckMs = conf.getLong(DATANODE_SLOW_PACKET_PROCESS_TIME_KEY,
+        DEFAULT_DATANODE_SLOW_PACKET_PROCESS_TIME);
+    this.minLengthForSpeedCheck = conf.getLong(
+        DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH_KEY,
+        DEFAULT_DATANODE_PACKET_FLUSH_CHECK_SPEED_MIN_DATA_LENGTH);
+    this.minPacketFlushSpeedKBs = conf.getDouble(DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED_KEY,
+      DEFAULT_DATANODE_SLOW_PACKET_FLUSH_MIN_SPEED);
+  }
+
+  public ExcludeDatanodeManager getExcludeDatanodeManager() {
+    return excludeDatanodeManager;
+  }
+
+  private static class PacketAckData {
+    private final long dataLength;
+    private final long processTime;
+    private final long timestamp;
+
+    public PacketAckData(long dataLength, long processTime) {
+      this.dataLength = dataLength;
+      this.processTime = processTime;
+      this.timestamp = EnvironmentEdgeManager.currentTime();
+    }
+
+    public long getDataLength() {
+      return dataLength;
+    }
+
+    public long getProcessTime() {
+      return processTime;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+  }
+}
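
A note on units in checkProcessTimeAndSpeed: packetDataLen is in bytes and
processTimeMs in milliseconds, and bytes/ms is numerically equal to KB/s when
1 KB is treated as 1000 bytes (the factor of 1000 cancels), so the quotient
compares directly against minPacketFlushSpeedKBs. With the defaults, the two
conditions work out as:

    // Small packet (<= 64 KB): slow when the ACK takes longer than 6000 ms.
    //   e.g. 5,000 B in 7,000 ms    -> 7000 > 6000          -> slow
    // Large packet (> 64 KB): slow when it drains below 20 KB/s.
    //   e.g. 100,000 B in 5,100 ms  -> 100000/5100 ~= 19.6  -> slow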
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java
new file mode 100644
index 0000000..a3da52e
--- /dev/null
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestExcludeDatanodeManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class })
+public class TestExcludeDatanodeManager {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestExcludeDatanodeManager.class);
+
+  @Test
+  public void testExcludeSlowDNBySpeed() {
+    Configuration conf = HBaseConfiguration.create();
+    ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
+    StreamSlowMonitor streamSlowDNsMonitor =
+      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
+    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
+    DatanodeInfo datanodeInfo =
+      new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
+        .setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
+        .setIpcPort(444).setNetworkLocation("location1").build();
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
+        System.currentTimeMillis() - 5100, 0);
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
+        System.currentTimeMillis() - 5100, 0);
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 100000, 5100,
+        System.currentTimeMillis() - 5100, 0);
+    assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
+    assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
+  }
+
+  @Test
+  public void testExcludeSlowDNByProcessTime() {
+    Configuration conf = HBaseConfiguration.create();
+    ExcludeDatanodeManager excludeDatanodeManager = new ExcludeDatanodeManager(conf);
+    StreamSlowMonitor streamSlowDNsMonitor =
+      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
+    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
+    DatanodeInfo datanodeInfo =
+      new DatanodeInfo.DatanodeInfoBuilder().setIpAddr("0.0.0.0").setHostName("hostname1")
+        .setDatanodeUuid("uuid1").setXferPort(111).setInfoPort(222).setInfoSecurePort(333)
+        .setIpcPort(444).setNetworkLocation("location1").build();
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
+        System.currentTimeMillis() - 7000, 0);
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
+        System.currentTimeMillis() - 7000, 0);
+    streamSlowDNsMonitor
+      .checkProcessTimeAndSpeed(datanodeInfo, 5000, 7000,
+        System.currentTimeMillis() - 7000, 0);
+    assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
+    assertTrue(excludeDatanodeManager.getExcludeDNs().containsKey(datanodeInfo));
+  }
+}
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 03ff1ee..8533d38 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -39,6 +39,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -78,6 +81,8 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
 
   private static int READ_TIMEOUT_MS = 2000;
 
+  private static StreamSlowMonitor MONITOR;
+
   @Rule
   public TestName name = new TestName();
 
@@ -88,6 +93,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     FS = CLUSTER.getFileSystem();
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
     CHANNEL_CLASS = NioSocketChannel.class;
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
   }
 
   @AfterClass
@@ -133,7 +139,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     writeAndVerify(FS, f, out);
   }
 
@@ -142,7 +148,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b, 0, b.length);
@@ -171,7 +177,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still be alive.
     writeAndVerify(FS, f, out);
@@ -186,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
@@ -209,9 +215,39 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
+      // the dead DN should be excluded on retry, so here we only have 2 DNs in the pipeline
+      assertEquals(2, output.getPipeline().length);
+    } finally {
+      CLUSTER.restartDataNode(dnProp);
+    }
+  }
+
+  @Test
+  public void testExcludeFailedConnectToDatanode()
+    throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
+    InvocationTargetException, InterruptedException, NoSuchFieldException {
+    Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
+    xceiverServerDaemonField.setAccessible(true);
+    Class<?> xceiverServerClass =
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
+    numPeersMethod.setAccessible(true);
+    // make one datanode broken
+    DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
+    Path f = new Path("/test");
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    ExcludeDatanodeManager excludeDatanodeManager =
+      new ExcludeDatanodeManager(HBaseConfiguration.create());
+    StreamSlowMonitor streamSlowDNsMonitor =
+      excludeDatanodeManager.getStreamSlowMonitor("testMonitor");
+    assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
+    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop,
+      CHANNEL_CLASS, streamSlowDNsMonitor)) {
      // the dead DN should be excluded on retry, so here we only have 2 DNs in the pipeline
       assertEquals(2, output.getPipeline().length);
+      assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
     } finally {
       CLUSTER.restartDataNode(dnProp);
     }
@@ -222,7 +258,7 @@ public class TestFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS);
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
     byte[] b = new byte[50 * 1024 * 1024];
     ThreadLocalRandom.current().nextBytes(b);
     out.write(b);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index d2fdf17..66735a3 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -49,10 +50,13 @@ public class TestLocalAsyncOutput {
 
   private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility();
 
+  private static final StreamSlowMonitor MONITOR =
+    StreamSlowMonitor.create(TEST_UTIL.getConfiguration(), "testMonitor");
+
   @AfterClass
   public static void tearDownAfterClass() throws IOException {
     TEST_UTIL.cleanupTestDir();
     GROUP.shutdownGracefully();
   }
 
   @Test
@@ -61,7 +65,7 @@ public class TestLocalAsyncOutput {
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }
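
Note that AsyncFSOutputHelper only consults the monitor on the HDFS path; for
a local FileSystem, as in this test, it falls back to a wrapped
FSDataOutputStream and the monitor parameter is effectively unused. Condensed
from the AsyncFSOutputHelper hunk above:

    if (fs instanceof DistributedFileSystem) {
      return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
    }
    // otherwise: wrap a plain FSDataOutputStream; the monitor is not consulted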
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 2de7f41..ab23b74 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
-
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -40,6 +39,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -62,7 +62,6 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -102,6 +101,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
 
   private static String TEST_KEY_NAME = "test_key";
 
+  private static StreamSlowMonitor MONITOR;
+
   @Rule
   public TestName name = new TestName();
 
@@ -187,6 +188,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
     HBaseKerberosUtils.setSecuredConfiguration(UTIL.getConfiguration(),
       PRINCIPAL + "@" + KDC.getRealm(), HTTP_PRINCIPAL + "@" + KDC.getRealm());
     HBaseKerberosUtils.setSSLConfiguration(UTIL, TestSaslFanOutOneBlockAsyncDFSOutput.class);
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
   }
 
   @AfterClass
@@ -254,7 +256,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput extends AsyncFSTestBase {
   private void test(Path file) throws IOException, InterruptedException, ExecutionException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 3b84488..743369f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
@@ -47,7 +48,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 
@@ -169,7 +169,8 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-      long blocksize) throws IOException, StreamLacksCapabilityException {
+      long blocksize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.conf = conf;
     boolean doCompress = initializeCompressionContext(conf, path);
     this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE);
@@ -177,7 +178,7 @@ public abstract class AbstractProtobufLogWriter {
     short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
       CommonFSUtils.getDefaultReplication(fs, path));
 
-    initOutput(fs, path, overwritable, bufferSize, replication, blocksize);
+    initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
 
     boolean doTagCompress = doCompress &&
       conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -266,7 +267,8 @@ public abstract class AbstractProtobufLogWriter {
   }
 
   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException;
+      short replication, long blockSize, StreamSlowMonitor monitor)
+      throws IOException, StreamLacksCapabilityException;
 
   /**
    * return the file length after written.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index c5cd2b0..a3076ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -198,22 +199,26 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final int waitOnShutdownInSeconds;
 
+  private final StreamSlowMonitor streamSlowMonitor;
+
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
     this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
-        eventLoopGroup, channelClass);
+        eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, "monitorForSuffix"));
   }
 
   public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir,
       String archiveDir, Configuration conf, List<WALActionsListener> listeners,
       boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+      throws FailedLogCloseException, IOException {
     super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
         suffix);
     this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
+    this.streamSlowMonitor = monitor;
     Supplier<Boolean> hasConsumerTask;
     if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
       this.consumeExecutor = eventLoopGroup.next();
@@ -655,8 +660,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false,
-        this.blocksize, eventLoopGroup, channelClass);
+    return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, this.blocksize,
+      eventLoopGroup, channelClass, streamSlowMonitor);
   }
 
   private void waitForSafePoint() {
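
Storing the monitor in a field lets AsyncFSWAL hand the same instance to every writer
it rolls, which is the point of the new thirteen-argument constructor. A construction
sketch under that signature, reusing conf and fs from the sketch above; the directory
names, prefix, and the plain netty classes are illustrative assumptions (HBase itself
uses the shaded org.apache.hbase.thirdparty netty types), and the call would sit in a
method that declares IOException:

    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();   // assumed event loop
    StreamSlowMonitor monitor = StreamSlowMonitor.create(conf, "asyncWalMonitor");
    AsyncFSWAL wal = new AsyncFSWAL(fs, null /* abortable */,
        CommonFSUtils.getWALRootDir(conf), "WALs", "oldWALs", conf,
        Collections.<WALActionsListener>emptyList(), true /* failIfWALExists */,
        "myPrefix", null /* suffix */, eventLoopGroup, NioSocketChannel.class, monitor);
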
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e834d65..0ab0407 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -176,9 +177,10 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
+      short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-        blockSize, eventLoopGroup, channelClass);
+        blockSize, eventLoopGroup, channelClass, monitor);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
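The writer above simply forwards the monitor to the asyncfs layer, which is where
slow-datanode detection actually happens. A direct use of that helper, reusing conf,
fs, walPath and eventLoopGroup from the earlier sketches (the replication and block
size arguments are just reasonable defaults here, not values from the patch):

    AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, walPath,
        true /* overwrite */, false /* createParent */, fs.getDefaultReplication(walPath),
        WALUtil.getWALBlockSize(conf, fs, walPath), eventLoopGroup,
        NioSocketChannel.class, StreamSlowMonitor.create(conf, walPath.getName()));
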
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 4bbc13d..7ed267b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.util.AtomicUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
@@ -104,7 +104,8 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-      short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
+      short replication, long blockSize, StreamSlowMonitor monitor) throws IOException,
+      StreamLacksCapabilityException {
     this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
         blockSize, false);
     if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 4bef4bc..4a4ac37 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@@ -60,8 +61,8 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
      * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
      *         meet the needs of the given Writer implementation.
      */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
+        StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   private EventLoopGroup eventLoopGroup;
@@ -70,10 +71,10 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable,
-        CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
-        getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
-        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
-        channelClass);
+      CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId),
+      getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+      META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup,
+      channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId));
   }
 
   @Override
@@ -97,7 +98,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
       boolean overwritable, EventLoopGroup eventLoopGroup,
       Class<? extends Channel> channelClass) throws IOException {
     return createAsyncWriter(conf, fs, path, overwritable, WALUtil.getWALBlockSize(conf, fs, path),
-        eventLoopGroup, channelClass);
+        eventLoopGroup, channelClass, StreamSlowMonitor.create(conf, path.getName()));
   }
 
   /**
@@ -105,14 +106,14 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
       boolean overwritable, long blocksize, EventLoopGroup eventLoopGroup,
-      Class<? extends Channel> channelClass) throws IOException {
+      Class<? extends Channel> channelClass, StreamSlowMonitor monitor) throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
       WRITER_IMPL, AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
       AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
           .newInstance(eventLoopGroup, channelClass);
-      writer.init(fs, path, conf, overwritable, blocksize);
+      writer.init(fs, path, conf, overwritable, blocksize, monitor);
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
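
Callers now have two entry points: the short overload synthesizes a monitor named
after the file, while the long one accepts a monitor from the caller, which is how
AsyncFSWAL.createWriterInstance shares its monitor across log rolls. A usage sketch,
again reusing conf, fs, walPath and eventLoopGroup from the sketches above:

    // Overload 1: the provider builds StreamSlowMonitor.create(conf, path.getName()).
    AsyncFSWALProvider.AsyncWriter w1 = AsyncFSWALProvider.createAsyncWriter(
        conf, fs, walPath, false, eventLoopGroup, NioSocketChannel.class);

    // Overload 2: pass one monitor explicitly so successive files feed the same
    // slowness bookkeeping.
    StreamSlowMonitor shared = StreamSlowMonitor.create(conf, "sharedWalMonitor");
    AsyncFSWALProvider.AsyncWriter w2 = AsyncFSWALProvider.createAsyncWriter(
        conf, fs, walPath, false, WALUtil.getWALBlockSize(conf, fs, walPath),
        eventLoopGroup, NioSocketChannel.class, shared);
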
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index 8f2ca07..0dedda9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
@@ -48,8 +49,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
      * @throws StreamLacksCapabilityException if the given FileSystem can't provide streams that
      *         meet the needs of the given Writer implementation.
      */
-    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize)
-        throws IOException, CommonFSUtils.StreamLacksCapabilityException;
+    void init(FileSystem fs, Path path, Configuration c, boolean overwritable, long blocksize,
+        StreamSlowMonitor monitor) throws IOException, CommonFSUtils.StreamLacksCapabilityException;
   }
 
   /**
@@ -76,7 +77,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
     try {
       writer = logWriterClass.getDeclaredConstructor().newInstance();
       FileSystem rootFs = FileSystem.get(path.toUri(), conf);
-      writer.init(rootFs, path, conf, overwritable, blocksize);
+      writer.init(rootFs, path, conf, overwritable, blocksize,
+          StreamSlowMonitor.create(conf, path.getName()));
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index b84f1be..d258b61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -105,6 +105,8 @@ public class WALFactory {
 
   private final Configuration conf;
 
+  private final ExcludeDatanodeManager excludeDatanodeManager;
+
   // Used for the singleton WALFactory, see below.
   private WALFactory(Configuration conf) {
     // this code is duplicated here so we can keep our members final.
@@ -121,6 +123,7 @@ public class WALFactory {
     provider = null;
     factoryId = SINGLETON_ID;
     this.abortable = null;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
   }
 
   Providers getDefaultProvider() {
@@ -198,6 +201,7 @@ public class WALFactory {
       AbstractFSWALProvider.Reader.class);
     this.conf = conf;
     this.factoryId = factoryId;
+    this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
     this.abortable = abortable;
     // end required early initialization
     if (conf.getBoolean(WAL_ENABLED, true)) {
@@ -499,4 +503,8 @@ public class WALFactory {
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
+
+  public ExcludeDatanodeManager getExcludeDatanodeManager() {
+    return excludeDatanodeManager;
+  }
 }
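
The factory-level ExcludeDatanodeManager is the piece that ties monitors together:
AsyncFSWALProvider.createWAL above asks it for a monitor keyed by provider id, so all
writers of one provider share exclusion state. A retrieval sketch; the factory id and
provider id strings are placeholders, and the constructor declares IOException:

    WALFactory walFactory = new WALFactory(conf, "myFactoryId");
    ExcludeDatanodeManager excludeManager = walFactory.getExcludeDatanodeManager();
    // Monitors are handed out by name; asking twice with the same id should yield
    // the same instance, keeping slow-datanode state shared within that provider.
    StreamSlowMonitor monitor = excludeManager.getStreamSlowMonitor("myProviderId");
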
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index 69b9ea4..fd5404c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -67,6 +67,9 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
@@ -1233,7 +1236,8 @@ public abstract class AbstractTestWALReplay {
       StreamLacksCapabilityException {
     fs.mkdirs(file.getParent());
     ProtobufLogWriter writer = new ProtobufLogWriter();
-    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file));
+    writer.init(fs, file, conf, true, WALUtil.getWALBlockSize(conf, fs, file),
+      StreamSlowMonitor.create(conf, "testMonitor"));
     for (FSWALEntry entry : entries) {
       writer.append(entry);
     }
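
For any out-of-tree Writer implementation the required pattern is the one IOTestWriter
follows in the next diff: widen init(...) to accept the monitor and delegate to
super.init, since the base class now needs it before opening the output stream. A
minimal template under that assumption (the class name is illustrative):

    public class CustomTestWriter extends ProtobufLogWriter {
      @Override
      public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
          long blocksize, StreamSlowMonitor monitor)
          throws IOException, CommonFSUtils.StreamLacksCapabilityException {
        // any custom setup goes here, before the stream is created
        super.init(fs, path, conf, overwritable, blocksize, monitor);
      }
    }
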
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
index e624e6f..cc5fa1d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
 // imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -214,7 +215,8 @@ public class IOTestProvider implements WALProvider {
         LOG.info("creating new writer instance.");
         final ProtobufLogWriter writer = new IOTestWriter();
         try {
-          writer.init(fs, path, conf, false, this.blocksize);
+          writer.init(fs, path, conf, false, this.blocksize,
+              StreamSlowMonitor.create(conf, path.getName()));
         } catch (CommonFSUtils.StreamLacksCapabilityException exception) {
           throw new IOException("Can't create writer instance because underlying FileSystem " +
               "doesn't support needed stream capabilities.", exception);
@@ -242,7 +244,8 @@ public class IOTestProvider implements WALProvider {
 
     @Override
     public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable,
-        long blocksize) throws IOException, CommonFSUtils.StreamLacksCapabilityException {
+        long blocksize, StreamSlowMonitor monitor) throws IOException,
+        CommonFSUtils.StreamLacksCapabilityException {
       Collection<String> operations = conf.getStringCollection(ALLOWED_OPERATIONS);
       if (operations.isEmpty() || operations.contains(AllowedOperations.all.name())) {
         doAppends = doSyncs = true;
@@ -254,7 +257,7 @@ public class IOTestProvider implements WALProvider {
       }
       LOG.info("IOTestWriter initialized with appends " + (doAppends ? "enabled" : "disabled") +
           " and syncs " + (doSyncs ? "enabled" : "disabled"));
-      super.init(fs, path, conf, overwritable, blocksize);
+      super.init(fs, path, conf, overwritable, blocksize, monitor);
     }
 
     @Override