You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/15 10:33:35 UTC

[09/48] hbase git commit: HBASE-19768 RegionServer startup failing when DN is dead

HBASE-19768 RegionServer startup failing when DN is dead


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ffa28502
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ffa28502
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ffa28502

Branch: refs/heads/HBASE-19397-branch-2
Commit: ffa28502c471945384845b0f072d1c468b0c7f31
Parents: 842f794
Author: zhangduo <zh...@apache.org>
Authored: Sun Jan 14 17:30:50 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Jan 14 17:31:23 2018 +0800

----------------------------------------------------------------------
 .../FanOutOneBlockAsyncDFSOutputHelper.java     | 140 ++++++++++++-------
 .../hbase/regionserver/wal/AsyncFSWAL.java      |  65 ++-------
 .../TestFanOutOneBlockAsyncDFSOutput.java       |  78 ++++-------
 .../regionserver/wal/TestAsyncLogRolling.java   |   3 +-
 4 files changed, 126 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa28502/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 4fa06a4..d7aa897 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,23 +21,23 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
 import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
-import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
-import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
 
 import com.google.protobuf.CodedOutputStream;
-
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.Encryptor;
@@ -85,6 +85,7 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
@@ -121,6 +122,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   private FanOutOneBlockAsyncDFSOutputHelper() {
   }
 
+  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
   // use pooled allocator for performance.
   private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
 
@@ -129,8 +133,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   // Timeouts for communicating with DataNode for streaming writes/reads
   public static final int READ_TIMEOUT = 60 * 1000;
-  public static final int READ_TIMEOUT_EXTENSION = 5 * 1000;
-  public static final int WRITE_TIMEOUT = 8 * 60 * 1000;
+
+  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
 
   // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
   // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
@@ -744,58 +748,90 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
     ClientProtocol namenode = client.getNamenode();
