Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/19 20:09:53 UTC
[hadoop] branch trunk updated: HDFS-13709. Report bad block to NN
when block transfer encounters an EIO exception. Contributed by Chen Zhang.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 360a96f HDFS-13709. Report bad block to NN when block transfer encounters an EIO exception. Contributed by Chen Zhang.
360a96f is described below
commit 360a96f342f3c8cb8246f011abb9bcb0b6ef3eaa
Author: Wei-Chiu Chuang <we...@apache.org>
AuthorDate: Mon Aug 19 13:08:55 2019 -0700
HDFS-13709. Report bad block to NN when block transfer encounters an EIO exception. Contributed by Chen Zhang.
---
.../hadoop/hdfs/server/datanode/BlockSender.java | 18 +++-
.../hadoop/hdfs/server/datanode/DataNode.java | 43 ++++++--
.../server/datanode/DiskFileCorruptException.java | 39 ++++++++
.../hadoop/hdfs/server/datanode/VolumeScanner.java | 7 +-
.../org/apache/hadoop/hdfs/TestReplication.java | 111 +++++++++++++++++++++
.../hdfs/server/datanode/SimulatedFSDataset.java | 4 +-
.../hadoop/hdfs/server/datanode/TestDiskError.java | 4 +-
7 files changed, 208 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 74c1025..a308b49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -183,6 +183,8 @@ class BlockSender implements java.io.Closeable {
// would risk sending too much unnecessary data. 512 (1 disk sector)
// is likely to result in minimal extra IO.
private static final long CHUNK_SIZE = 512;
+
+ private static final String EIO_ERROR = "Input/output error";
/**
* Constructor
*
@@ -576,7 +578,14 @@ class BlockSender implements java.io.Closeable {
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
- ris.readDataFully(buf, dataOff, dataLen);
+ try {
+ ris.readDataFully(buf, dataOff, dataLen);
+ } catch (IOException ioe) {
+ if (ioe.getMessage() != null && ioe.getMessage().startsWith(EIO_ERROR)) {
+ throw new DiskFileCorruptException("A disk IO error occurred", ioe);
+ }
+ throw ioe;
+ }
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@@ -623,6 +632,13 @@ class BlockSender implements java.io.Closeable {
* It was done here because the NIO throws an IOException for EPIPE.
*/
String ioem = e.getMessage();
+ /*
+ * An EIO while reading the file or in transferTo to the client socket
+ * is very likely caused by a bad disk track or other file corruption.
+ */
+ if (ioem != null && ioem.startsWith(EIO_ERROR)) {
+ throw new DiskFileCorruptException("A disk IO error occurred", e);
+ }
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
datanode.getBlockScanner().markSuspectBlock(
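Both hunks above detect a kernel EIO purely by message text, since Java exposes errno values only through the IOException message. As a minimal standalone sketch of that pattern (readFullyOrFlagCorruption is a hypothetical helper, not part of this change):

    import java.io.DataInputStream;
    import java.io.IOException;

    static void readFullyOrFlagCorruption(DataInputStream in, byte[] buf)
        throws IOException {
      try {
        in.readFully(buf);                        // may surface a kernel EIO
      } catch (IOException ioe) {
        String msg = ioe.getMessage();
        // EIO reaches Java only as message text, so a prefix match is the
        // only signal that the local disk, rather than the peer, failed.
        if (msg != null && msg.startsWith("Input/output error")) {
          throw new DiskFileCorruptException("A disk IO error occurred", ioe);
        }
        throw ioe;                                // some other I/O problem
      }
    }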
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c8f6896..e620a2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2610,13 +2610,7 @@ public class DataNode extends ReconfigurableBase
metrics.incrBlocksReplicated();
}
} catch (IOException ie) {
- if (ie instanceof InvalidChecksumSizeException) {
- // Add the block to the front of the scanning queue if metadata file
- // is corrupt. We already add the block to front of scanner if the
- // peer disconnects.
- LOG.info("Adding block: {} for scanning", b);
- blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
- }
+ handleBadBlock(b, ie, false);
LOG.warn("{}:Failed to transfer {} to {} got",
bpReg, b, targets[0], ie);
} finally {
@@ -3462,6 +3456,41 @@ public class DataNode extends ReconfigurableBase
handleDiskError(sb.toString());
}
+ /**
+ * Handle a bad block, either by adding it to the blockScanner's suspect
+ * queue or by reporting it to the NameNode directly.
+ *
+ * If the method is called by the scanner, the block is known to be bad and
+ * we report it to the NameNode directly. Otherwise, if the exception type
+ * indicates a bad block, we add it to the blockScanner's suspect queue
+ * when the blockScanner is enabled, or report it to the NameNode directly
+ * when it is not.
+ *
+ * @param block The suspicious block
+ * @param e The exception encountered when accessing the block
+ * @param fromScanner Whether the call comes from the blockScanner, which
+ * calls this method only when it is sure the block is corrupt.
+ */
+ void handleBadBlock(ExtendedBlock block, IOException e, boolean fromScanner) {
+
+ boolean isBadBlock = fromScanner || (e instanceof DiskFileCorruptException
+ || e instanceof InvalidChecksumSizeException);
+
+ if (!isBadBlock) {
+ return;
+ }
+ if (!fromScanner && blockScanner.isEnabled()) {
+ blockScanner.markSuspectBlock(data.getVolume(block).getStorageID(),
+ block);
+ } else {
+ try {
+ reportBadBlocks(block);
+ } catch (IOException ie) {
+ LOG.warn("report bad block {} failed", block, ie);
+ }
+ }
+ }
+
@VisibleForTesting
public long getLastDiskErrorCheck() {
return lastDiskErrorCheck;
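For context, a hedged sketch of how the transfer path routes exceptions through this new hook (transferBlock is a hypothetical stand-in for the DataTransfer logic; bpReg, b, and targets are the names used in the surrounding code, and the real call site is the catch block shown in the first hunk):

    try {
      transferBlock(b, targets);                  // hypothetical transfer call
    } catch (IOException ie) {
      // Only DiskFileCorruptException and InvalidChecksumSizeException mark
      // the replica itself as bad; handleBadBlock silently ignores any other
      // IOException (e.g. a network failure), which the caller just logs.
      handleBadBlock(b, ie, false);
      LOG.warn("{}:Failed to transfer {} to {}", bpReg, b, targets[0], ie);
    }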
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java
new file mode 100644
index 0000000..1a70fe7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskFileCorruptException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the kernel reports an "Input/output error" (EIO), indicating
+ * corruption (e.g. a bad disk track) in an on-disk file.
+ */
+public class DiskFileCorruptException extends IOException {
+ /**
+ * Instantiate.
+ * @param msg the exception message
+ * @param cause the underlying cause
+ */
+ public DiskFileCorruptException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public DiskFileCorruptException(String msg) {
+ super(msg);
+ }
+}
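As a usage sketch, a caller that streams a replica might translate the new exception into a report. The names blockSender, out, baseStream, throttler, datanode, and block below are assumed from context, not part of this change:

    try {
      blockSender.sendBlock(out, baseStream, throttler);
    } catch (DiskFileCorruptException dfce) {
      // The local replica is unreadable (likely a bad disk track), so report
      // it rather than retrying a read of the same on-disk data.
      datanode.handleBadBlock(block, dfce, false);
    }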
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 3bdb1d7..84cfb04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -290,12 +290,7 @@ public class VolumeScanner extends Thread {
return;
}
LOG.warn("Reporting bad {} on {}", block, volume);
- try {
- scanner.datanode.reportBadBlocks(block, volume);
- } catch (IOException ie) {
- // This is bad, but not bad enough to shut down the scanner.
- LOG.warn("Cannot report bad block " + block, ie);
- }
+ scanner.datanode.handleBadBlock(block, e, true);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index 7e51ed9..6af7106 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.any;
import com.google.common.base.Supplier;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -53,7 +54,9 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.FsDatasetTestUtils.MaterializedReplica;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -154,6 +157,67 @@ public class TestReplication {
assertTrue(!fileSys.exists(name));
}
+ private static class CorruptFileSimulatedFSDataset
+ extends SimulatedFSDataset {
+ /**
+ * A simulated input stream that throws an EIO ("Input/output error")
+ * once any data has been read.
+ */
+ static private class CorruptFileSimulatedInputStream
+ extends java.io.InputStream {
+ private InputStream inputStream;
+
+ CorruptFileSimulatedInputStream(InputStream is) {
+ inputStream = is;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int ret = inputStream.read();
+ if (ret > 0) {
+ throw new IOException("Input/output error");
+ }
+ return ret;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ int ret = inputStream.read(b);
+ if (ret > 0) {
+ throw new IOException("Input/output error");
+ }
+ return ret;
+ }
+ }
+
+ CorruptFileSimulatedFSDataset(DataNode datanode, DataStorage storage,
+ Configuration conf) {
+ super(storage, conf);
+ }
+
+ @Override
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b,
+ long seekOffset) throws IOException {
+ InputStream result = super.getBlockInputStream(b);
+ IOUtils.skipFully(result, seekOffset);
+ return new CorruptFileSimulatedInputStream(result);
+ }
+
+ static class Factory
+ extends FsDatasetSpi.Factory<CorruptFileSimulatedFSDataset> {
+ @Override
+ public CorruptFileSimulatedFSDataset newInstance(DataNode datanode,
+ DataStorage storage, Configuration conf) throws IOException {
+ return new CorruptFileSimulatedFSDataset(datanode, storage, conf);
+ }
+
+ @Override
+ public boolean isSimulated() {
+ return true;
+ }
+ }
+ }
+
private void testBadBlockReportOnTransfer(
boolean corruptBlockByDeletingBlockFile) throws Exception {
Configuration conf = new HdfsConfiguration();
@@ -205,6 +269,53 @@ public class TestReplication {
cluster.shutdown();
}
+ @Test(timeout = 30000)
+ public void testBadBlockReportOnTransferCorruptFile() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
+ CorruptFileSimulatedFSDataset.Factory.class.getName());
+ // Disable BlockScanner to trigger reportBadBlocks
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
+ FileSystem fs;
+ int replicaCount = 0;
+ short replFactor = 1;
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ cluster.waitActive();
+ try {
+ fs = cluster.getFileSystem();
+ final DFSClient dfsClient = new DFSClient(
+ new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+
+ // Create file with replication factor of 1
+ Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
+ DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+ DFSTestUtil.waitReplication(fs, file1, replFactor);
+
+ // Increase the replication factor; this should trigger a transfer request.
+ // The source datanode hits an EIO while reading and reports the bad block.
+ replFactor = 2;
+ fs.setReplication(file1, replFactor);
+
+ // Now get block details and check if the block is corrupt
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return dfsClient.getNamenode()
+ .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+ .isCorrupt();
+ } catch (IOException ie) {
+ return false;
+ }
+ }, 1000, 15000);
+ replicaCount = dfsClient.getNamenode()
+ .getBlockLocations(file1.toString(), 0, Long.MAX_VALUE).get(0)
+ .getLocations().length;
+ assertEquals("replication should not succeed", 1, replicaCount);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
/*
* Test if Datanode reports bad blocks during replication request
*/
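For reference, the few configuration lines that activate the fault-injecting dataset are the interesting part of the test setup; a minimal sketch mirroring the test above:

    Configuration conf = new HdfsConfiguration();
    // Swap the datanode's dataset implementation for the fault-injecting one.
    conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
        CorruptFileSimulatedFSDataset.Factory.class.getName());
    // A negative scan period disables the BlockScanner, so handleBadBlock
    // must fall back to reporting the bad block to the NameNode directly.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1L);
    MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();

Disabling the scanner pins down the branch under test: the EIO must surface as a reportBadBlocks RPC rather than a suspect-queue entry.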
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c399673..f33332f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -598,7 +598,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public StorageType getStorageType() {
- return null;
+ return StorageType.DISK;
}
@Override
@@ -1178,7 +1178,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return new ReplicaHandler(binfo, null);
}
- protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
+ public synchronized InputStream getBlockInputStream(ExtendedBlock b)
throws IOException {
BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 38e4287..0a589a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -240,10 +240,10 @@ public class TestDiskError {
@Test
public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
DataNode dn0 = cluster.getDataNodes().get(0);
- // Make a mock blockScanner class and return false whenever isEnabled is
+ // Make a mock blockScanner class and return true whenever isEnabled is
// called on blockScanner
BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
- Mockito.when(mockScanner.isEnabled()).thenReturn(false);
+ Mockito.when(mockScanner.isEnabled()).thenReturn(true);
dn0.setBlockScanner(mockScanner);
Path filePath = new Path("test.dat");
FSDataOutputStream out = fs.create(filePath, (short) 1);
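With isEnabled() now returning true, an InvalidChecksumSizeException during transfer should land in the scanner's suspect queue rather than trigger an immediate NameNode report; a hypothetical follow-up verification (not part of this change) might look like:

    // markSuspectBlock(storageId, block) is the suspect-queue entry point
    // that handleBadBlock uses when the scanner is enabled.
    Mockito.verify(mockScanner, Mockito.atLeastOnce()).markSuspectBlock(
        Mockito.anyString(), Mockito.any(ExtendedBlock.class));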