Posted to commits@ozone.apache.org by so...@apache.org on 2021/11/16 14:30:13 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 52f05fc HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)
52f05fc is described below
commit 52f05fc34927a6d02fcae01814b5e92e8a7d7f14
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Nov 16 14:29:52 2021 +0000
HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)
---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 22 ++--
.../io/ECBlockReconstructedStripeInputStream.java | 144 ++++++++++++++-------
.../TestECBlockReconstructedStripeInputStream.java | 96 +++++++++++++-
3 files changed, 198 insertions(+), 64 deletions(-)
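In short: when a block read fails mid-stripe, the reconstruction reader now records the failed index, re-plans the read with a substitute location, rewinds, and retries; it only fails once too few healthy locations remain. A minimal, self-contained sketch of that policy in plain Java (illustrative names only, not the Ozone API):

    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative only: exclude failed sources and retry the stripe. */
    final class StripeRetryDemo {

      /** Simulated per-index read; returns false on failure. */
      interface Source {
        boolean read(int index);
      }

      /**
       * Retry the stripe until dataCount indexes read cleanly, or too few
       * healthy indexes remain (the InsufficientLocationsException case).
       */
      static boolean readStripeWithRetry(Source src, int dataCount, int total) {
        Set<Integer> failed = new HashSet<>();
        while (total - failed.size() >= dataCount) {
          boolean ok = true;
          int done = 0;
          for (int i = 0; i < total && done < dataCount; i++) {
            if (failed.contains(i)) {
              continue;              // excluded on an earlier pass
            }
            if (src.read(i)) {
              done++;                // healthy source
            } else {
              failed.add(i);         // remember the bad index...
              ok = false;            // ...and re-plan the whole stripe
              break;
            }
          }
          if (ok) {
            return true;             // stripe read, possibly via parity
          }
        }
        return false;                // insufficient locations
      }

      public static void main(String[] args) {
        // Index 1 always fails; 3-of-5 still succeeds via a substitute.
        System.out.println(readStripeWithRetry(i -> i != 1, 3, 5)); // true
        // Only indexes 3 and 4 are healthy; 3-of-5 cannot be met.
        System.out.println(readStripeWithRetry(i -> i > 2, 3, 5));  // false
      }
    }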
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index e6e7ccc..07333d2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -111,9 +111,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
this.xceiverClientFactory = xceiverClientFactory;
this.refreshFunction = refreshFunction;
this.maxLocations = repConfig.getData() + repConfig.getParity();
- this.dataLocations =
- new DatanodeDetails[repConfig.getData() + repConfig.getParity()];
- this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
+ this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
+ this.blockStreams =
+ new BlockExtendedInputStream[repConfig.getRequiredNodes()];
this.stripeSize = (long)ecChunkSize * repConfig.getData();
setBlockLocations(this.blockInfo.getPipeline());
@@ -152,9 +152,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
* stream if it has not been opened already.
* @return BlockInput stream to read from.
*/
- protected BlockExtendedInputStream getOrOpenStream(
- int streamIndex, int locationIndex) {
- BlockExtendedInputStream stream = blockStreams[streamIndex];
+ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
+ BlockExtendedInputStream stream = blockStreams[locationIndex];
if (stream == null) {
// To read an EC block, we create a STANDALONE pipeline that contains the
// single location for the block index we want to read. The EC blocks are
@@ -180,7 +179,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
blkInfo, pipeline,
blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
refreshFunction);
- blockStreams[streamIndex] = stream;
+ blockStreams[locationIndex] = stream;
}
return stream;
}
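With blockStreams sized to getRequiredNodes() (data + parity), the stream index and the location index are the same thing, so the second parameter disappears. The underlying lazy-open pattern, sketched standalone (simplified types, not the Ozone signatures):

    import java.util.function.IntFunction;

    /** Illustrative only: one lazily-opened stream slot per location. */
    final class StreamSlots<T> {
      private final Object[] slots;        // length = data + parity
      private final IntFunction<T> opener; // opens a stream for one location

      StreamSlots(int requiredNodes, IntFunction<T> opener) {
        this.slots = new Object[requiredNodes];
        this.opener = opener;
      }

      @SuppressWarnings("unchecked")
      T getOrOpen(int locationIndex) {
        if (slots[locationIndex] == null) {
          // Open on first use; later reads of the same index reuse it.
          slots[locationIndex] = opener.apply(locationIndex);
        }
        return (T) slots[locationIndex];
      }
    }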
@@ -258,8 +257,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
int totalRead = 0;
while(strategy.getTargetLength() > 0 && remaining() > 0) {
int currentIndex = currentStreamIndex();
- BlockExtendedInputStream stream =
- getOrOpenStream(currentIndex, currentIndex);
+ BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
int read = readFromStream(stream, strategy);
totalRead += read;
position += read;
@@ -380,10 +378,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
"EOF encountered at pos: " + pos + " for block: "
+ blockInfo.getBlockID());
}
- if (position != pos) {
- position = pos;
- seeked = true;
- }
+ position = pos;
+ seeked = true;
}
@Override
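The dropped guard matters for the new retry path: ECBlockReconstructedStripeInputStream calls seek(getPos()) after a failure, and with the old code a seek to the current position was silently skipped, so partially-read streams were never rewound. A tiny standalone illustration (hypothetical field names):

    /** Illustrative only: why seek-to-same-position must not be a no-op. */
    final class SeekDemo {
      private long position;
      private boolean seeked;

      /** Old behavior: seek(getPos()) changed nothing. */
      void seekOld(long pos) {
        if (position != pos) {
          position = pos;
          seeked = true;
        }
      }

      /** New behavior: always flag the seek so streams get rewound. */
      void seekNew(long pos) {
        position = pos;
        seeked = true;
      }

      public static void main(String[] args) {
        SeekDemo d = new SeekDemo();
        d.seekOld(0);
        System.out.println(d.seeked); // false: partial reads never rewound
        d.seekNew(0);
        System.out.println(d.seeked); // true: streams re-seek on next read
      }
    }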
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 54e2de0..1fdb7a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -36,8 +36,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.function.Function;
/**
@@ -94,6 +97,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private int[] missingIndexes;
// The blockLocation indexes to use to read data into the dataBuffers.
private List<Integer> dataIndexes = new ArrayList<>();
+ // Data Indexes we have tried to read from, and failed for some reason
+ private Set<Integer> failedDataIndexes = new HashSet<>();
private final RawErasureDecoder decoder;
@@ -109,19 +114,19 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
decoder = CodecRegistry.getInstance()
.getCodecFactory(repConfig.getCodec().toString())
.createDecoder(repConfig);
+
+ // The EC decoder needs an array of length data+parity, with missing or
+ // unneeded indexes set to null.
+ decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
}
protected void init() throws InsufficientLocationsException {
if (!hasSufficientLocations()) {
- throw new InsufficientLocationsException("There are not enough " +
+ throw new InsufficientLocationsException("There are insufficient " +
"datanodes to read the EC block");
}
-
+ dataIndexes.clear();
ECReplicationConfig repConfig = getRepConfig();
- // The EC decoder needs an array data+parity long, with missing or not
- // needed indexes set to null.
- decoderInputBuffers = new ByteBuffer[
- getRepConfig().getData() + getRepConfig().getParity()];
DatanodeDetails[] locations = getDataLocations();
setMissingIndexesAndDataLocations(locations);
List<Integer> parityIndexes =
@@ -130,9 +135,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
dataIndexes.addAll(parityIndexes);
// The decoder inputs originally start as all nulls. Then we populate the
// pieces we have data for. The parity buffers are reused for the block
- // so we can allocated them now.
- for (Integer i : parityIndexes) {
- decoderInputBuffers[i] = allocateBuffer(repConfig);
+ // so we can allocate them now. On re-init, we reuse any parity buffers
+ // already allocated.
+ for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
+ if (parityIndexes.contains(i)) {
+ if (decoderInputBuffers[i] == null) {
+ decoderInputBuffers[i] = allocateBuffer(repConfig);
+ }
+ } else {
+ decoderInputBuffers[i] = null;
+ }
}
decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
initialized = true;
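Because init() can now run again after a failure, the decoder input array is allocated once in the constructor and the parity slots are reconciled on every pass: keep a buffer that is still selected, allocate one that is newly selected, and null out one that was dropped. The reconcile step in isolation (illustrative, mirroring the loop above):

    import java.nio.ByteBuffer;
    import java.util.List;

    /** Illustrative only: reuse parity buffers across re-initialization. */
    final class ParityBufferReuse {
      static void reconcile(ByteBuffer[] inputs, List<Integer> parityIndexes,
          int data, int requiredNodes, int chunkSize) {
        for (int i = data; i < requiredNodes; i++) {
          if (parityIndexes.contains(i)) {
            if (inputs[i] == null) {
              // Newly selected parity index: allocate its buffer now.
              inputs[i] = ByteBuffer.allocate(chunkSize);
            }
            // Otherwise it was allocated on an earlier init(); reuse it.
          } else {
            // Not selected this time: the decoder expects null here.
            inputs[i] = null;
          }
        }
      }
    }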
@@ -150,9 +162,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
List<Integer> missingInd = new ArrayList<>();
for (int i = 0; i < repConfig.getData(); i++) {
- if (locations[i] == null && i < expectedDataBlocks) {
+ if ((locations[i] == null || failedDataIndexes.contains(i))
+ && i < expectedDataBlocks) {
missingInd.add(i);
- } else if (locations[i] != null) {
+ } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
dataIndexes.add(i);
}
}
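The classification now has three outcomes per data index: reconstruct it (missing or failed within the expected range), read it directly (present and not failed), or ignore it (a padded slot in a short block). Restated compactly (plain Java, illustrative):

    import java.util.List;
    import java.util.Set;

    /** Illustrative only: split data indexes into readable vs. missing. */
    final class IndexClassifier {
      static void classify(boolean[] present, Set<Integer> failed,
          int expectedDataBlocks, List<Integer> missing,
          List<Integer> readable) {
        for (int i = 0; i < present.length; i++) {
          boolean usable = present[i] && !failed.contains(i);
          if (!usable && i < expectedDataBlocks) {
            missing.add(i);   // must be reconstructed from parity
          } else if (usable) {
            readable.add(i);  // can be read directly
          }
          // Anything else is a padded slot in a short block: the stripe
          // has no data there, so it is neither read nor reconstructed.
        }
      }
    }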
@@ -171,6 +184,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
for (int i = 0; i < repConfig.getData(); i++) {
if (isMissingIndex(i)) {
decoderOutputBuffers[recoveryIndex++] = bufs[i];
+ decoderInputBuffers[i] = null;
} else {
decoderInputBuffers[i] = bufs[i];
}
@@ -210,12 +224,28 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
return EOF;
}
validateBuffers(bufs);
- assignBuffers(bufs);
- clearParityBuffers();
- // Set the read limits on the buffers so we do not read any garbage data
- // from the end of the block that is unexpected.
- setBufferReadLimits(bufs, toRead);
- loadDataBuffersFromStream();
+ while (true) {
+ try {
+ assignBuffers(bufs);
+ clearParityBuffers();
+ // Set the read limits on the buffers so we do not read any unexpected
+ // garbage data from the end of the block.
+ setBufferReadLimits(bufs, toRead);
+ loadDataBuffersFromStream();
+ break;
+ } catch (IOException e) {
+ // Re-init now that the bad block has been excluded. If we have run out
+ // of locations, init will throw an InsufficientLocationsException.
+ init();
+ // Seek to the current position so any block streams we have already
+ // read from are rewound.
+ seek(getPos());
+ // Reset the input positions back to zero
+ for (ByteBuffer b : bufs) {
+ b.position(0);
+ }
+ }
+ }
padBuffers(toRead);
flipInputs();
decodeStripe();
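On failure the loop does three things before retrying: re-plan via init() (which now excludes the failed index, or throws InsufficientLocationsException), rewind via seek(getPos()), and reset the output buffers. That shape, extracted (hypothetical abstract methods standing in for the real ones):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Illustrative only: the rewind-and-retry shape of readStripe(). */
    abstract class RetryLoopSketch {
      abstract void init() throws IOException; // throws when out of sources
      abstract void seek(long pos) throws IOException;
      abstract long getPos();
      abstract void loadInto(ByteBuffer[] bufs) throws IOException;

      final void readWithRetry(ByteBuffer[] bufs) throws IOException {
        while (true) {
          try {
            loadInto(bufs);   // may fail partway through the stripe
            return;
          } catch (IOException e) {
            init();           // re-plan without the failed source
            seek(getPos());   // rewind streams that already advanced
            for (ByteBuffer b : bufs) {
              b.position(0);  // discard partially written output
            }
          }
        }
      }
    }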
@@ -277,7 +307,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
return;
}
- if (fullChunks == 0){
+ if (fullChunks == 0) {
bufs[0].limit(toRead);
// All buffers except the first contain no data.
for (int i = 1; i < bufs.length; i++) {
@@ -319,8 +349,9 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
/**
- * Take the parity indexes which are available, shuffle them and truncate the
- * list to the number of required parity chunks.
+ * Take the parity indexes which are already used, and the others which are
+ * available, and select random indexes to meet numRequired. The resulting
+ * list is sorted in ascending order of the indexes.
* @param locations The list of locations for all blocks in the block group.
* @param numRequired The number of parity chunks needed for reconstruction
* @return A list of indexes indicating which parity locations to read.
@@ -328,19 +359,27 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private List<Integer> selectParityIndexes(
DatanodeDetails[] locations, int numRequired) {
List<Integer> indexes = new ArrayList<>();
+ List<Integer> selected = new ArrayList<>();
ECReplicationConfig repConfig = getRepConfig();
- for (int i = repConfig.getData();
- i < repConfig.getParity() + repConfig.getData(); i++) {
- if (locations[i] != null) {
+ for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
+ if (locations[i] != null && !failedDataIndexes.contains(i)
+ && decoderInputBuffers[i] == null) {
indexes.add(i);
}
+ // If we are re-initializing, we want to make sure we reuse any
+ // previously selected good parity indexes, as their block streams are
+ // already open.
+ if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+ selected.add(i);
+ }
}
- Preconditions.assertTrue(indexes.size() >= numRequired);
+ Preconditions.assertTrue(indexes.size() + selected.size() >= numRequired);
Random rand = new Random();
- while (indexes.size() > numRequired) {
- indexes.remove(rand.nextInt(indexes.size()));
+ while (selected.size() < numRequired) {
+ selected.add(indexes.remove(rand.nextInt(indexes.size())));
}
- return indexes;
+ Collections.sort(selected);
+ return selected;
}
private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
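selectParityIndexes() now works with two pools: parity indexes whose streams are already open (non-null decoder buffers, kept so an open stream is not thrown away on re-init) and fresh healthy candidates, drawn at random to top the selection up to numRequired. Equivalent standalone logic (illustrative types; the caller must guarantee enough candidates, as the Preconditions check above does):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    /** Illustrative only: prefer open parity sources, top up randomly. */
    final class ParityPicker {
      static List<Integer> pick(List<Integer> alreadyOpen,
          List<Integer> candidates, int numRequired, Random rand) {
        List<Integer> selected = new ArrayList<>(alreadyOpen);
        List<Integer> pool = new ArrayList<>(candidates);
        while (selected.size() < numRequired) {
          // Random choice spreads load across the healthy parity replicas.
          selected.add(pool.remove(rand.nextInt(pool.size())));
        }
        Collections.sort(selected); // callers expect ascending index order
        return selected;
      }
    }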
@@ -366,26 +405,32 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
protected void loadDataBuffersFromStream() throws IOException {
- for (int i = 0; i < dataIndexes.size(); i++) {
- BlockExtendedInputStream stream =
- getOrOpenStream(i, dataIndexes.get(i));
- seekStreamIfNecessary(stream, 0);
- ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
- while (b.hasRemaining()) {
- int read = stream.read(b);
- if (read == EOF) {
- // We should not reach EOF, as the block should have enough data to
- // fill the buffer. If the block does not, then it indicates the block
- // is not as long as it should be, based on the block length stored in
- // OM. Therefore if there is any remaining space in the buffer, we
- // should throw an exception.
- if (b.hasRemaining()) {
- throw new IOException("Expected to read " + b.remaining() +
- " bytes from block " + getBlockID() + " EC index " + (i + 1) +
- " but reached EOF");
+ for (int i : dataIndexes) {
+ try {
+ BlockExtendedInputStream stream = getOrOpenStream(i);
+ seekStreamIfNecessary(stream, 0);
+ ByteBuffer b = decoderInputBuffers[i];
+ while (b.hasRemaining()) {
+ int read = stream.read(b);
+ if (read == EOF) {
+ // We should not reach EOF, as the block should have enough data to
+ // fill the buffer. If the block does not, then it indicates the
+ // block is not as long as it should be, based on the block length
+ // stored in OM. Therefore if there is any remaining space in the
+ // buffer, we should throw an exception.
+ if (b.hasRemaining()) {
+ throw new IOException("Expected to read " + b.remaining() +
+ " bytes from block " + getBlockID() + " EC index " + (i + 1) +
+ " but reached EOF");
+ }
+ break;
}
- break;
}
+ } catch (IOException e) {
+ LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
+ "block", getBlockID(), i + 1, e);
+ failedDataIndexes.add(i);
+ throw e;
}
}
}
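The pattern here is mark-and-rethrow: record the failing index in failedDataIndexes, then let the exception propagate so readStripe() can re-plan without it. In isolation (hypothetical names):

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative only: remember which source failed, then rethrow. */
    final class MarkAndRethrow {
      private final Set<Integer> failed = new HashSet<>();

      interface Loader {
        void load(int index) throws IOException;
      }

      void loadAll(Iterable<Integer> indexes, Loader loader)
          throws IOException {
        for (int i : indexes) {
          try {
            loader.load(i);
          } catch (IOException e) {
            failed.add(i);  // consulted by the next planning pass
            throw e;        // the caller catches, re-plans, and retries
          }
        }
      }

      Set<Integer> failedIndexes() {
        return failed;
      }
    }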
@@ -422,12 +467,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
int availableLocations =
availableDataLocations() + availableParityLocations();
int paddedLocations = repConfig.getData() - expectedDataBlocks;
+ int failedLocations = failedDataIndexes.size();
- if (availableLocations + paddedLocations >= repConfig.getData()) {
+ if (availableLocations + paddedLocations - failedLocations
+ >= repConfig.getData()) {
return true;
} else {
- LOG.warn("There are insufficient locations. {} available {} padded {} " +
- "expected", availableLocations, paddedLocations, expectedDataBlocks);
+ LOG.error("There are insufficient locations. {} available; {} padded;" +
+ " {} failed; {} expected;", availableLocations, paddedLocations,
+ failedLocations, expectedDataBlocks);
return false;
}
}
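The arithmetic assumes availableLocations still counts locations that have since failed, hence the explicit subtraction. A worked example for RS(3,2) with one padded data slot (a short block where expectedDataBlocks = 2):

    /** Illustrative only: the sufficiency check for an RS(3,2) group. */
    final class SufficiencyDemo {
      static boolean sufficient(int available, int padded, int failed,
          int data) {
        return available + padded - failed >= data;
      }

      public static void main(String[] args) {
        int data = 3;    // RS(3,2)
        int padded = 1;  // short block: only 2 data blocks expected
        System.out.println(sufficient(4, padded, 1, data)); // true:  4+1-1 >= 3
        System.out.println(sufficient(4, padded, 2, data)); // true:  4+1-2 >= 3
        System.out.println(sufficient(4, padded, 3, data)); // false: 4+1-3 <  3
      }
    }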
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index bf44297..33fb33a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.ozone.erasurecode.CodecRegistry;
@@ -354,8 +355,8 @@ public class TestECBlockReconstructedStripeInputStream {
try {
ecb.readStripe(bufs);
Assert.fail("Read should have thrown an exception");
- } catch (IOException e) {
- Assert.assertTrue(e.getMessage().matches("^Expected\\sto\\sread.+"));
+ } catch (InsufficientLocationsException e) {
+ // expected
}
}
}
@@ -459,6 +460,87 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ @Test
+ public void testErrorReadingBlockContinuesReading() throws IOException {
+ // Generate the input data for 3 full stripes and generate the parity.
+ int chunkSize = repConfig.getEcChunkSize();
+ int partialStripeSize = chunkSize * 2 - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ 4 * chunkSize);
+ dataBufs[1].limit(4 * chunkSize - 1);
+ dataBufs[2].limit(3 * chunkSize);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ List<List<Integer>> failLists = new ArrayList<>();
+ failLists.add(indexesToList(0, 1));
+ // These will be the first parity read and then the next parity read as a
+ // replacement
+ failLists.add(indexesToList(2, 3));
+ // First parity and then the data block
+ failLists.add(indexesToList(2, 0));
+
+ for (List<Integer> failList : failLists) {
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+
+ // Data block index 3 is missing and needs to be recovered initially.
+ Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 2, 4, 5);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ stripeSize() * 3 + partialStripeSize, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ // After reading the first stripe, make one of the streams error
+ for (int i = 0; i < 3; i++) {
+ int read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+ }
+ Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+ Assert.assertEquals(stripeSize(), read);
+ clearBuffers(bufs);
+ if (i == 0) {
+ streamFactory.getBlockStreams().get(failList.remove(0))
+ .setShouldError(true);
+ }
+ }
+ // The next read is a partial stripe
+ int read = ecb.readStripe(bufs);
+ Assert.assertEquals(partialStripeSize, read);
+ validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+ validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+ Assert.assertEquals(0, bufs[2].remaining());
+ Assert.assertEquals(0, bufs[2].position());
+
+ // seek back to zero, make another block fail. The next read should
+ // error as there are not enough blocks to read.
+ ecb.seek(0);
+ streamFactory.getBlockStreams().get(failList.remove(0))
+ .setShouldError(true);
+ try {
+ clearBuffers(bufs);
+ ecb.readStripe(bufs);
+ Assert.fail("InsufficientLocationsException expected");
+ } catch (InsufficientLocationsException e) {
+ // expected
+ }
+ }
+ }
+ }
+
+ private List<Integer> indexesToList(int... indexes) {
+ List<Integer> list = new ArrayList<>();
+ for (int i : indexes) {
+ list.add(i);
+ }
+ return list;
+ }
private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
List<ByteBuffer> dataStreams = new ArrayList<>();
@@ -655,7 +737,7 @@ public class TestECBlockReconstructedStripeInputStream {
int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
TestBlockInputStream stream = new TestBlockInputStream(
blockInfo.getBlockID(), blockInfo.getLength(),
- blockStreamData.get(repInd -1));
+ blockStreamData.get(repInd - 1));
blockStreams.add(stream);
return stream;
}
@@ -667,6 +749,7 @@ public class TestECBlockReconstructedStripeInputStream {
private boolean closed = false;
private BlockID blockID;
private long length;
+ private boolean shouldError = false;
private static final byte EOF = -1;
TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
@@ -680,6 +763,10 @@ public class TestECBlockReconstructedStripeInputStream {
return closed;
}
+ public void setShouldError(boolean val) {
+ shouldError = val;
+ }
+
@Override
public BlockID getBlockID() {
return blockID;
@@ -703,6 +790,9 @@ public class TestECBlockReconstructedStripeInputStream {
@Override
public int read(ByteBuffer buf) throws IOException {
+ if (shouldError) {
+ throw new IOException("Simulated error reading block");
+ }
if (getRemaining() == 0) {
return EOF;
}
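The test double is plain fault injection: once setShouldError(true) is flipped, every read throws, which is exactly what pushes the reader into its exclusion path. The same idea as a standalone stream stub (illustrative, not the test class itself):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    /** Illustrative only: a readable stub with a switchable failure mode. */
    final class FaultyStream {
      private final ByteBuffer data;
      private boolean shouldError;

      FaultyStream(ByteBuffer data) {
        this.data = data;
      }

      void setShouldError(boolean val) {
        shouldError = val;
      }

      int read(ByteBuffer dst) throws IOException {
        if (shouldError) {
          throw new IOException("Simulated error reading block");
        }
        if (!data.hasRemaining()) {
          return -1; // EOF
        }
        int n = Math.min(dst.remaining(), data.remaining());
        for (int i = 0; i < n; i++) {
          dst.put(data.get());
        }
        return n;
      }
    }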