Posted to common-commits@hadoop.apache.org by he...@apache.org on 2021/04/21 10:33:34 UTC
[hadoop] branch branch-3.3 updated: HDFS-15963. Unreleased volume
references cause an infinite loop. (#2941) Contributed by Shuyan Zhang.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 9f2db2c HDFS-15963. Unreleased volume references cause an infinite loop. (#2941) Contributed by Shuyan Zhang.
9f2db2c is described below
commit 9f2db2c9fdcf1e61b39651a51f8223cfb8ce2e31
Author: zhangshuyan0 <81...@users.noreply.github.com>
AuthorDate: Wed Apr 21 18:32:56 2021 +0800
HDFS-15963. Unreleased volume references cause an infinite loop. (#2941) Contributed by Shuyan Zhang.
Reviewed-by: Wei-Chiu Chuang <we...@apache.org>
Reviewed-by: He Xiaoqiao <he...@apache.org>
---
.../hadoop/hdfs/server/datanode/BlockSender.java | 1 +
.../fsdataset/impl/FsDatasetAsyncDiskService.java | 71 +++++++++++++---------
.../datanode/fsdataset/impl/FsVolumeImpl.java | 2 +-
.../impl/RamDiskAsyncLazyPersistService.java | 29 ++++++---
.../hadoop/hdfs/TestDataTransferProtocol.java | 55 +++++++++++++++++
.../datanode/fsdataset/impl/TestFsDatasetImpl.java | 34 +++++++++++
.../fsdataset/impl/TestLazyPersistFiles.java | 37 +++++++++++
7 files changed, 188 insertions(+), 41 deletions(-)
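For context: a DataNode pins a volume for each in-flight block operation by taking a counted volume reference, and the volume cannot be removed or shut down until that count drains back to zero. When an exception escaped between taking a reference and releasing it, the count stayed above zero forever and the shutdown wait loop never exited. A minimal, self-contained model of that failure mode (hypothetical names, not the Hadoop classes):

    import java.util.concurrent.atomic.AtomicInteger;

    // Toy model of the leak (hypothetical names, not Hadoop code): a volume
    // may only be retired once its reference count drains to zero, so a
    // reference leaked on an exception path blocks retirement forever.
    class Volume {
      private final AtomicInteger refCount = new AtomicInteger();

      AutoCloseable obtainReference() {
        refCount.incrementAndGet();
        return refCount::decrementAndGet;
      }

      void waitRefCountToZero() throws InterruptedException {
        // This is the loop that spins forever if a reference leaks.
        while (refCount.get() > 0) {
          Thread.sleep(100);
        }
      }
    }

    class LeakDemo {
      public static void main(String[] args) throws Exception {
        Volume volume = new Volume();
        AutoCloseable ref = volume.obtainReference();
        try {
          throw new RuntimeException("task could not be scheduled");
        } catch (RuntimeException re) {
          ref.close(); // without this line, waitRefCountToZero() never returns
        }
        volume.waitRefCountToZero();
        System.out.println("reference count drained, volume can be retired");
      }
    }

Each hunk below closes one such exception path.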
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 99b9d64..aff0909 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -431,6 +431,7 @@ class BlockSender implements java.io.Closeable {
ris = new ReplicaInputStreams(
blockIn, checksumIn, volumeRef, fileIoProvider);
} catch (IOException ioe) {
+ IOUtils.cleanupWithLogger(null, volumeRef);
IOUtils.closeStream(this);
org.apache.commons.io.IOUtils.closeQuietly(blockIn);
org.apache.commons.io.IOUtils.closeQuietly(checksumIn);
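The BlockSender change covers the constructor failure path: volumeRef is acquired before the streams are opened, so if ReplicaInputStreams construction throws, the caller never receives an object to close and only the catch block can release the reference. A self-contained sketch of that shape (hypothetical names, assuming only that the reference is Closeable):

    import java.io.Closeable;
    import java.io.IOException;

    // Constructor-failure cleanup (hypothetical names): anything pinned
    // before the throwing call must be released in the catch block, because
    // the caller never gets an instance to close.
    class SenderLike implements Closeable {
      private final Closeable volumeRef;

      SenderLike(Closeable volumeRef) throws IOException {
        this.volumeRef = volumeRef;
        try {
          openStreams(); // may throw before construction completes
        } catch (IOException ioe) {
          closeQuietly(volumeRef); // release the pinned volume, then rethrow
          throw ioe;
        }
      }

      private void openStreams() throws IOException { /* ... */ }

      private static void closeQuietly(Closeable c) {
        try { c.close(); } catch (IOException ignored) { }
      }

      @Override
      public void close() throws IOException {
        volumeRef.close();
      }
    }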
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 2a89a80d..706c078 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -167,18 +167,26 @@ class FsDatasetAsyncDiskService {
* Execute the task sometime in the future, using ThreadPools.
*/
synchronized void execute(FsVolumeImpl volume, Runnable task) {
- if (executors == null) {
- throw new RuntimeException("AsyncDiskService is already shutdown");
- }
- if (volume == null) {
- throw new RuntimeException("A null volume does not have a executor");
- }
- ThreadPoolExecutor executor = executors.get(volume.getStorageID());
- if (executor == null) {
- throw new RuntimeException("Cannot find volume " + volume
- + " for execution of task " + task);
- } else {
- executor.execute(task);
+ try {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
+ }
+ if (volume == null) {
+ throw new RuntimeException("A null volume does not have a executor");
+ }
+ ThreadPoolExecutor executor = executors.get(volume.getStorageID());
+ if (executor == null) {
+ throw new RuntimeException("Cannot find volume " + volume
+ + " for execution of task " + task);
+ } else {
+ executor.execute(task);
+ }
+ } catch (RuntimeException re) {
+ if (task instanceof ReplicaFileDeleteTask) {
+ IOUtils.cleanupWithLogger(null,
+ ((ReplicaFileDeleteTask) task).volumeRef);
+ }
+ throw re;
}
}
@@ -314,28 +322,31 @@ class FsDatasetAsyncDiskService {
@Override
public void run() {
- final long blockLength = replicaToDelete.getBlockDataLength();
- final long metaLength = replicaToDelete.getMetadataLength();
- boolean result;
+ try {
+ final long blockLength = replicaToDelete.getBlockDataLength();
+ final long metaLength = replicaToDelete.getMetadataLength();
+ boolean result;
- result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+ result = (trashDirectory == null) ? deleteFiles() : moveFiles();
- if (!result) {
- LOG.warn("Unexpected error trying to "
- + (trashDirectory == null ? "delete" : "move")
- + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
- + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
- } else {
- if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
- datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+ if (!result) {
+ LOG.warn("Unexpected error trying to "
+ + (trashDirectory == null ? "delete" : "move")
+ + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
+ + " at file " + replicaToDelete.getBlockURI() + ". Ignored.");
+ } else {
+ if (block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK) {
+ datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
+ }
+ volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+ volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
+ LOG.info("Deleted " + block.getBlockPoolId() + " " +
+ block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
}
- volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
- volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
- LOG.info("Deleted " + block.getBlockPoolId() + " "
- + block.getLocalBlock() + " URI " + replicaToDelete.getBlockURI());
+ updateDeletedBlockId(block);
+ } finally {
+ IOUtils.cleanupWithLogger(null, this.volumeRef);
}
- updateDeletedBlockId(block);
- IOUtils.cleanupWithLogger(null, volumeRef);
}
}
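The two hunks above share one invariant: the ReplicaFileDeleteTask owns its volumeRef, so whoever learns the task will never run, or has finished running, is responsible for releasing it. execute() handles the failed-submission path, and run() moves the release into a finally block so a failure during deletion cannot leak it either. A self-contained sketch of the submission half (hypothetical names; a shut-down ThreadPoolExecutor really does reject new tasks with a RejectedExecutionException):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.Executor;

    // Submission-failure cleanup (hypothetical names): a task that will
    // never run cannot release its own reference, so the submitter must do
    // it before letting the RuntimeException propagate.
    final class SafeSubmit {
      static void submit(Executor executor, Runnable task,
          Closeable volumeRef) {
        try {
          executor.execute(task); // rejects with a RuntimeException if shut down
        } catch (RuntimeException re) {
          try {
            volumeRef.close(); // mirror of IOUtils.cleanupWithLogger in the patch
          } catch (IOException ignored) {
            // best-effort cleanup; the original exception matters more
          }
          throw re;
        }
      }
    }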
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index d644e05..e83b7c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -304,7 +304,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
@VisibleForTesting
- int getReferenceCount() {
+ public int getReferenceCount() {
return this.reference.getReferenceCount();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
index a77faf2..0d42ae9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -153,16 +154,24 @@ class RamDiskAsyncLazyPersistService {
* Execute the task sometime in the future, using ThreadPools.
*/
synchronized void execute(String storageId, Runnable task) {
- if (executors == null) {
- throw new RuntimeException(
- "AsyncLazyPersistService is already shutdown");
- }
- ThreadPoolExecutor executor = executors.get(storageId);
- if (executor == null) {
- throw new RuntimeException("Cannot find root storage volume with id " +
- storageId + " for execution of task " + task);
- } else {
- executor.execute(task);
+ try {
+ if (executors == null) {
+ throw new RuntimeException(
+ "AsyncLazyPersistService is already shutdown");
+ }
+ ThreadPoolExecutor executor = executors.get(storageId);
+ if (executor == null) {
+ throw new RuntimeException("Cannot find root storage volume with id " +
+ storageId + " for execution of task " + task);
+ } else {
+ executor.execute(task);
+ }
+ } catch (RuntimeException re) {
+ if (task instanceof ReplicaLazyPersistTask) {
+ IOUtils.cleanupWithLogger(null,
+ ((ReplicaLazyPersistTask) task).targetVolume);
+ }
+ throw re;
}
}
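RamDiskAsyncLazyPersistService gets the same treatment for ReplicaLazyPersistTask: if the task cannot be handed to an executor, its targetVolume reference is closed before the RuntimeException propagates. This matters because LazyWriter keeps resubmitting persist work even after the service is shut down, so each failed submission would otherwise pin the volume one more time.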
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index b9da5f4..b1a675c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -33,6 +33,9 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Random;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -562,4 +565,56 @@ public class TestDataTransferProtocol {
checksum, CachingStrategy.newDefaultStrategy(), false, false,
null, null, new String[0]);
}
+
+ @Test(timeout = 30000)
+ public void testReleaseVolumeRefIfExceptionThrown()
+ throws IOException, InterruptedException {
+ Path file = new Path("dataprotocol.dat");
+ int numDataNodes = 1;
+
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
+ numDataNodes).build();
+ try {
+ cluster.waitActive();
+ datanode = cluster.getFileSystem().getDataNodeStats(
+ DatanodeReportType.LIVE)[0];
+ dnAddr = NetUtils.createSocketAddr(datanode.getXferAddr());
+ FileSystem fileSys = cluster.getFileSystem();
+
+ int fileLen = Math.min(
+ conf.getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096), 4096);
+
+ DFSTestUtil.createFile(fileSys, file, fileLen, fileLen,
+ fileSys.getDefaultBlockSize(file),
+ fileSys.getDefaultReplication(file), 0L);
+
+ // Get the first blockid for the file.
+ final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ ExtendedBlock blk = new ExtendedBlock(bpid, firstBlock.getLocalBlock());
+ sendBuf.reset();
+ recvBuf.reset();
+
+ // Delete the meta file to create an exception in the BlockSender constructor.

+ DataNode dn = cluster.getDataNodes().get(0);
+ cluster.getMaterializedReplica(0, blk).deleteMeta();
+
+ FsVolumeImpl volume = (FsVolumeImpl) DataNodeTestUtils.getFSDataset(
+ dn).getVolume(blk);
+ int beforeCnt = volume.getReferenceCount();
+
+ sender.copyBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN);
+ sendRecvData("Copy a block.", false);
+ Thread.sleep(3000);
+
+ int afterCnt = volume.getReferenceCount();
+ assertEquals(beforeCnt, afterCnt);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
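This test exercises the BlockSender half of the fix end to end: deleting the replica's meta file makes the BlockSender constructor throw, and the assertion checks that the volume's reference count returns to its pre-copy value, which is exactly the count that grew without bound before the patch.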
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index d45efed..1a28f2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.function.Supplier;
+import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import java.io.OutputStream;
@@ -1241,4 +1242,37 @@ public class TestFsDatasetImpl {
assertTrue(blockDir.delete());
}
}
+
+ @Test(timeout = 20000)
+ public void testReleaseVolumeRefIfExceptionThrown() throws IOException {
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(
+ new HdfsConfiguration()).build();
+ cluster.waitActive();
+ FsVolumeImpl vol = (FsVolumeImpl) dataset.getFsVolumeReferences().get(0);
+ ExtendedBlock eb;
+ ReplicaInfo info;
+ int beforeCnt = 0;
+ try {
+ List<Block> blockList = new ArrayList<Block>();
+ eb = new ExtendedBlock(BLOCKPOOL, 1, 1, 1001);
+ info = new FinalizedReplica(
+ eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+ dataset.volumeMap.add(BLOCKPOOL, info);
+ ((LocalReplica) info).getBlockFile().createNewFile();
+ ((LocalReplica) info).getMetaFile().createNewFile();
+ blockList.add(info);
+
+ // Create a runtime exception.
+ dataset.asyncDiskService.shutdown();
+
+ beforeCnt = vol.getReferenceCount();
+ dataset.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+
+ } catch (RuntimeException re) {
+ int afterCnt = vol.getReferenceCount();
+ assertEquals(beforeCnt, afterCnt);
+ } finally {
+ cluster.shutdown();
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index c0b4b17..14ed26e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Assert;
@@ -280,4 +283,38 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
}
}
}
+
+ @Test(timeout = 20000)
+ public void testReleaseVolumeRefIfExceptionThrown()
+ throws IOException, InterruptedException {
+ getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+ final String methodName = GenericTestUtils.getMethodName();
+ final int seed = 0xFADED;
+ Path path = new Path("/" + methodName + ".Writer.File.dat");
+
+ DataNode dn = cluster.getDataNodes().get(0);
+ FsDatasetSpi.FsVolumeReferences volumes =
+ DataNodeTestUtils.getFSDataset(dn).getFsVolumeReferences();
+ int[] beforeCnts = new int[volumes.size()];
+ FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+
+ // Create a runtime exception.
+ ds.asyncLazyPersistService.shutdown();
+ for (int i = 0; i < volumes.size(); ++i) {
+ beforeCnts[i] = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+ }
+
+ makeRandomTestFile(path, BLOCK_SIZE, true, seed);
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ for (int i = 0; i < volumes.size(); ++i) {
+ int afterCnt = ((FsVolumeImpl) volumes.get(i)).getReferenceCount();
+ // LazyWriter keeps trying to save copies even if
+ // asyncLazyPersistService is already shutdown.
+ // If we do not release references, the number of
+ // references will increase infinitely.
+ Assert.assertTrue(
+ beforeCnts[i] == afterCnt || beforeCnts[i] == (afterCnt - 1));
+ }
+ }
}