You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original message) is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/12/04 14:42:52 UTC
[hadoop] 01/02: HDFS-15240. Erasure Coding: dirty buffer causes
reconstruction block error. Contributed by HuangTao.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 6e215953e6a7397f87cd0c44e1c3eaab220cdffe
Author: Hui Fei <fe...@apache.org>
AuthorDate: Fri Dec 4 09:20:09 2020 +0800
HDFS-15240. Erasure Coding: dirty buffer causes reconstruction block error. Contributed by HuangTao.
---
.../apache/hadoop/io/ElasticByteBufferPool.java | 1 +
.../server/datanode/DataNodeFaultInjector.java | 16 ++
.../datanode/erasurecode/StripedBlockReader.java | 5 +
.../server/datanode/erasurecode/StripedReader.java | 21 +-
.../datanode/erasurecode/StripedReconstructor.java | 6 +
.../hadoop/hdfs/TestReconstructStripedFile.java | 240 +++++++++++++++++++++
.../erasurecode/ErasureCodingTestHelper.java | 30 +++
7 files changed, 316 insertions(+), 3 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
index bbedf2a..0350db3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java
@@ -96,6 +96,7 @@ public final class ElasticByteBufferPool implements ByteBufferPool {
ByteBuffer.allocate(length);
}
tree.remove(entry.getKey());
+ entry.getValue().clear();
return entry.getValue();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 7e66111..08123c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -97,6 +97,22 @@ public class DataNodeFaultInjector {
public void stripedBlockReconstruction() throws IOException {}
/**
+ * Used as a hook to inject latency before bytes are read from a block
+ * during the erasure coding reconstruction process.
+ */
+ public void delayBlockReader() {}
+
+ /**
+ * Used as a hook to intercept the point at which a striped block reader's
+ * buffer is freed.
+ */
+ public void interceptFreeBlockReaderBuffer() {}
+
+ /**
+ * Used as a hook to intercept the point at which reading from a block
+ * finishes, whether the read succeeded or threw.
+ */
+ public void interceptBlockReader() {}
+ /**
* Used as a hook to inject intercept when BPOfferService hold lock.
*/
public void delayWhenOfferServiceHoldLock() {}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index 0db8a6f..ff3306b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@@ -95,6 +96,7 @@ class StripedBlockReader {
}
void freeReadBuffer() {
+ // Test hook: lets tests observe or delay the moment this reader drops its
+ // buffer. This method only clears the reference; returning the buffer to
+ // the pool is done separately by the caller.
+ DataNodeFaultInjector.get().interceptFreeBlockReaderBuffer();
buffer = null;
}
@@ -179,6 +181,8 @@ class StripedBlockReader {
} catch (IOException e) {
LOG.info(e.getMessage());
throw e;
+ } finally {
+ DataNodeFaultInjector.get().interceptBlockReader();
}
}
};
@@ -188,6 +192,7 @@ class StripedBlockReader {
* Perform actual reading of bytes from block.
*/
private BlockReadStats actualReadFromBlock() throws IOException {
+ DataNodeFaultInjector.get().delayBlockReader();
int len = buffer.remaining();
int n = 0;
while (n < len) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 98edf72..070931c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* Manage striped readers that performs reading of block data from remote to
@@ -328,14 +329,14 @@ class StripedReader {
// cancel remaining reads if we read successfully from minimum
// number of source DNs required by reconstruction.
cancelReads(futures.keySet());
- futures.clear();
+ clearFuturesAndService();
break;
}
}
} catch (InterruptedException e) {
LOG.info("Read data interrupted.", e);
cancelReads(futures.keySet());
- futures.clear();
+ clearFuturesAndService();
break;
}
}
@@ -429,6 +430,20 @@ class StripedReader {
}
}
+ /**
+ * Removes all stale futures from readService and clears futures, so that
+ * results of cancelled or timed-out reads are not consumed by a later poll.
+ *
+ * Draining continues even if the thread is interrupted while waiting; the
+ * interrupt status is restored once draining is done so that callers (and
+ * whoever interrupted this thread) can still observe it.
+ */
+ private void clearFuturesAndService() {
+ boolean interrupted = false;
+ while (!futures.isEmpty()) {
+ try {
+ Future<BlockReadStats> future = readService.poll(
+ stripedReadTimeoutInMills, TimeUnit.MILLISECONDS
+ );
+ // poll() returns null on timeout; removing null is a no-op and the
+ // loop simply polls again.
+ futures.remove(future);
+ } catch (InterruptedException e) {
+ LOG.info("Clear stale futures from service is interrupted.", e);
+ // Remember the interrupt instead of swallowing it; re-interrupting
+ // here would make the next poll() throw immediately and spin.
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
void close() {
if (zeroStripeBuffers != null) {
for (ByteBuffer zeroStripeBuffer : zeroStripeBuffers) {
@@ -438,9 +453,9 @@ class StripedReader {
zeroStripeBuffers = null;
for (StripedBlockReader reader : readers) {
+ reader.closeBlockReader();
reconstructor.freeBuffer(reader.getReadBuffer());
reader.freeReadBuffer();
- reader.closeBlockReader();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 4c8be82..48a0747 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -279,4 +280,9 @@ abstract class StripedReconstructor {
public ErasureCodingWorker getErasureCodingWorker() {
return erasureCodingWorker;
}
+
+ /**
+ * Returns the static BUFFER_POOL used by striped reconstructors, so tests can
+ * inspect buffers that were returned to it.
+ */
+ @VisibleForTesting
+ static ByteBufferPool getBufferPool() {
+ return BUFFER_POOL;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index b0b3350..16ce0dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -34,6 +35,12 @@ import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingTestHelper;
+import org.apache.hadoop.io.ElasticByteBufferPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -575,4 +582,237 @@ public class TestReconstructStripedFile {
}
}
}
+
+ /**
+ * When a StripedBlockReader times out, its outdated future must be ignored.
+ * Otherwise an NPE is thrown, which stops reading the remaining data and
+ * makes the reconstruction task fail.
+ */
+ @Test(timeout = 120000)
+ public void testTimeoutReadBlockInReconstruction() throws Exception {
+ assumeTrue("Ignore case where num parity units <= 1",
+ ecPolicy.getNumParityUnits() > 1);
+ int stripedBufferSize = conf.getInt(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+ cellSize);
+ ErasureCodingPolicy policy = ecPolicy;
+ fs.enableErasureCodingPolicy(policy.getName());
+ fs.getClient().setErasureCodingPolicy("/", policy.getName());
+
+ // StripedBlockReconstructor#reconstruct will loop 2 times
+ final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+ String fileName = "/timeout-read-block";
+ Path file = new Path(fileName);
+ writeFile(fs, fileName, fileLen);
+ fs.getFileBlockLocations(file, 0, fileLen);
+
+ LocatedBlocks locatedBlocks =
+ StripedFileTestUtil.getLocatedBlocks(file, fs);
+ Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+ // The file only has one block group
+ LocatedBlock lblock = locatedBlocks.get(0);
+ DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+ // to reconstruct first block
+ // (the DataNode holding it is shut down below to trigger reconstruction)
+ DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+ int stripedReadTimeoutInMills = conf.getInt(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.
+ DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+ Assert.assertTrue(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+ + " must be greater than 2000",
+ stripedReadTimeoutInMills > 2000);
+
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+ DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+ // Number of block reads started so far (1-based after increment).
+ private AtomicInteger numDelayReader = new AtomicInteger(0);
+
+ @Override
+ public void delayBlockReader() {
+ int index = numDelayReader.incrementAndGet();
+ LOG.info("Delay the {}th read block", index);
+
+ // the file's first StripedBlockReconstructor#reconstruct,
+ // and the first reader will timeout
+ // The first reader is held until numDataUnits + 1 reads have started;
+ // presumably the extra read is the replacement scheduled after this
+ // one timed out (NOTE(review): confirm).
+ if (index == 1) {
+ try {
+ GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+ ecPolicy.getNumDataUnits() + 1, 50,
+ stripedReadTimeoutInMills * 3
+ );
+ } catch (TimeoutException e) {
+ Assert.fail("Can't reconstruct the file's first part.");
+ } catch (InterruptedException e) {
+ // NOTE(review): interrupt is swallowed; the reader just proceeds.
+ }
+ }
+ // stop all the following re-reconstruction tasks
+ // by blocking their readers forever; interrupts are ignored so the
+ // thread keeps sleeping.
+ if (index > 3 * ecPolicy.getNumDataUnits() + 1) {
+ while (true) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }
+ };
+ DataNodeFaultInjector.set(timeoutInjector);
+
+ try {
+ shutdownDataNode(dataNode);
+ // before HDFS-15240, NPE will cause reconstruction fail(test timeout)
+ StripedFileTestUtil
+ .waitForReconstructionFinished(file, fs, groupSize);
+ } finally {
+ // Always restore the original injector so other tests are unaffected.
+ DataNodeFaultInjector.set(oldInjector);
+ }
+ }
+
+ /**
+ * When a block reader times out, its outdated future must be ignored.
+ * Otherwise the ByteBuffer could be written after it has been given back to
+ * the BufferPool. This test ensures that the block reader is closed before
+ * its buffer is freed.
+ */
+ @Test(timeout = 120000)
+ public void testAbnormallyCloseDoesNotWriteBufferAgain() throws Exception {
+ assumeTrue("Ignore case where num parity units <= 1",
+ ecPolicy.getNumParityUnits() > 1);
+ int stripedBufferSize = conf.getInt(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+ cellSize);
+ // StripedBlockReconstructor#reconstruct will loop 2 times
+ final int fileLen = stripedBufferSize * 2 * ecPolicy.getNumDataUnits();
+ String fileName = "/no-dirty-buffer";
+ Path file = new Path(fileName);
+ writeFile(fs, fileName, fileLen);
+ fs.getFileBlockLocations(file, 0, fileLen);
+
+ LocatedBlocks locatedBlocks =
+ StripedFileTestUtil.getLocatedBlocks(file, fs);
+ Assert.assertEquals(1, locatedBlocks.getLocatedBlocks().size());
+ // The file only has one block group
+ LocatedBlock lblock = locatedBlocks.get(0);
+ DatanodeInfo[] datanodeinfos = lblock.getLocations();
+
+ // to reconstruct first block
+ // (the DataNode holding it is shut down below to trigger reconstruction)
+ DataNode dataNode = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+
+ int stripedReadTimeoutInMills = conf.getInt(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY,
+ DFSConfigKeys.
+ DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT);
+ Assert.assertTrue(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY
+ + " must be greater than 2000",
+ stripedReadTimeoutInMills > 2000);
+
+ // Start from an empty pool (both direct and heap buffers) so that every
+ // buffer found in it later was returned during this test.
+ ElasticByteBufferPool bufferPool =
+ (ElasticByteBufferPool) ErasureCodingTestHelper.getBufferPool();
+ emptyBufferPool(bufferPool, true);
+ emptyBufferPool(bufferPool, false);
+
+ // Number of block reads that have finished (incremented in
+ // interceptBlockReader, which runs in the read's finally block).
+ AtomicInteger finishedReadBlock = new AtomicInteger(0);
+
+ DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
+ DataNodeFaultInjector timeoutInjector = new DataNodeFaultInjector() {
+ private AtomicInteger numDelayReader = new AtomicInteger(0);
+ private AtomicBoolean continueRead = new AtomicBoolean(false);
+ // Set on the first buffer free, whatever its cause; the name suggests
+ // the free is expected to follow an NPE-triggered close (TODO confirm).
+ private AtomicBoolean closeByNPE = new AtomicBoolean(false);
+
+ @Override
+ public void delayBlockReader() {
+ int index = numDelayReader.incrementAndGet();
+ LOG.info("Delay the {}th read block", index);
+
+ // the file's first StripedBlockReconstructor#reconstruct,
+ // and the first reader will timeout
+ if (index == 1) {
+ try {
+ GenericTestUtils.waitFor(() -> numDelayReader.get() >=
+ ecPolicy.getNumDataUnits() + 1, 50,
+ stripedReadTimeoutInMills * 3
+ );
+ } catch (TimeoutException e) {
+ Assert.fail("Can't reconstruct the file's first part.");
+ } catch (InterruptedException e) {
+ // NOTE(review): interrupt is swallowed; the reader just proceeds.
+ }
+ }
+ // After the first round, serialize the remaining reads: each one waits
+ // until the previous read has finished (index == finished + 1). Once a
+ // buffer has been freed (closeByNPE), readers instead wait for
+ // continueRead, which is set from interceptFreeBlockReaderBuffer.
+ if (index > ecPolicy.getNumDataUnits() + 1) {
+ try {
+ GenericTestUtils.waitFor(
+ () -> {
+ LOG.info("Close by NPE: {}, continue read: {}",
+ closeByNPE, continueRead);
+ return closeByNPE.get() ? continueRead.get()
+ : index == finishedReadBlock.get() + 1; }, 5,
+ stripedReadTimeoutInMills * 3
+ );
+ } catch (TimeoutException e) {
+ Assert.fail("Can't reconstruct the file's remaining part.");
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ @Override
+ public void interceptBlockReader() {
+ int n = finishedReadBlock.incrementAndGet();
+ LOG.info("Intercept the end of {}th read block.", n);
+ }
+
+ private AtomicInteger numFreeBuffer = new AtomicInteger(0);
+ @Override
+ public void interceptFreeBlockReaderBuffer() {
+ closeByNPE.compareAndSet(false, true);
+ int num = numFreeBuffer.incrementAndGet();
+ LOG.info("Intercept the {} free block buffer.", num);
+ // From the (numDataUnits + 1)-th free on, release waiting readers and
+ // hold the freeing thread until 2 * numDataUnits + 1 reads finished.
+ if (num >= ecPolicy.getNumDataUnits() + 1) {
+ continueRead.compareAndSet(false, true);
+ try {
+ GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+ 2 * ecPolicy.getNumDataUnits() + 1, 50,
+ stripedReadTimeoutInMills * 3
+ );
+ } catch (TimeoutException e) {
+ Assert.fail("Can't finish the file's reconstruction.");
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ };
+ DataNodeFaultInjector.set(timeoutInjector);
+ try {
+ shutdownDataNode(dataNode);
+ // at least one timeout reader
+ GenericTestUtils.waitFor(() -> finishedReadBlock.get() >=
+ 2 * ecPolicy.getNumDataUnits() + 1, 50,
+ stripedReadTimeoutInMills * 3
+ );
+
+ // NOTE(review): ElasticByteBufferPool#getBuffer now calls clear() on
+ // the returned buffer, so these position checks may not detect a
+ // buffer written after being returned to the pool; verify.
+ assertBufferPoolIsEmpty(bufferPool, false);
+ assertBufferPoolIsEmpty(bufferPool, true);
+ StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
+ } finally {
+ // Always restore the original injector so other tests are unaffected.
+ DataNodeFaultInjector.set(oldInjector);
+ }
+ }
+
+ /**
+ * Drains every buffer of the given kind (direct or heap) from the pool and
+ * asserts that each one has position 0. Despite its name, this does not
+ * assert that the pool was empty; it empties it.
+ *
+ * NOTE(review): getBuffer() calls clear() on a pooled buffer before
+ * returning it (see the ElasticByteBufferPool change in this commit), and
+ * clear() resets position to 0. Freshly allocated buffers also start at 0,
+ * so this assertion appears unable to fail. Confirm whether it still
+ * detects dirty buffers.
+ */
+ private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
+ boolean direct) {
+ while (bufferPool.size(direct) != 0) {
+ // iterate all ByteBuffers in ElasticByteBufferPool
+ ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
+ Assert.assertEquals(0, byteBuffer.position());
+ }
+ }
+
+ /**
+ * Removes all buffers of the given kind (direct or heap) from the pool by
+ * taking them out and discarding them, so the test starts from an empty
+ * pool.
+ */
+ private void emptyBufferPool(ElasticByteBufferPool bufferPool,
+ boolean direct) {
+ while (bufferPool.size(direct) != 0) {
+ bufferPool.getBuffer(direct, 0);
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java
new file mode 100644
index 0000000..da571055
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingTestHelper.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.erasurecode;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * Test-only accessor for package-private members of the
+ * {@code org.apache.hadoop.hdfs.server.datanode.erasurecode} package, so that
+ * tests in other packages can reach them.
+ */
+public final class ErasureCodingTestHelper {
+
+ // Static utility holder; not meant to be instantiated.
+ private ErasureCodingTestHelper() { }
+
+ /**
+ * @return the buffer pool returned by the package-private
+ * StripedReconstructor#getBufferPool().
+ */
+ public static ByteBufferPool getBufferPool() {
+ return StripedReconstructor.getBufferPool();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org