-    HdfsFileStatus stat;
-    try {
-      stat = FILE_CREATOR.create(namenode, src,
-        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize, CryptoProtocolVersion.supported());
-    } catch (Exception e) {
-      if (e instanceof RemoteException) {
-        throw (RemoteException) e;
-      } else {
-        throw new NameNodeException(e);
-      }
-    }
-    beginFileLease(client, stat.getFileId());
-    boolean succ = false;
-    LocatedBlock locatedBlock = null;
-    List<Future<Channel>> futureList = null;
-    try {
-      DataChecksum summer = createChecksum(client);
-      locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null,
-        stat.getFileId(), null);
-      List<Channel> datanodeList = new ArrayList<>();
-      futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
-        PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
-      for (Future<Channel> future : futureList) {
-        // fail the creation if there are connection failures since we are fail-fast. The upper
-        // layer should retry itself if needed.
-        datanodeList.add(future.syncUninterruptibly().getNow());
+    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+      DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    for (int retry = 0;; retry++) {
+      HdfsFileStatus stat;
+      try {
+        stat = FILE_CREATOR.create(namenode, src,
+          FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+          new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+          createParent, replication, blockSize, CryptoProtocolVersion.supported());
+      } catch (Exception e) {
+        if (e instanceof RemoteException) {
+          throw (RemoteException) e;
+        } else {
+          throw new NameNodeException(e);
+        }
       }
-      Encryptor encryptor = createEncryptor(conf, stat, client);
-      FanOutOneBlockAsyncDFSOutput output =
+      beginFileLease(client, stat.getFileId());
+      boolean succ = false;
+      LocatedBlock locatedBlock = null;
+      List<Future<Channel>> futureList = null;
+      try {
+        DataChecksum summer = createChecksum(client);
+        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+          excludesNodes, stat.getFileId(), null);
+        List<Channel> datanodeList = new ArrayList<>();
+        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+          PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+        for (int i = 0, n = futureList.size(); i < n; i++) {
+          try {
+            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+          } catch (Exception e) {
+            // exclude the broken DN next time
+            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            throw e;
+          }
+        }
+        Encryptor encryptor = createEncryptor(conf, stat, client);
+        FanOutOneBlockAsyncDFSOutput output =
           new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
               stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
-      succ = true;
-      return output;
-    } finally {
-      if (!succ) {
-        if (futureList != null) {
-          for (Future<Channel> f : futureList) {
-            f.addListener(new FutureListener<Channel>() {
-
-              @Override
-              public void operationComplete(Future<Channel> future) throws Exception {
-                if (future.isSuccess()) {
-                  future.getNow().close();
+        succ = true;
+        return output;
+      } catch (RemoteException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            throw e.unwrapRemoteException();
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (NameNodeException e) {
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (retry >= createMaxRetries) {
+          throw e;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      } finally {
+        if (!succ) {
+          if (futureList != null) {
+            for (Future<Channel> f : futureList) {
+              f.addListener(new FutureListener<Channel>() {
+
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                    future.getNow().close();
+                  }
                 }
-              }
-            });
+              });
+            }
           }
+          endFileLease(client, stat.getFileId());
+          fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
         }
-        endFileLease(client, stat.getFileId());
-        fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa28502/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index be8665b..0ace782 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -17,14 +17,10 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
-
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.Sequencer;
-
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Comparator;
@@ -44,26 +40,23 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
-import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
 import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -140,9 +133,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
   public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
   public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;
 
-  public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries";
-  public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
-
   public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
     "hbase.wal.async.use-shared-event-loop";
   public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = false;
@@ -189,8 +179,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final long batchSize;
 
-  private final int createMaxRetries;
-
   private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());
 
@@ -257,8 +245,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     waitingConsumePayloadsGatingSequence.set(waitingConsumePayloads.getCursor());
 
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
-    createMaxRetries =
-      conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
     waitOnShutdownInSeconds = conf.getInt(ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS,
       DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
     rollWriter();
@@ -622,46 +608,19 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   @Override
   protected AsyncWriter createWriterInstance(Path path) throws IOException {
-    boolean overwrite = false;
-    for (int retry = 0;; retry++) {
-      try {
-        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup,
-          channelClass);
-      } catch (RemoteException e) {
-        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
-        if (shouldRetryCreate(e)) {
-          if (retry >= createMaxRetries) {
-            break;
-          }
-        } else {
-          IOException ioe = e.unwrapRemoteException();
-          // this usually means master already think we are dead so let's fail all the pending
-          // syncs. The shutdown process of RS will wait for all regions to be closed before calling
-          // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
-          // lock.
-          if (e.getMessage().contains("Parent directory doesn't exist:")) {
-            syncFutures.forEach(f -> f.done(f.getTxid(), ioe));
-          }
-          throw ioe;
-        }
-      } catch (NameNodeException e) {
-        throw e;
-      } catch (IOException e) {
-        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
-        if (retry >= createMaxRetries) {
-          break;
-        }
-        // overwrite the old broken file.
-        overwrite = true;
-        try {
-          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
-        } catch (InterruptedException ie) {
-          throw new InterruptedIOException();
-        }
+    try {
+      return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, false, eventLoopGroup,
+        channelClass);
+    } catch (IOException e) {
+      // this usually means master already think we are dead so let's fail all the pending
+      // syncs. The shutdown process of RS will wait for all regions to be closed before calling
+      // WAL.close so if we do not wake up the thread blocked by sync here it will cause dead
+      // lock.
+      if (e.getMessage().contains("Parent directory doesn't exist:")) {
+        syncFutures.forEach(f -> f.done(f.getTxid(), e));
       }
+      throw e;
     }
-    throw new IOException("Failed to create wal log writer " + path + " after retrying " +
-      createMaxRetries + " time(s)");
   }
 
   private void waitForSafePoint() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa28502/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 3a8da21..42539c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -35,7 +35,6 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,9 +42,9 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.util.Daemon;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -54,6 +53,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.io.netty.channel.Channel;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
 import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -95,23 +95,6 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
-  private void ensureAllDatanodeAlive() throws InterruptedException {
-    // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we
-    // can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests
-    // will fail.
-    for (;;) {
-      try {
-        FanOutOneBlockAsyncDFSOutput out =
-            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
-              true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP, CHANNEL_CLASS);
-        out.close();
-        break;
-      } catch (IOException e) {
-        Thread.sleep(100);
-      }
-    }
-  }
-
   static void writeAndVerify(FileSystem fs, Path f, AsyncFSOutput out)
       throws IOException, InterruptedException, ExecutionException {
     List<CompletableFuture<Long>> futures = new ArrayList<>();
@@ -163,25 +146,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     out.flush(false).get();
     // restart one datanode which causes one connection broken
     TEST_UTIL.getDFSCluster().restartDataNode(0);
+    out.write(b, 0, b.length);
     try {
-      out.write(b, 0, b.length);
-      try {
-        out.flush(false).get();
-        fail("flush should fail");
-      } catch (ExecutionException e) {
-        // we restarted one datanode so the flush should fail
-        LOG.info("expected exception caught", e);
-      }
-      out.recoverAndClose(null);
-      assertEquals(b.length, FS.getFileStatus(f).getLen());
-      byte[] actual = new byte[b.length];
-      try (FSDataInputStream in = FS.open(f)) {
-        in.readFully(actual);
-      }
-      assertArrayEquals(b, actual);
-    } finally {
-      ensureAllDatanodeAlive();
+      out.flush(false).get();
+      fail("flush should fail");
+    } catch (ExecutionException e) {
+      // we restarted one datanode so the flush should fail
+      LOG.info("expected exception caught", e);
+    }
+    out.recoverAndClose(null);
+    assertEquals(b.length, FS.getFileStatus(f).getLen());
+    byte[] actual = new byte[b.length];
+    try (FSDataInputStream in = FS.open(f)) {
+      in.readFully(actual);
     }
+    assertArrayEquals(b, actual);
   }
 
   @Test
@@ -219,28 +198,19 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
     Class<?> xceiverServerClass =
-        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+      Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken
-    TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true);
-    try {
-      Path f = new Path("/test");
-      EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-      try {
-        FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-          FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS);
-        fail("should fail with connection error");
-      } catch (IOException e) {
-        LOG.info("expected exception caught", e);
-      }
-      for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) {
-        Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn);
-        assertEquals(0, numPeersMethod.invoke(daemon.getRunnable()));
-      }
+    DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(0);
+    Path f = new Path("/test");
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
+      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS)) {
+      // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
+      assertEquals(2, output.getPipeline().length);
     } finally {
-      TEST_UTIL.getDFSCluster().restartDataNode(0);
-      ensureAllDatanodeAlive();
+      TEST_UTIL.getDFSCluster().restartDataNode(dnProp);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ffa28502/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
index 325da94..01e44fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
@@ -45,7 +46,7 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TestAsyncLogRolling.TEST_UTIL.getConfiguration();
-    conf.setInt(AsyncFSWAL.ASYNC_WAL_CREATE_MAX_RETRIES, 100);
+    conf.setInt(FanOutOneBlockAsyncDFSOutputHelper.ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES, 100);
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
     conf.set(WALFactory.META_WAL_PROVIDER, "asyncfs");
     AbstractTestLogRolling.setUpBeforeClass();