Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/03/12 17:25:46 UTC

[hadoop] branch branch-3.1 updated: HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 327e57a  HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.
327e57a is described below

commit 327e57ae6e469d599cd94e2ac7f37b9c221730cf
Author: Stephen O'Donnell <so...@cloudera.com>
AuthorDate: Tue Mar 12 10:16:28 2019 -0700

    HDFS-14333. Datanode fails to start if any disk has errors during Namenode registration. Contributed by Stephen O'Donnell.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    (cherry picked from commit 34b14061b38dccab25058dff1b8743d8a3f82734)
    (cherry picked from commit a21e2e4dbc1681175130242138ced0d36b6fafae)
---
 .../hadoop/hdfs/server/datanode/DataNode.java      |  29 +++++-
 .../fsdataset/impl/AddBlockPoolException.java      |  45 +++++++++
 .../datanode/fsdataset/impl/FsVolumeList.java      |  29 +++---
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |   9 +-
 .../hdfs/server/datanode/SimulatedFSDataset.java   |  12 ++-
 .../server/datanode/TestDataNodeVolumeFailure.java | 110 +++++++++++++++++++++
 .../fsdataset/impl/FsDatasetImplTestUtils.java     |   3 +-
 7 files changed, 215 insertions(+), 22 deletions(-)
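
The heart of the change is easiest to see in isolation. The following minimal, standalone sketch (not part of the patch; volumes are plain String paths, and scanVolume and VolumeScanFailuresException are illustrative stand-ins rather than Hadoop APIs) shows the aggregation pattern that FsVolumeList#addBlockPool and #getAllVolumesMap now follow: each volume is scanned on its own thread, per-volume IOExceptions are collected in a ConcurrentHashMap keyed by volume instead of the first one being rethrown, and a single aggregate exception is thrown once every thread has joined. A companion sketch of the caller side appears after the diff.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class VolumeScanSketch {

      /** Aggregate failure carrying every unhealthy volume and its cause. */
      static class VolumeScanFailuresException extends RuntimeException {
        private final Map<String, IOException> failures;

        VolumeScanFailuresException(Map<String, IOException> failures) {
          this.failures = failures;
        }

        Map<String, IOException> getFailingVolumes() {
          return failures;
        }
      }

      /** Stand-in for the per-volume scan done in FsVolumeList#addBlockPool. */
      static void scanVolume(String volume) throws IOException {
        if (volume.contains("bad")) {
          throw new IOException("simulated disk error on " + volume);
        }
      }

      /** Scan every volume on its own thread; report all failures at the end. */
      static void addBlockPool(List<String> volumes) throws InterruptedException {
        Map<String, IOException> unhealthy = new ConcurrentHashMap<>();
        List<Thread> workers = new ArrayList<>();
        for (String v : volumes) {
          Thread t = new Thread(() -> {
            try {
              scanVolume(v);
            } catch (IOException ioe) {
              unhealthy.put(v, ioe);  // remember which volume failed, keep scanning
            }
          });
          workers.add(t);
          t.start();
        }
        for (Thread t : workers) {
          t.join();
        }
        if (!unhealthy.isEmpty()) {
          // One aggregate exception instead of rethrowing the first IOException.
          throw new VolumeScanFailuresException(unhealthy);
        }
      }
    }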

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 139b3d0..31d182d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -166,6 +166,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour
 import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
@@ -1681,13 +1682,37 @@ public class DataNode extends ReconfigurableBase
     // Exclude failed disks before initializing the block pools to avoid startup
     // failures.
     checkDiskError();
-
-    data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    try {
+      data.addBlockPool(nsInfo.getBlockPoolID(), getConf());
+    } catch (AddBlockPoolException e) {
+      handleAddBlockPoolError(e);
+    }
     blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
     initDirectoryScanner(getConf());
     initDiskBalancer(data, getConf());
   }
 
+  /**
+   * Handles an AddBlockPoolException object thrown from
+   * {@link org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeList#
+   * addBlockPool}. Will ensure that all volumes that encountered an
+   * exception are removed from the DataNode and marked as failed volumes,
+   * in the same way as a runtime volume failure.
+   *
+   * @param e this exception is a container for all IOException objects caught
+   *          in FsVolumeList#addBlockPool.
+   */
+  private void handleAddBlockPoolError(AddBlockPoolException e)
+      throws IOException {
+    Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        e.getFailingVolumes();
+    if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) {
+      handleVolumeFailures(unhealthyDataDirs.keySet());
+    } else {
+      LOG.debug("HandleAddBlockPoolError called with empty exception list");
+    }
+  }
+
   List<BPOfferService> getAllBpOs() {
     return blockPoolManager.getAllNamenodeThreads();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
new file mode 100644
index 0000000..ef63f00
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/AddBlockPoolException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+
+/**
+ * This exception collects all IOExceptions thrown when adding block pools and
+ * scanning volumes. It records which volume each exception is associated
+ * with.
+ *
+ */
+public class AddBlockPoolException extends RuntimeException {
+  private Map<FsVolumeSpi, IOException> unhealthyDataDirs;
+  public AddBlockPoolException(Map<FsVolumeSpi, IOException>
+      unhealthyDataDirs) {
+    this.unhealthyDataDirs = unhealthyDataDirs;
+  }
+
+  public Map<FsVolumeSpi, IOException> getFailingVolumes() {
+    return unhealthyDataDirs;
+  }
+  @Override
+  public String toString() {
+    return getClass().getName() + ": " + unhealthyDataDirs.toString();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a0fcb54..85b85cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -22,6 +22,7 @@ import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.Iterator;
 import java.util.List;
@@ -188,8 +189,8 @@ class FsVolumeList {
                         final RamDiskReplicaTracker ramDiskReplicaMap)
       throws IOException {
     long totalStartTime = Time.monotonicNow();
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> replicaAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -208,7 +209,7 @@ class FsVolumeList {
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while adding replicas " +
                 "from " + v + ". Will throw later.", ioe);
-            exceptions.add(ioe);
+            unhealthyDataDirs.put(v, ioe);
           }
         }
       };
@@ -222,13 +223,13 @@ class FsVolumeList {
         throw new IOException(ie);
       }
     }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG
         .info("Total time to add all replicas to map for block pool " + bpid
             + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
 
   /**
@@ -398,9 +399,8 @@ class FsVolumeList {
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
-    
-    final List<IOException> exceptions = Collections.synchronizedList(
-        new ArrayList<IOException>());
+    final Map<FsVolumeSpi, IOException> unhealthyDataDirs =
+        new ConcurrentHashMap<FsVolumeSpi, IOException>();
     List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
     for (final FsVolumeImpl v : volumes) {
       Thread t = new Thread() {
@@ -418,7 +418,7 @@ class FsVolumeList {
           } catch (IOException ioe) {
             FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                 ". Will throw later.", ioe);
-            exceptions.add(ioe);
+            unhealthyDataDirs.put(v, ioe);
           }
         }
       };
@@ -432,15 +432,14 @@ class FsVolumeList {
         throw new IOException(ie);
       }
     }
-    if (!exceptions.isEmpty()) {
-      throw exceptions.get(0);
-    }
-    
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to scan all replicas for block pool " +
         bpid + ": " + totalTimeTaken + "ms");
+    if (!unhealthyDataDirs.isEmpty()) {
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
   }
-  
+
   void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
       blocksPerVolume) {
     for (FsVolumeImpl v : volumes) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index cabe7ac..8fd9dc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2350,14 +2350,19 @@ public class MiniDFSCluster implements AutoCloseable {
     return restartDataNode(dnprop, false);
   }
 
-  private void waitDataNodeFullyStarted(final DataNode dn)
+  public void waitDatanodeFullyStarted(DataNode dn, int timeout)
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
         return dn.isDatanodeFullyStarted();
       }
-    }, 100, 60000);
+    }, 100, timeout);
+  }
+
+  private void waitDataNodeFullyStarted(final DataNode dn)
+      throws TimeoutException, InterruptedException {
+    waitDatanodeFullyStarted(dn, 60000);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 987ba97..781fe3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -430,7 +430,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
    * Class used for tracking datanode level storage utilization similar
    * to {@link FSVolumeSet}
    */
-  private static class SimulatedStorage {
+  static class SimulatedStorage {
     private final Map<String, SimulatedBPStorage> map =
         new ConcurrentHashMap<>();
 
@@ -615,7 +615,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public StorageLocation getStorageLocation() {
-      return null;
+      try {
+        return StorageLocation.parse("[DISK]file:///simulated");
+      } catch (Exception e) {
+        return null;
+      }
     }
 
     @Override
@@ -663,6 +667,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   private final DataNode datanode;
   
 
+  public List<SimulatedStorage> getStorages() {
+    return storages;
+  }
+
   public SimulatedFSDataset(DataStorage storage, Configuration conf) {
     this(null, storage, conf);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 7d04942..4c691b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
@@ -31,12 +32,16 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -64,6 +69,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -219,6 +225,50 @@ public class TestDataNodeVolumeFailure {
         " is created and replicated");
   }
 
+  /*
+   * If one of the sub-folders under the finalized directory is unreadable,
+   * whether due to permissions or filesystem corruption, the DN will fail
+   * to read it when scanning it for blocks to load into the replica map. This
+   * test ensures the DN does not exit and instead reports the failed volume
+   * to the NN (HDFS-14333). This is done using a simulated FsDataset that throws
+   * an exception for a failed volume when the block pool is initialized.
+   */
+  @Test(timeout=15000)
+  public void testDnStartsAfterDiskErrorScanningBlockPool() throws Exception {
+    // Don't use the cluster configured in the setup() method for this test.
+    cluster.shutdown(true);
+    cluster.close();
+
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        BadDiskFSDataset.Factory.class.getName());
+
+    final MiniDFSCluster localCluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1).build();
+
+    try {
+      localCluster.waitActive();
+      DataNode dn = localCluster.getDataNodes().get(0);
+
+      try {
+        localCluster.waitDatanodeFullyStarted(dn, 3000);
+      } catch (TimeoutException e) {
+        fail("Datanode did not get fully started");
+      }
+      assertTrue(dn.isDatanodeUp());
+
+      // trigger DN to send heartbeat
+      DataNodeTestUtils.triggerHeartbeat(dn);
+      final BlockManager bm = localCluster.getNamesystem().getBlockManager();
+      // trigger NN handle heartbeat
+      BlockManagerTestUtil.checkHeartbeat(bm);
+
+      // NN now should have the failed volume
+      assertEquals(1, localCluster.getNamesystem().getVolumeFailuresTotal());
+    } finally {
+      localCluster.close();
+    }
+  }
+
   /**
    * Test that DataStorage and BlockPoolSliceStorage remove the failed volume
    * after failure.
@@ -758,4 +808,64 @@ public class TestDataNodeVolumeFailure {
     }
     return total;
   }
+
+  private static class BadDiskFSDataset extends SimulatedFSDataset {
+
+    BadDiskFSDataset(DataStorage storage, Configuration conf) {
+      super(storage, conf);
+    }
+
+    private String[] failedStorageLocations = null;
+
+    @Override
+    public void addBlockPool(String bpid, Configuration conf) {
+      super.addBlockPool(bpid, conf);
+      Map<FsVolumeSpi, IOException>
+          unhealthyDataDirs = new HashMap<>();
+      unhealthyDataDirs.put(this.getStorages().get(0).getVolume(),
+          new IOException());
+      throw new AddBlockPoolException(unhealthyDataDirs);
+    }
+
+    @Override
+    public synchronized void removeVolumes(Collection<StorageLocation> volumes,
+        boolean clearFailure) {
+      Iterator<StorageLocation> itr = volumes.iterator();
+      String[] failedLocations = new String[volumes.size()];
+      int index = 0;
+      while(itr.hasNext()) {
+        StorageLocation s = itr.next();
+        failedLocations[index] = s.getUri().getPath();
+        index += 1;
+      }
+      failedStorageLocations = failedLocations;
+    }
+
+    @Override
+    public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
+      // do nothing
+    }
+
+    @Override
+    public VolumeFailureSummary getVolumeFailureSummary() {
+      if (failedStorageLocations != null) {
+        return new VolumeFailureSummary(failedStorageLocations, 0, 0);
+      } else {
+        return new VolumeFailureSummary(ArrayUtils.EMPTY_STRING_ARRAY, 0, 0);
+      }
+    }
+
+    static class Factory extends FsDatasetSpi.Factory<BadDiskFSDataset> {
+      @Override
+      public BadDiskFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new BadDiskFSDataset(storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index baaed9f..70ac799 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -500,6 +500,7 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
    * @param level the level to set
    */
   public static void setFsDatasetImplLogLevel(Level level) {
-    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, level);
+    GenericTestUtils.setLogLevel(FsDatasetImpl.LOG,
+        org.slf4j.event.Level.valueOf(level.toString()));
   }
 }
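
To round out the picture, here is a caller-side companion to the sketch shown before the diff, loosely mirroring the new DataNode#handleAddBlockPoolError: the aggregate exception is caught, and only the unhealthy volumes are routed through the volume-failure path so that startup continues on the remaining disks. markVolumesFailed is a hypothetical stand-in for DataNode#handleVolumeFailures; this illustrates the control flow only, not the actual Hadoop implementation.

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class BlockPoolInitSketch {

      /** Stand-in for the existing volume-failure path (handleVolumeFailures). */
      static void markVolumesFailed(Set<String> volumes) {
        volumes.forEach(v -> System.out.println("Marking volume failed: " + v));
      }

      /** Mirrors the new initBlockPool flow: scan failures no longer abort startup. */
      static void initBlockPool(List<String> volumes) throws InterruptedException {
        try {
          VolumeScanSketch.addBlockPool(volumes);
        } catch (VolumeScanSketch.VolumeScanFailuresException e) {
          Map<String, IOException> bad = e.getFailingVolumes();
          if (!bad.isEmpty()) {
            markVolumesFailed(bad.keySet());  // surface the bad disks, keep starting up
          }
        }
        System.out.println("Block pool initialised on the remaining healthy volumes");
      }

      public static void main(String[] args) throws InterruptedException {
        initBlockPool(Arrays.asList("/data1", "/data2-bad", "/data3"));
      }
    }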

