Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/19 20:09:53 UTC

[hadoop] branch trunk updated: HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 360a96f  HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.
360a96f is described below

commit 360a96f342f3c8cb8246f011abb9bcb0b6ef3eaa
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Aug 19 13:08:55 2019 -0700

    HDFS-13709. Report bad block to NN when transfer block encounter EIO exception. Contributed by Chen Zhang.
---
 .../hadoop/hdfs/server/datanode/BlockSender.java   |  18 +++-
 .../hadoop/hdfs/server/datanode/DataNode.java      |  43 ++++++--
 .../server/datanode/DiskFileCorruptException.java  |  39 ++++++++
 .../hadoop/hdfs/server/datanode/VolumeScanner.java |   7 +-
 .../org/apache/hadoop/hdfs/TestReplication.java    | 111 +++++++++++++++++++++
 .../hdfs/server/datanode/SimulatedFSDataset.java   |   4 +-
 .../hadoop/hdfs/server/datanode/TestDiskError.java |   4 +-
 7 files changed, 208 insertions(+), 18 deletions(-)
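
The fix hinges on one JDK detail: there is no typed exception for errno EIO, so a failed read surfaces as a plain IOException whose message, on Linux, begins with "Input/output error". That is why BlockSender below matches on the message prefix rather than on an exception class. A minimal sketch of the classification, using a hypothetical EioProbe class (note that getMessage() can return null for some IOExceptions, so a guard is prudent):

    import java.io.FileInputStream;
    import java.io.IOException;

    /** Hypothetical sketch: classify an IOException as a probable EIO. */
    public class EioProbe {
      // Message prefix the JVM produces for errno EIO on Linux. The text
      // is platform- and locale-dependent, hence the prefix match below.
      private static final String EIO_ERROR = "Input/output error";

      static boolean isProbableDiskError(IOException ioe) {
        String msg = ioe.getMessage();
        return msg != null && msg.startsWith(EIO_ERROR);
      }

      public static void main(String[] args) {
        try (FileInputStream in = new FileInputStream(args[0])) {
          in.read(new byte[4096]);
        } catch (IOException ioe) {
          System.out.println(isProbableDiskError(ioe)
              ? "probable media error (EIO)" : "other failure: " + ioe);
        }
      }
    }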

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 74c1025..a308b49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -183,6 +183,8 @@ class BlockSender implements java.io.Closeable {
   // would risk sending too much unnecessary data. 512 (1 disk sector)
   // is likely to result in minimal extra IO.
   private static final long CHUNK_SIZE = 512;
+
+  private static final String EIO_ERROR = "Input/output error";
   /**
    * Constructor
    * 
@@ -576,7 +578,14 @@ class BlockSender implements java.io.Closeable {
     
     int dataOff = checksumOff + checksumDataLen;
     if (!transferTo) { // normal transfer
-      ris.readDataFully(buf, dataOff, dataLen);
+      try {
+        ris.readDataFully(buf, dataOff, dataLen);
+      } catch (IOException ioe) {
+        if (ioe.getMessage().startsWith(EIO_ERROR)) {
+          throw new DiskFileCorruptException("A disk IO error occurred", ioe);
+        }
+        throw ioe;
+      }
 
       if (verifyChecksum) {
         verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -623,6 +632,13 @@ class BlockSender implements java.io.Closeable {
          * It was done here because the NIO throws an IOException for EPIPE.
          */
         String ioem = e.getMessage();
+        /*
+         * An EIO while reading the block file or sending it to the client
+         * socket very likely indicates a bad disk track or other corruption.
+         */
+        if (ioem.startsWith(EIO_ERROR)) {
+          throw new DiskFileCorruptException("A disk IO error occurred", e);
+        }
         if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
           LOG.error("BlockSender.sendChunks() exception: ", e);
           datanode.getBlockScanner().markSuspectBlock(
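
Both hunks above use the same wrap-and-rethrow idiom: an EIO is translated into the typed DiskFileCorruptException (added further down in this commit) so callers can dispatch on the exception class, while every other IOException propagates unchanged. A standalone sketch of the idiom, with a hypothetical helper name and, unlike the hunks above, a guard against a null message:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.hdfs.server.datanode.DiskFileCorruptException;

    final class EioTranslatingReads {
      private static final String EIO_ERROR = "Input/output error";

      /** Read exactly buf.length bytes, flagging an EIO as corruption. */
      static void readFullyOrFlag(InputStream in, byte[] buf)
          throws IOException {
        try {
          new DataInputStream(in).readFully(buf);
        } catch (IOException ioe) {
          String msg = ioe.getMessage();
          if (msg != null && msg.startsWith(EIO_ERROR)) {
            // Keep the original exception as the cause so the stack
            // trace still points at the failing read.
            throw new DiskFileCorruptException(
                "A disk IO error occurred", ioe);
          }
          throw ioe; // anything else propagates unchanged
        }
      }
    }
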
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c8f6896..e620a2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2610,13 +2610,7 @@ public class DataNode extends ReconfigurableBase
           metrics.incrBlocksReplicated();
         }
       } catch (IOException ie) {
-        if (ie instanceof InvalidChecksumSizeException) {
-          // Add the block to the front of the scanning queue if metadata file
-          // is corrupt. We already add the block to front of scanner if the
-          // peer disconnects.
-          LOG.info("Adding block: {} for scanning", b);
-          blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
-        }
+        handleBadBlock(b, ie, false);
         LOG.warn("{}:Failed to transfer {} to {} got",
             bpReg, b, targets[0], ie);
       } finally {
@@ -3462,6 +3456,41 @@ public class DataNode extends ReconfigurableBase
     handleDiskError(sb.toString());
   }
 
+  /**
+   * Handle a bad block: either add it to the blockScanner suspect queue
+   * or report it to the NameNode directly.
+   *
+   * If the method is called by the scanner, the block is known to be bad
+   * and is reported to the NameNode directly. Otherwise, if the exception
+   * type indicates a bad block, it is added to the blockScanner suspect
+   * queue when the blockScanner is enabled, or reported to the NameNode
+   * directly when it is not.
+   *
+   * @param block The suspicious block
+   * @param e The exception encountered when accessing the block
+   * @param fromScanner Whether the call comes from the blockScanner, which
+   *          only calls this method when it is certain the block is corrupt.
+   */
+  void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
+
+    boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
+        || e instanceof InvalidChecksumSizeException);
+
+    if (!isBadBlock) {
+      return;
+    }
+    if (!fromScanner && blockScanner.isEnabled()) {
+      blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(),
+          block);
+    } else {
+      try {
+        reportBadBlocks(block);
+      } catch (IOException ie) {
+        LOG.warn("Failed to report bad block {}", block, ie);
+      }
+    }
+  }
+
   @VisibleForTesting
   public long getLastDiskErrorCheck() {
     return lastDiskErrorCheck;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java
new file mode 100644
index 0000000..1a70fe7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * When the kernel reports an "Input/output error", this exception is used
+ * to represent corruption (e.g. a bad disk track) of a disk file.
+ */
+public class DiskFileCorruptException extends IOException {
+  /**
+   * Instantiate.
+   * @param msg the exception message
+   * @param cause the underlying cause
+   */
+  public DiskFileCorruptException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  public DiskFileCorruptException(String msg) {
+    super(msg);
+  }
+}
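
Because DiskFileCorruptException extends IOException, existing catch clauses keep working; only handleBadBlock() inspects the more specific type. Simplified from the DataNode transfer hunk above:

    try {
      blockSender.sendBlock(out, unbufOut, null);
    } catch (IOException ie) {
      // handleBadBlock() checks the concrete type: a
      // DiskFileCorruptException or InvalidChecksumSizeException marks
      // the block as bad; any other IOException is only logged.
      handleBadBlock(b, ie, false);
      LOG.warn("{}:Failed to transfer {} to {}", bpReg, b, targets[0], ie);
    }
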
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 3bdb1d7..84cfb04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -290,12 +290,7 @@ public class VolumeScanner extends Thread {
         return;
       }
       LOG.warn("Reporting bad {} on {}", block, volume);
-      try {
-        scanner.datanode.reportBadBlocks(block, volume);
-      } catch (IOException ie) {
-        // This is bad, but not bad enough to shut down the scanner.
-        LOG.warn("Cannot report bad block " + block, ie);
-      }
+      scanner.datanode.handleBadBlock(block, e, true);
     }
   }
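
The net effect on the scanner path, sketched as a call trace (simplified; names from the diff):

    // VolumeScanner.ScanResultHandler#handle(block, e)
    //   -> datanode.handleBadBlock(block, e, /* fromScanner = */ true)
    //        -> reportBadBlocks(block)   // the scanner is already certain
    //
    // An IOException thrown by the report is logged inside
    // handleBadBlock() rather than rethrown, preserving the old behavior
    // of not shutting down the scanner over a failed report.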
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index 7e51ed9..6af7106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any;
 import com.google.common.base.Supplier;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -53,7 +54,9 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -154,6 +157,67 @@ public class TestReplication {
     assertTrue(!fileSys.exists(name));
   }
 
+  private static class CorruptFileSimulatedFSDataset
+      extends SimulatedFSDataset {
+    /**
+     * An InputStream wrapper that turns any successful data read into
+     * a synthetic EIO ("Input/output error").
+     */
+    static private class CorruptFileSimulatedInputStream
+        extends java.io.InputStream {
+      private InputStream inputStream;
+
+      CorruptFileSimulatedInputStream(InputStream is) {
+        inputStream = is;
+      }
+
+      @Override
+      public int read() throws IOException {
+        int ret = inputStream.read();
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+
+      @Override
+      public int read(byte[] b) throws IOException {
+        int ret = inputStream.read(b);
+        if (ret > 0) {
+          throw new IOException("Input/output error");
+        }
+        return ret;
+      }
+    }
+
+    CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
+        Configuration conf) {
+      super(storage, conf);
+    }
+
+    @Override
+    public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+        long seekOffset) throws IOException {
+      InputStream result = super.getBlockInputStream(b);
+      IOUtils.skipFully(result, seekOffset);
+      return new CorruptFileSimulatedInputStream(result);
+    }
+
+    static class Factory
+        extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
+      @Override
+      public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
+          DataStorage storage, Configuration conf) throws IOException {
+        return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
+      }
+
+      @Override
+      public boolean isSimulated() {
+        return true;
+      }
+    }
+  }
+
   private void testBadBlockReportOnTransfer(
       boolean corruptBlockByDeletingBlockFile) throws Exception {
     Configuration conf = new HdfsConfiguration();
@@ -205,6 +269,53 @@ public class TestReplication {
     cluster.shutdown();
   }
 
+  @Test(timeout = 30000)
+  public void testBadBlockReportOnTransferCorruptFile() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+        CorruptFileSimulatedFSDataset.Factory.class.getName());
+    // Disable BlockScanner to trigger reportBadBlocks
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
+    FileSystem fs;
+    int replicaCount = 0;
+    short replFactor = 1;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitActive();
+    try {
+      fs = cluster.getFileSystem();
+      final DFSClient dfsClient = new DFSClient(
+          new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+
+      // Create file with replication factor of 1
+      Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+      DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+      DFSTestUtil.waitReplication(fs, file1, replFactor);
+
+      // Increase the replication factor to trigger a transfer request. The
+      // source datanode hits a simulated EIO and reports the bad block.
+      replFactor = 2;
+      fs.setReplication(file1, replFactor);
+
+      // Now get block details and check if the block is corrupt
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return dfsClient.getNamenode()
+              .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+              .isCorrupt();
+        } catch (IOException ie) {
+          return false;
+        }
+      }, 1000, 15000);
+      replicaCount = dfsClient.getNamenode()
+          .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+          .getLocations().length;
+      assertEquals("replication should not succeed", 1, replicaCount);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /* 
    * Test if Datanode reports bad blocks during replication request
    */
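
The test works because BlockSender reads block data through the dataset's input stream, and CorruptFileSimulatedInputStream turns any successful data read into a synthetic EIO carrying exactly the message prefix BlockSender matches on. The same fault-injection idea as a minimal generic wrapper (hypothetical class, not part of the patch; the no-argument read() is left alone for brevity):

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    /** Throws a synthetic EIO on any successful data read. */
    class EioInjectingInputStream extends FilterInputStream {
      EioInjectingInputStream(InputStream in) {
        super(in);
      }

      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        int n = in.read(b, off, len);
        if (n > 0) {
          // Same message prefix BlockSender's EIO check matches on.
          throw new IOException("Input/output error");
        }
        return n; // EOF (-1) and zero-byte reads pass through
      }
    }
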
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c399673..f33332f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -598,7 +598,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
     @Override
     public StorageType getStorageType() {
-      return null;
+      return StorageType.DISK;
     }
 
     @Override
@@ -1178,7 +1178,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     return new ReplicaHandler(binfo, null);
   }
 
-  protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+  public synchronized InputStream getBlockInputStream(ExtendedBlock b)
       throws IOException {
     BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
     if (binfo == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 38e4287..0a589a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -240,10 +240,10 @@ public class TestDiskError {
   @Test
   public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
     DataNode dn0 = cluster.getDataNodes().get(0);
-    // Make a mock blockScanner class and return false whenever isEnabled is
+    // Make a mock blockScanner class and return true whenever isEnabled is
     // called on blockScanner
     BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
-    Mockito.when(mockScanner.isEnabled()).thenReturn(false);
+    Mockito.when(mockScanner.isEnabled()).thenReturn(true);
     dn0.setBlockScanner(mockScanner);
     Path filePath = new Path("test.dat");
     FSDataOutputStream out = fs.create(filePath, (short) 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org