Posted to commits@ozone.apache.org by ad...@apache.org on 2022/06/07 19:35:44 UTC
[ozone] branch master updated: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 039e8a8f5a HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)
039e8a8f5a is described below
commit 039e8a8f5a4c49a6ebed31839862ce1975a28036
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:35:38 2022 +0200
HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)
---
hadoop-hdds/client/pom.xml | 4 +
.../io/ECBlockReconstructedStripeInputStream.java | 460 +++++++++++++--------
.../TestECBlockReconstructedStripeInputStream.java | 372 +++++++++++------
.../rawcoder/ByteBufferDecodingState.java | 17 +-
4 files changed, 555 insertions(+), 298 deletions(-)
diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 9f2116c96f..5770f72970 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -70,6 +70,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>${spotbugs.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 197eba7922..6a2eb9af26 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.client.BlockID;
@@ -39,22 +40,31 @@ import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Queue;
-import java.util.Random;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
+import static java.util.Collections.emptySortedSet;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.IntStream.range;
+
/**
* Class to read EC encoded data from blocks a stripe at a time, when some of
* the data blocks are not available. The public API for this class is:
*
* readStripe(ByteBuffer[] bufs)
+ * recoverChunks(ByteBuffer[] bufs)
*
* The other inherited public APIs will throw a NotImplementedException. This is
* because this class is intended to only read full stripes into a reusable set
@@ -88,6 +98,18 @@ import java.util.function.Function;
* than a full stripe, the client can simply read upto remaining from each
* buffer in turn. If there is a full stripe, each buffer should have ecChunk
* size remaining.
+ *
+ * recoverChunks() handles the more generic case, where specific buffers, even
+ * parity ones, are to be recovered -- these should be passed to the method.
+ * The whole stripe is not important for the caller in this case. The indexes
+ * of the buffers that need to be recovered should be set by calling
+ * setRecoveryIndexes() before any read operation.
+ *
+ * Example: with rs-3-2 there are 3 data and 2 parity replicas, indexes are [0,
+ * 1, 2, 3, 4]. If replicas 2 and 3 are missing and need to be recovered,
+ * caller should {@code setRecoveryIndexes([2, 3])}, and then can recover the
+ * part of each stripe for these replicas by calling
+ * {@code recoverChunks(bufs)}, passing two buffers.
*/
public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
@@ -99,20 +121,34 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
private ByteBuffer[] decoderInputBuffers;
// Missing chunks are recovered into these buffers.
private ByteBuffer[] decoderOutputBuffers;
+
// Missing indexes to be recovered into the recovered buffers. Required by the
// EC decoder
- private int[] missingIndexes;
- // The blockLocation indexes to use to read data into the dataBuffers.
- private List<Integer> dataIndexes = new ArrayList<>();
+ private final SortedSet<Integer> missingIndexes = new TreeSet<>();
+
+ // indexes for data, padding and parity blocks
+ private final SortedSet<Integer> dataIndexes;
+ private final SortedSet<Integer> paddingIndexes;
+ private final SortedSet<Integer> parityIndexes;
+ private final SortedSet<Integer> allIndexes;
+
+ // The blockLocation indexes to use to read data into the output buffers
+ private final SortedSet<Integer> selectedIndexes = new TreeSet<>();
+ // indexes of internal buffers (ones which are not requested by the caller,
+ // but needed for reconstructing missing data)
+ private final SortedSet<Integer> internalBuffers = new TreeSet<>();
// Data Indexes we have tried to read from, and failed for some reason
- private Set<Integer> failedDataIndexes = new HashSet<>();
- private ByteBufferPool byteBufferPool;
+ private final Set<Integer> failedDataIndexes = new HashSet<>();
+ private final ByteBufferPool byteBufferPool;
private RawErasureDecoder decoder;
private boolean initialized = false;
- private ExecutorService executor;
+ private final ExecutorService executor;
+
+ // for offline recovery: indexes to be recovered
+ private final Set<Integer> recoveryIndexes = new TreeSet<>();
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
@@ -125,6 +161,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
refreshFunction, streamFactory);
this.byteBufferPool = byteBufferPool;
this.executor = ecReconstructExecutor;
+
+ int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+ int d = repConfig.getData();
+ dataIndexes = setOfRange(0, expectedDataBlocks);
+ paddingIndexes = setOfRange(expectedDataBlocks, d);
+ parityIndexes = setOfRange(d, repConfig.getRequiredNodes());
+ allIndexes = setOfRange(0, repConfig.getRequiredNodes());
}
/**
@@ -138,7 +181,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
*
* @param dns A list of DatanodeDetails that are known to be bad.
*/
- public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
+ public synchronized void addFailedDatanodes(Collection<DatanodeDetails> dns) {
if (initialized) {
throw new RuntimeException("Cannot add failed datanodes after the " +
"reader has been initialized");
@@ -154,92 +197,133 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
+ /**
+ * Set the EC indexes that should be recovered by
+ * {@link #recoverChunks(ByteBuffer[])}.
+ */
+ public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+ if (initialized) {
+ throw new IllegalStateException("Cannot set recovery indexes after the " +
+ "reader has been initialized");
+ }
+ Preconditions.assertNotNull(indexes, "recovery indexes");
+ recoveryIndexes.clear();
+ recoveryIndexes.addAll(indexes);
+ }
+
private void init() throws InsufficientLocationsException {
if (decoder == null) {
decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
}
- if (decoderInputBuffers == null) {
- // The EC decoder needs an array data+parity long, with missing or not
- // needed indexes set to null.
- decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
- }
if (!hasSufficientLocations()) {
throw new InsufficientLocationsException("There are insufficient " +
"datanodes to read the EC block");
}
- dataIndexes.clear();
- ECReplicationConfig repConfig = getRepConfig();
- DatanodeDetails[] locations = getDataLocations();
- setMissingIndexesAndDataLocations(locations);
- List<Integer> parityIndexes =
- selectParityIndexes(locations, missingIndexes.length);
- // We read from the selected parity blocks, so add them to the data indexes.
- dataIndexes.addAll(parityIndexes);
+ allocateInternalBuffers();
+ if (!isOfflineRecovery()) {
+ decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+ }
+ initialized = true;
+ }
+
+ private void allocateInternalBuffers() {
// The decoder inputs originally start as all nulls. Then we populate the
- // pieces we have data for. The parity buffers are reused for the block
- // so we can allocated them now. On re-init, we reuse any parity buffers
+ // pieces we have data for. The internal buffers are reused for the block,
+ // so we can allocate them now. On re-init, we reuse any internal buffers
// already allocated.
- for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
- if (parityIndexes.contains(i)) {
- if (decoderInputBuffers[i] == null) {
- decoderInputBuffers[i] = allocateBuffer(repConfig);
- }
- } else {
- decoderInputBuffers[i] = null;
+ final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+ for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+ boolean internalInput = selectedIndexes.contains(i)
+ || paddingIndexes.contains(i);
+ boolean hasBuffer = decoderInputBuffers[i] != null;
+
+ if (internalInput && !hasBuffer) {
+ allocateInternalBuffer(i);
+ } else if (!internalInput && hasBuffer) {
+ releaseInternalBuffer(i);
}
}
- decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
- initialized = true;
}
- /**
- * Determine which indexes are missing, taking into account the length of the
- * block. For a block shorter than a full EC stripe, it is expected that
- * some of the data locations will not be present.
- * Populates the missingIndex and dataIndexes instance variables.
- * @param locations Available locations for the block group
- */
- private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
- ECReplicationConfig repConfig = getRepConfig();
- int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
- List<Integer> missingInd = new ArrayList<>();
- for (int i = 0; i < repConfig.getData(); i++) {
- if ((locations[i] == null || failedDataIndexes.contains(i))
- && i < expectedDataBlocks) {
- missingInd.add(i);
- } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
- dataIndexes.add(i);
+ private void allocateInternalBuffer(int index) {
+ Preconditions.assertTrue(internalBuffers.add(index),
+ () -> "Buffer " + index + " already tracked as internal input");
+ decoderInputBuffers[index] =
+ byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+ }
+
+ private void releaseInternalBuffer(int index) {
+ Preconditions.assertTrue(internalBuffers.remove(index),
+ () -> "Buffer " + index + " not tracked as internal input");
+ byteBufferPool.putBuffer(decoderInputBuffers[index]);
+ decoderInputBuffers[index] = null;
+ }
+
+ private void markMissingLocationsAsFailed() {
+ DatanodeDetails[] locations = getDataLocations();
+ for (int i = 0; i < locations.length; i++) {
+ if (locations[i] == null && failedDataIndexes.add(i)) {
+ LOG.debug("Marked index={} as failed", i);
}
}
- missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+ }
+
+ private boolean isOfflineRecovery() {
+ return !recoveryIndexes.isEmpty();
}
private void assignBuffers(ByteBuffer[] bufs) {
- ECReplicationConfig repConfig = getRepConfig();
- Preconditions.assertTrue(bufs.length == repConfig.getData());
- int recoveryIndex = 0;
- // Here bufs come from the caller and will be filled with data read from
- // the blocks or recovered. Therefore, if the index is missing, we assign
- // the buffer to the decoder outputs, where data is recovered via EC
- // decoding. Otherwise the buffer is set to the input. Note, it may be a
- // buffer which needs padded.
- for (int i = 0; i < repConfig.getData(); i++) {
- if (isMissingIndex(i)) {
- decoderOutputBuffers[recoveryIndex++] = bufs[i];
- decoderInputBuffers[i] = null;
- } else {
- decoderInputBuffers[i] = bufs[i];
+ Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+ if (isOfflineRecovery()) {
+ decoderOutputBuffers = bufs;
+ } else {
+ int recoveryIndex = 0;
+ // Here bufs come from the caller and will be filled with data read from
+ // the blocks or recovered. Therefore, if the index is missing, we assign
+ // the buffer to the decoder outputs, where data is recovered via EC
+ // decoding. Otherwise the buffer is set to the input. Note, it may be a
+ // buffer which needs padded.
+ for (int i = 0; i < bufs.length; i++) {
+ if (isMissingIndex(i)) {
+ decoderOutputBuffers[recoveryIndex++] = bufs[i];
+ if (internalBuffers.contains(i)) {
+ releaseInternalBuffer(i);
+ } else {
+ decoderInputBuffers[i] = null;
+ }
+ } else {
+ decoderInputBuffers[i] = bufs[i];
+ }
}
}
}
+ private int getExpectedBufferCount() {
+ return isOfflineRecovery()
+ ? recoveryIndexes.size()
+ : getRepConfig().getData();
+ }
+
private boolean isMissingIndex(int ind) {
- for (int i : missingIndexes) {
- if (i == ind) {
- return true;
- }
- }
- return false;
+ return missingIndexes.contains(ind);
+ }
+
+ /**
+ * This method should be passed a list of byteBuffers which must contain the
+ * same number of entries as previously set recovery indexes. Each Bytebuffer
+ * should be at position 0 and have EC ChunkSize bytes remaining.
+ * After returning, the buffers will contain the data for the parts to be
+ * recovered in the block's next stripe. The buffers will be returned
+ * "ready to read" with their position set to zero and the limit set
+ * according to how much data they contain.
+ *
+ * @param bufs A list of byteBuffers into which recovered data is written
+ * @return The number of bytes read
+ */
+ public synchronized int recoverChunks(ByteBuffer[] bufs) throws IOException {
+ Preconditions.assertTrue(isOfflineRecovery());
+ return read(bufs);
}
/**
@@ -258,6 +342,11 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
* @throws IOException
*/
public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
+ return read(bufs);
+ }
+
+ @VisibleForTesting
+ synchronized int read(ByteBuffer[] bufs) throws IOException {
int toRead = (int)Math.min(getRemaining(), getStripeSize());
if (toRead == 0) {
return EOF;
@@ -269,10 +358,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
while (true) {
try {
assignBuffers(bufs);
- clearParityBuffers();
+ clearInternalBuffers();
// Set the read limits on the buffers so we do not read any garbage data
// from the end of the block that is unexpected.
- setBufferReadLimits(bufs, toRead);
+ setBufferReadLimits(toRead);
loadDataBuffersFromStream();
break;
} catch (IOException e) {
@@ -291,13 +380,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
throw new IOException("Interrupted waiting for reads to complete", ie);
}
}
- if (missingIndexes.length > 0) {
+ if (!missingIndexes.isEmpty()) {
padBuffers(toRead);
flipInputs();
decodeStripe();
// Reset the buffer positions and limits to remove any padding added
// before EC Decode.
- setBufferReadLimits(bufs, toRead);
+ setBufferReadLimits(toRead);
} else {
// If we have no missing indexes, then the buffers will be at their
// limits after reading so we need to flip them to ensure they are ready
@@ -316,7 +405,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
private void validateBuffers(ByteBuffer[] bufs) {
- Preconditions.assertTrue(bufs.length == getRepConfig().getData());
+ Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
int chunkSize = getRepConfig().getEcChunkSize();
for (ByteBuffer b : bufs) {
Preconditions.assertTrue(b.remaining() == chunkSize);
@@ -358,38 +447,39 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
- private void setBufferReadLimits(ByteBuffer[] bufs, int toRead) {
+ private void setBufferReadLimits(int toRead) {
int chunkSize = getRepConfig().getEcChunkSize();
int fullChunks = toRead / chunkSize;
- if (fullChunks == getRepConfig().getData()) {
+ int data = getRepConfig().getData();
+ if (fullChunks == data) {
// We are reading a full stripe, no concerns over padding.
return;
}
- if (fullChunks == 0) {
- bufs[0].limit(toRead);
- // All buffers except the first contain no data.
- for (int i = 1; i < bufs.length; i++) {
- bufs[i].position(0);
- bufs[i].limit(0);
- }
- // If we have less than 1 chunk, then the parity buffers are the size
- // of the first chunk.
- for (int i = getRepConfig().getData();
- i < getRepConfig().getRequiredNodes(); i++) {
- ByteBuffer b = decoderInputBuffers[i];
- if (b != null) {
- b.limit(toRead);
- }
+ int partialLength = toRead % chunkSize;
+ setReadLimits(partialLength, fullChunks, decoderInputBuffers, allIndexes);
+ setReadLimits(partialLength, fullChunks, decoderOutputBuffers,
+ missingIndexes);
+ }
+
+ private void setReadLimits(int partialChunkSize, int fullChunks,
+ ByteBuffer[] buffers, Collection<Integer> indexes) {
+ int data = getRepConfig().getData();
+ Preconditions.assertTrue(buffers.length == indexes.size(),
+ "Mismatch: %d buffers but %d indexes", buffers.length, indexes.size());
+ Iterator<ByteBuffer> iter = Arrays.asList(buffers).iterator();
+ for (int i : indexes) {
+ ByteBuffer buf = iter.next();
+ if (buf == null) {
+ continue;
}
- } else {
- int remainingLength = toRead % chunkSize;
- // The first partial has the remaining length
- bufs[fullChunks].limit(remainingLength);
- // All others have a zero limit
- for (int i = fullChunks + 1; i < bufs.length; i++) {
- bufs[i].position(0);
- bufs[i].limit(0);
+ if (i == fullChunks) {
+ buf.limit(partialChunkSize);
+ } else if (fullChunks < i && i < data) {
+ buf.position(0);
+ buf.limit(0);
+ } else if (data <= i && fullChunks == 0) {
+ buf.limit(partialChunkSize);
}
}
}
@@ -411,41 +501,38 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
* Take the parity indexes which are already used, and the others which are
* available, and select random indexes to meet numRequired. The resulting
* list is sorted in ascending order of the indexes.
- * @param locations The list of locations for all blocks in the block group/
- * @param numRequired The number of parity chunks needed for reconstruction
- * @return A list of indexes indicating which parity locations to read.
*/
@SuppressWarnings("java:S2245") // no need for secure random
- private List<Integer> selectParityIndexes(
- DatanodeDetails[] locations, int numRequired) {
- List<Integer> indexes = new ArrayList<>();
- List<Integer> selected = new ArrayList<>();
- ECReplicationConfig repConfig = getRepConfig();
- for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
- if (locations[i] != null && !failedDataIndexes.contains(i)
- && decoderInputBuffers[i] == null) {
- indexes.add(i);
- }
- // If we are re-initializing, we want to make sure we are re-using any
- // previously selected good parity indexes, as the block stream is already
- // opened.
- if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+ private SortedSet<Integer> selectInternalInputs(
+ SortedSet<Integer> available, long count) {
+
+ LOG.debug("Selecting {} internal inputs from {}", count, available);
+
+ if (count <= 0) {
+ return emptySortedSet();
+ }
+
+ if (available.size() == count) {
+ return available;
+ }
+
+ SortedSet<Integer> selected = new TreeSet<>();
+ for (int i : available) {
+ if (decoderInputBuffers[i] != null) {
+ // If we are re-initializing, we want to make sure we are re-using any
+ // previously selected good parity indexes, as the block stream is
+ // already opened.
selected.add(i);
}
}
- Preconditions.assertTrue(indexes.size() + selected.size() >= numRequired);
- Random rand = new Random();
- while (selected.size() < numRequired) {
- selected.add(indexes.remove(rand.nextInt(indexes.size())));
- }
- Collections.sort(selected);
- return selected;
- }
+ List<Integer> candidates = new ArrayList<>(available);
+ candidates.removeAll(selected);
+ Collections.shuffle(candidates);
+ selected.addAll(candidates.stream()
+ .limit(count - selected.size())
+ .collect(toSet()));
- private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
- ByteBuffer buf = byteBufferPool.getBuffer(
- false, repConfig.getEcChunkSize());
- return buf;
+ return selected;
}
private void flipInputs() {
@@ -456,9 +543,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
- private void clearParityBuffers() {
- for (int i = getRepConfig().getData();
- i < getRepConfig().getRequiredNodes(); i++) {
+ private void clearInternalBuffers() {
+ for (int i : internalBuffers) {
if (decoderInputBuffers[i] != null) {
decoderInputBuffers[i].clear();
}
@@ -469,9 +555,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
throws IOException, InterruptedException {
Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
= new ArrayDeque<>();
- for (int i : dataIndexes) {
+ for (int i : selectedIndexes) {
+ ByteBuffer buf = decoderInputBuffers[i];
pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
- readIntoBuffer(i, decoderInputBuffers[i]);
+ readIntoBuffer(i, buf);
return null;
})));
}
@@ -544,12 +631,24 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
* @throws IOException
*/
private void decodeStripe() throws IOException {
- decoder.decode(decoderInputBuffers, missingIndexes, decoderOutputBuffers);
+ int[] erasedIndexes = missingIndexes.stream()
+ .mapToInt(Integer::valueOf)
+ .toArray();
+ decoder.decode(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
flipInputs();
}
@Override
public synchronized boolean hasSufficientLocations() {
+ if (decoderInputBuffers == null) {
+ // The EC decoder needs an array data+parity long, with missing or not
+ // needed indexes set to null.
+ decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
+ }
+
+ markMissingLocationsAsFailed();
+ selectIndexes();
+
// The number of locations needed is a function of the EC Chunk size. If the
// block length is <= the chunk size, we should only have one data location.
// If it is greater than the chunk size but less than chunk_size * 2, then
@@ -559,22 +658,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
// be all zeros.
// Then we need a total of dataNum blocks available across the available
// data, parity and padding blocks.
- ECReplicationConfig repConfig = getRepConfig();
- int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
- int availableLocations =
- availableDataLocations(expectedDataBlocks) + availableParityLocations();
- int paddedLocations = repConfig.getData() - expectedDataBlocks;
- int failedLocations = failedDataIndexes.size();
-
- if (availableLocations + paddedLocations - failedLocations
- >= repConfig.getData()) {
- return true;
- } else {
- LOG.error("There are insufficient locations. {} available; {} padded;" +
- " {} failed; {} expected;", availableLocations, paddedLocations,
- failedLocations, expectedDataBlocks);
- return false;
- }
+
+ return selectedIndexes.size() >= dataIndexes.size();
}
@Override
@@ -596,18 +681,12 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
private void freeBuffers() {
- // Inside this class, we only allocate buffers to read parity into. Data
- // is reconstructed or read into a set of buffers passed in from the calling
- // class. Therefore we only need to ensure we free the parity buffers here.
+ // free any internal buffers
if (decoderInputBuffers != null) {
- for (int i = getRepConfig().getData();
- i < getRepConfig().getRequiredNodes(); i++) {
- ByteBuffer buf = decoderInputBuffers[i];
- if (buf != null) {
- byteBufferPool.putBuffer(buf);
- decoderInputBuffers[i] = null;
- }
+ for (int i : new ArrayList<>(internalBuffers)) {
+ releaseInternalBuffer(i);
}
+ internalBuffers.clear();
}
initialized = false;
}
@@ -629,4 +708,63 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
super.seek(pos);
}
+ /**
+ * Determine which indexes are missing, taking into account the length of the
+ * block. For a block shorter than a full EC stripe, it is expected that
+ * some data locations will not be present.
+ * Populates the missingIndexes and selectedIndexes instance variables.
+ */
+ private void selectIndexes() {
+ SortedSet<Integer> candidates;
+ int required;
+
+ missingIndexes.clear();
+ selectedIndexes.clear();
+
+ if (isOfflineRecovery()) {
+ if (!paddingIndexes.isEmpty()) {
+ paddingIndexes.forEach(i ->
+ Preconditions.assertTrue(!recoveryIndexes.contains(i)));
+ }
+
+ missingIndexes.addAll(recoveryIndexes);
+
+ // data locations available for reading
+ SortedSet<Integer> availableIndexes = new TreeSet<>();
+ availableIndexes.addAll(dataIndexes);
+ availableIndexes.addAll(parityIndexes);
+ availableIndexes.removeAll(failedDataIndexes);
+ availableIndexes.removeAll(recoveryIndexes);
+
+ // choose from all available
+ candidates = availableIndexes;
+ required = dataIndexes.size();
+ } else {
+ missingIndexes.addAll(failedDataIndexes);
+ missingIndexes.retainAll(dataIndexes);
+
+ SortedSet<Integer> dataAvailable = new TreeSet<>(dataIndexes);
+ dataAvailable.removeAll(failedDataIndexes);
+
+ SortedSet<Integer> parityAvailable = new TreeSet<>(parityIndexes);
+ parityAvailable.removeAll(failedDataIndexes);
+
+ selectedIndexes.addAll(dataAvailable);
+
+ // choose from parity
+ candidates = parityAvailable;
+ required = dataIndexes.size() - dataAvailable.size();
+ }
+
+ SortedSet<Integer> internal = selectInternalInputs(candidates, required);
+ selectedIndexes.addAll(internal);
+ }
+
+ private static SortedSet<Integer> setOfRange(
+ int startInclusive, int endExclusive) {
+
+ return range(startInclusive, endExclusive)
+ .boxed().collect(toCollection(TreeSet::new));
+ }
+
}
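
For illustration, here is a minimal usage sketch of the offline-recovery API introduced in this file (setRecoveryIndexes followed by recoverChunks), matching the rs-3-2 example in the class Javadoc. This sketch is not part of the patch: the stream is assumed to be already constructed with valid block locations, the ecChunkSize value is assumed to be supplied by the caller, and the class and method names below are hypothetical.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;

final class RecoveryUsageSketch {
  // Recover the chunks for replica indexes 2 and 3 (zero-based) of each
  // stripe. Reads return -1 (EOF) once the block is exhausted, as exercised
  // by the tests in this patch.
  static void recoverIndexes(ECBlockReconstructedStripeInputStream in,
      int ecChunkSize) throws IOException {
    in.setRecoveryIndexes(Arrays.asList(2, 3)); // must precede any read
    ByteBuffer[] bufs = new ByteBuffer[2];      // one buffer per recovery index
    for (int i = 0; i < bufs.length; i++) {
      bufs[i] = ByteBuffer.allocate(ecChunkSize);
    }
    while (in.recoverChunks(bufs) != -1) {
      // bufs now hold this stripe's recovered data for indexes 2 and 3,
      // flipped and ready to read; persist them, then clear for the next read.
      for (ByteBuffer b : bufs) {
        b.clear();
      }
    }
  }

  private RecoveryUsageSketch() {
  }
}
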
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index c461e019dd..4f12034b9b 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.client.io;
+import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -25,14 +26,18 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECStreamTestUtil.TestBlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECStreamTestUtil.TestBlockInputStream;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -43,7 +48,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.ozone.client.io.ECStreamTestUtil.generateParity;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test for the ECBlockReconstructedStripeInputStream.
@@ -61,7 +70,21 @@ public class TestECBlockReconstructedStripeInputStream {
private ExecutorService ecReconstructExecutor =
Executors.newFixedThreadPool(3);
- @Before
+ static List<Set<Integer>> recoveryCases() { // TODO better name
+ List<Set<Integer>> params = new ArrayList<>();
+ params.add(emptySet()); // non-recovery
+ for (int i = 0; i < 5; i++) {
+ params.add(singleton(i));
+ }
+ params.add(ImmutableSet.of(0, 1)); // only data
+ params.add(ImmutableSet.of(1, 4)); // data and parity
+ params.add(ImmutableSet.of(3, 4)); // only parity
+ params.add(ImmutableSet.of(2, 3)); // data and parity
+ params.add(ImmutableSet.of(2, 4)); // data and parity
+ return params;
+ }
+
+ @BeforeEach
public void setup() {
repConfig = new ECReplicationConfig(3, 2,
ECReplicationConfig.EcCodec.RS, ONEMB);
@@ -71,7 +94,7 @@ public class TestECBlockReconstructedStripeInputStream {
dataGen = new SplittableRandom(randomSeed);
}
- @After
+ @AfterEach
public void teardown() {
ecReconstructExecutor.shutdownNow();
}
@@ -82,50 +105,72 @@ public class TestECBlockReconstructedStripeInputStream {
BlockLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 1, ONEMB);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertTrue(ecb.hasSufficientLocations());
+ Assertions.assertTrue(ecb.hasSufficientLocations());
}
// Two Chunks, but missing data block 2.
Map<DatanodeDetails, Integer> dnMap
= ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
- try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertTrue(ecb.hasSufficientLocations());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ Assertions.assertTrue(ecb.hasSufficientLocations());
+ Collection<Integer> idxs = dnMap.values();
+ for (int i : idxs) {
+ ecb.setRecoveryIndexes(singleton(i - 1));
+ Assertions.assertTrue(ecb.hasSufficientLocations());
+ }
+
+ // trying to recover all
+ ecb.setRecoveryIndexes(toBufferIndexes(idxs));
+ Assertions.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3.
dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
- try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertTrue(ecb.hasSufficientLocations());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ Assertions.assertTrue(ecb.hasSufficientLocations());
// Set a failed location
List<DatanodeDetails> failed = new ArrayList<>();
failed.add(keyInfo.getPipeline().getFirstNode());
- ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
- Assert.assertFalse(ecb.hasSufficientLocations());
+ ecb.addFailedDatanodes(failed);
+ Assertions.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3 and parity 1.
dnMap = ECStreamTestUtil.createIndexMap(1, 4);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertFalse(ecb.hasSufficientLocations());
+ Assertions.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, all available but fail 3
dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
- try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertTrue(ecb.hasSufficientLocations());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ Assertions.assertTrue(ecb.hasSufficientLocations());
// Set a failed location
List<DatanodeDetails> failed = new ArrayList<>();
- for (DatanodeDetails dn : dnMap.keySet()) {
- failed.add(dn);
- if (failed.size() == 3) {
- break;
- }
+ for (Map.Entry<DatanodeDetails, Integer> entry : dnMap.entrySet()) {
+ failed.add(entry.getKey());
+ boolean expected = failed.size() < 3;
+
+ ecb.addFailedDatanodes(singleton(entry.getKey()));
+ Assertions.assertEquals(expected, ecb.hasSufficientLocations());
+ }
+ }
+
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ List<Integer> recover = new ArrayList<>();
+ for (int i : Arrays.asList(4, 3, 2, 1, 0)) {
+ recover.add(i);
+ ecb.setRecoveryIndexes(recover);
+ boolean expected = recover.size() < 3;
+ Assertions.assertEquals(expected, ecb.hasSufficientLocations());
}
- ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
- Assert.assertFalse(ecb.hasSufficientLocations());
}
// One chunk, indexes 2 and 3 are padding, but still reported in the
@@ -134,12 +179,14 @@ public class TestECBlockReconstructedStripeInputStream {
dnMap = ECStreamTestUtil.createIndexMap(2, 3);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
- Assert.assertFalse(ecb.hasSufficientLocations());
+ Assertions.assertFalse(ecb.hasSufficientLocations());
}
}
- @Test
- public void testReadFullStripesWithPartial() throws IOException {
+ @ParameterizedTest
+ @MethodSource("recoveryCases")
+ void testReadFullStripesWithPartial(Set<Integer> recoveryIndexes)
+ throws IOException {
// Generate the input data for 3 full stripes and generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
@@ -149,59 +196,65 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
- List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
- // Two data missing
- locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
- // One data missing
- locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
- // Two data missing including first
- locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
- // One data and one parity missing
- locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
- // No missing indexes
- locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
- for (Map<DatanodeDetails, Integer> dnMap : locations) {
- streamFactory = new TestBlockInputStreamFactory();
- addDataStreamsToFactory(dataBufs, parity);
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
- BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
- stripeSize() * 3 + partialStripeSize, dnMap);
- streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+ stripeSize() * 3 + partialStripeSize, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
- ByteBuffer[] bufs = allocateByteBuffers(repConfig);
- dataGen = new SplittableRandom(randomSeed);
- try (ECBlockReconstructedStripeInputStream ecb =
- createInputStream(keyInfo)) {
- // Read 3 full stripes
- for (int i = 0; i < 3; i++) {
- int read = ecb.readStripe(bufs);
- for (int j = 0; j < bufs.length; j++) {
- ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
- }
- Assert.assertEquals(stripeSize(), read);
+ List<Integer> outputIndexes = getOutputIndexes(recoveryIndexes);
- // Check the underlying streams have read 1 chunk per read:
- for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
- Assert.assertEquals(chunkSize * (i + 1),
- bis.getPos());
+ ByteBuffer[] bufs = allocateByteBuffers(
+ outputIndexes.size(), repConfig.getEcChunkSize());
+
+ dataGen = new SplittableRandom(randomSeed);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+
+ ecb.setRecoveryIndexes(recoveryIndexes);
+
+ // Read 3 full stripes
+ for (int i = 0; i < 3; i++) {
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(stripeSize(), read);
+
+ int output = 0;
+ for (int j = 0; j < repConfig.getRequiredNodes(); j++) {
+ if (outputIndexes.contains(j)) {
+ ECStreamTestUtil.assertBufferMatches(bufs[output++], dataGen);
}
- Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
- clearBuffers(bufs);
}
- // The next read is a partial stripe
- int read = ecb.readStripe(bufs);
- Assert.assertEquals(partialStripeSize, read);
- ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
- ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
- Assert.assertEquals(0, bufs[2].remaining());
- Assert.assertEquals(0, bufs[2].position());
- // A further read should give EOF
+ // Check the underlying streams have read 1 chunk per read:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assertions.assertEquals(chunkSize * (i + 1),
+ bis.getPos());
+ }
+ Assertions.assertEquals(stripeSize() * (i + 1), ecb.getPos());
clearBuffers(bufs);
- read = ecb.readStripe(bufs);
- Assert.assertEquals(-1, read);
}
+ // The next read is a partial stripe
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(partialStripeSize, read);
+ int output = 0;
+ for (int j = 0; j < 2; j++) {
+ if (outputIndexes.contains(j)) {
+ ECStreamTestUtil.assertBufferMatches(bufs[output++], dataGen);
+ }
+ }
+ if (outputIndexes.contains(2)) {
+ Assertions.assertEquals(0, bufs[output].remaining());
+ Assertions.assertEquals(0, bufs[output].position());
+ }
+
+ // A further read should give EOF
+ clearBuffers(bufs);
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
}
}
@@ -226,22 +279,62 @@ public class TestECBlockReconstructedStripeInputStream {
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
- int read = ecb.readStripe(bufs);
- Assert.assertEquals(blockLength, read);
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(blockLength, read);
+ ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+ Assertions.assertEquals(0, bufs[1].remaining());
+ Assertions.assertEquals(0, bufs[1].position());
+ Assertions.assertEquals(0, bufs[2].remaining());
+ Assertions.assertEquals(0, bufs[2].position());
+ // Check the underlying streams have been advanced by 1 blockLength:
+ for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+ Assertions.assertEquals(blockLength, bis.getPos());
+ }
+ Assertions.assertEquals(ecb.getPos(), blockLength);
+ clearBuffers(bufs);
+ // A further read should give EOF
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
+ }
+ }
+
+ @Test
+ void recoverPartialStripe() throws IOException {
+ int ecChunkSize = repConfig.getEcChunkSize();
+ int blockLength = ecChunkSize - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ ECStreamTestUtil
+ .randomFill(dataBufs, ecChunkSize, dataGen, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ // We have a length that is less than a single chunk, so blocks 2 and 3 are
+ // padding and will not be present. Parity blocks are lost and need to be
+ // recovered from block 1 and padded blocks 2 and 3.
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 4, 5);
+ ByteBuffer[] bufs = allocateByteBuffers(2, ecChunkSize);
+ BlockLocationInfo keyInfo =
+ ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ dataGen = new SplittableRandom(randomSeed);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ createInputStream(keyInfo)) {
+ ecb.setRecoveryIndexes(Arrays.asList(3, 4));
+
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
- Assert.assertEquals(0, bufs[1].remaining());
- Assert.assertEquals(0, bufs[1].position());
- Assert.assertEquals(0, bufs[2].remaining());
- Assert.assertEquals(0, bufs[2].position());
+ ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
// Check the underlying streams have been advanced by 1 blockLength:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
- Assert.assertEquals(blockLength, bis.getPos());
+ Assertions.assertEquals(blockLength, bis.getPos());
}
- Assert.assertEquals(ecb.getPos(), blockLength);
+ Assertions.assertEquals(ecb.getPos(), blockLength);
clearBuffers(bufs);
// A further read should give EOF
- read = ecb.readStripe(bufs);
- Assert.assertEquals(-1, read);
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
}
}
@@ -268,21 +361,21 @@ public class TestECBlockReconstructedStripeInputStream {
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
- int read = ecb.readStripe(bufs);
- Assert.assertEquals(blockLength, read);
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
- Assert.assertEquals(0, bufs[2].remaining());
- Assert.assertEquals(0, bufs[2].position());
+ Assertions.assertEquals(0, bufs[2].remaining());
+ Assertions.assertEquals(0, bufs[2].position());
// Check the underlying streams have been advanced by 1 chunk:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
- Assert.assertEquals(chunkSize, bis.getPos());
+ Assertions.assertEquals(chunkSize, bis.getPos());
}
- Assert.assertEquals(ecb.getPos(), blockLength);
+ Assertions.assertEquals(ecb.getPos(), blockLength);
clearBuffers(bufs);
// A further read should give EOF
- read = ecb.readStripe(bufs);
- Assert.assertEquals(-1, read);
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
}
}
@@ -325,20 +418,20 @@ public class TestECBlockReconstructedStripeInputStream {
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
- int read = ecb.readStripe(bufs);
- Assert.assertEquals(blockLength, read);
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
// Check the underlying streams have been advanced by 1 chunk:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
- Assert.assertEquals(0, bis.getRemaining());
+ Assertions.assertEquals(0, bis.getRemaining());
}
- Assert.assertEquals(ecb.getPos(), blockLength);
+ Assertions.assertEquals(ecb.getPos(), blockLength);
clearBuffers(bufs);
// A further read should give EOF
- read = ecb.readStripe(bufs);
- Assert.assertEquals(-1, read);
+ read = ecb.read(bufs);
+ Assertions.assertEquals(-1, read);
}
}
}
@@ -368,8 +461,8 @@ public class TestECBlockReconstructedStripeInputStream {
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
try {
- ecb.readStripe(bufs);
- Assert.fail("Read should have thrown an exception");
+ ecb.read(bufs);
+ Assertions.fail("Read should have thrown an exception");
} catch (InsufficientLocationsException e) {
// expected
}
@@ -413,43 +506,43 @@ public class TestECBlockReconstructedStripeInputStream {
createInputStream(keyInfo)) {
// Read Stripe 1
- int read = ecb.readStripe(bufs);
+ int read = ecb.read(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 0, chunkSize);
}
- Assert.assertEquals(stripeSize(), read);
- Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+ Assertions.assertEquals(stripeSize(), read);
+ Assertions.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
// Seek to 0 and read again
clearBuffers(bufs);
ecb.seek(0);
- ecb.readStripe(bufs);
+ ecb.read(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 0, chunkSize);
}
- Assert.assertEquals(stripeSize(), read);
- Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+ Assertions.assertEquals(stripeSize(), read);
+ Assertions.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
// Seek to the last stripe
clearBuffers(bufs);
ecb.seek(stripeSize() * 3);
- read = ecb.readStripe(bufs);
+ read = ecb.read(bufs);
validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
- Assert.assertEquals(0, bufs[2].remaining());
- Assert.assertEquals(partialStripeSize, read);
- Assert.assertEquals(0, ecb.getRemaining());
+ Assertions.assertEquals(0, bufs[2].remaining());
+ Assertions.assertEquals(partialStripeSize, read);
+ Assertions.assertEquals(0, ecb.getRemaining());
// seek to the start of stripe 3
clearBuffers(bufs);
ecb.seek(stripeSize() * (long)2);
- read = ecb.readStripe(bufs);
+ read = ecb.read(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
}
- Assert.assertEquals(stripeSize(), read);
- Assert.assertEquals(partialStripeSize, ecb.getRemaining());
+ Assertions.assertEquals(stripeSize(), read);
+ Assertions.assertEquals(partialStripeSize, ecb.getRemaining());
}
}
}
@@ -466,9 +559,9 @@ public class TestECBlockReconstructedStripeInputStream {
createInputStream(keyInfo)) {
try {
ecb.seek(10);
- Assert.fail("Seek should have thrown an exception");
+ Assertions.fail("Seek should have thrown an exception");
} catch (IOException e) {
- Assert.assertEquals("Requested position 10 does not align " +
+ Assertions.assertEquals("Requested position 10 does not align " +
"with a stripe offset", e.getMessage());
}
}
@@ -509,12 +602,12 @@ public class TestECBlockReconstructedStripeInputStream {
createInputStream(keyInfo)) {
// After reading the first stripe, make one of the streams error
for (int i = 0; i < 3; i++) {
- int read = ecb.readStripe(bufs);
+ int read = ecb.read(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
}
- Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
- Assert.assertEquals(stripeSize(), read);
+ Assertions.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+ Assertions.assertEquals(stripeSize(), read);
clearBuffers(bufs);
if (i == 0) {
Integer failStream =
@@ -525,17 +618,17 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
// The next read is a partial stripe
- int read = ecb.readStripe(bufs);
- Assert.assertEquals(partialStripeSize, read);
+ int read = ecb.read(bufs);
+ Assertions.assertEquals(partialStripeSize, read);
validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
- Assert.assertEquals(0, bufs[2].remaining());
- Assert.assertEquals(0, bufs[2].position());
+ Assertions.assertEquals(0, bufs[2].remaining());
+ Assertions.assertEquals(0, bufs[2].position());
// seek back to zero and read a stripe to re-open the streams
ecb.seek(0);
clearBuffers(bufs);
- ecb.readStripe(bufs);
+ ecb.read(bufs);
// Now fail another random stream and the read should fail with
// insufficient locations
Set<Integer> currentStreams =
@@ -546,8 +639,8 @@ public class TestECBlockReconstructedStripeInputStream {
.setShouldError(true);
try {
clearBuffers(bufs);
- ecb.readStripe(bufs);
- Assert.fail("InsufficientLocationsException expected");
+ ecb.read(bufs);
+ Assertions.fail("InsufficientLocationsException expected");
} catch (InsufficientLocationsException e) {
// expected
}
@@ -555,7 +648,7 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
- @Test(expected = InsufficientLocationsException.class)
+ @Test
public void testAllLocationsFailOnFirstRead() throws IOException {
// This test simulates stale nodes. When the nodes are stale, but not yet
// dead, the locations will still be given to the client and it will try to
@@ -587,7 +680,8 @@ public class TestECBlockReconstructedStripeInputStream {
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
- ecb.readStripe(bufs);
+ assertThrows(InsufficientLocationsException.class,
+ () -> ecb.read(bufs));
}
}
@@ -624,17 +718,17 @@ public class TestECBlockReconstructedStripeInputStream {
ecb.addFailedDatanodes(failed);
// Read full stripe
- int read = ecb.readStripe(bufs);
+ int read = ecb.read(bufs);
for (int j = 0; j < bufs.length; j++) {
ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
}
- Assert.assertEquals(stripeSize(), read);
+ Assertions.assertEquals(stripeSize(), read);
// Now ensure that streams with repIndexes 1, 2 and 3 have not been
// created in the stream factory, indicating we did not read them.
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
for (TestBlockInputStream stream : streams) {
- Assert.assertTrue(stream.getEcReplicaIndex() > 2);
+ Assertions.assertTrue(stream.getEcReplicaIndex() > 2);
}
}
}
@@ -675,9 +769,9 @@ public class TestECBlockReconstructedStripeInputStream {
private void validateContents(ByteBuffer src, ByteBuffer data, int offset,
int count) {
byte[] srcArray = src.array();
- Assert.assertEquals(count, data.remaining());
+ Assertions.assertEquals(count, data.remaining());
for (int i = offset; i < offset + count; i++) {
- Assert.assertEquals("Element " + i, srcArray[i], data.get());
+ Assertions.assertEquals(srcArray[i], data.get(), "Element " + i);
}
data.flip();
}
@@ -711,11 +805,29 @@ public class TestECBlockReconstructedStripeInputStream {
}
private ByteBuffer[] allocateByteBuffers(ECReplicationConfig rConfig) {
- ByteBuffer[] bufs = new ByteBuffer[repConfig.getData()];
+ return allocateByteBuffers(rConfig.getData(), rConfig.getEcChunkSize());
+ }
+
+ private ByteBuffer[] allocateByteBuffers(int count, int capacity) {
+ ByteBuffer[] bufs = new ByteBuffer[count];
for (int i = 0; i < bufs.length; i++) {
- bufs[i] = ByteBuffer.allocate(rConfig.getEcChunkSize());
+ bufs[i] = ByteBuffer.allocate(capacity);
}
return bufs;
}
+ private List<Integer> getOutputIndexes(Set<Integer> recoveryIndexes) {
+ return recoveryIndexes.isEmpty()
+ ? Arrays.asList(0, 1, 2)
+ : new ArrayList<>(recoveryIndexes);
+ }
+
+ private static Set<Integer> toBufferIndexes(Collection<Integer> dnIdxs) {
+ return dnIdxs.stream()
+ .mapToInt(Integer::intValue)
+ .map(i -> i - 1)
+ .boxed()
+ .collect(toSet());
+ }
+
}
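
Note that the replica indexes produced by ECStreamTestUtil.createIndexMap are 1-based, while setRecoveryIndexes expects 0-based buffer indexes; the toBufferIndexes helper above performs that conversion. A standalone sketch of the same mapping (hypothetical class name, plain Java, no test dependencies):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class IndexMappingSketch {
  public static void main(String[] args) {
    // Replica indexes as reported by the datanode map (1-based)...
    List<Integer> replicaIndexes = Arrays.asList(1, 4, 5);
    // ...converted to the 0-based buffer indexes that setRecoveryIndexes expects.
    List<Integer> bufferIndexes = replicaIndexes.stream()
        .map(i -> i - 1)
        .collect(Collectors.toList());
    System.out.println(bufferIndexes); // prints [0, 3, 4]
  }
}
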
diff --git a/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java b/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
index 7b9ebf6ad2..3d6de14a47 100644
--- a/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
+++ b/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
@@ -95,26 +95,29 @@ class ByteBufferDecodingState extends DecodingState {
void checkInputBuffers(ByteBuffer[] buffers) {
int validInputs = 0;
- for (ByteBuffer buffer : buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ ByteBuffer buffer = buffers[i];
+
if (buffer == null) {
continue;
}
if (buffer.remaining() != decodeLength) {
- throw new IllegalArgumentException(
- "Invalid buffer, not of length " + decodeLength);
+ throw new IllegalArgumentException("Invalid buffer [" + i +
+ "], not of length " + decodeLength);
}
if (buffer.isDirect() != usingDirectBuffer) {
- throw new IllegalArgumentException(
- "Invalid buffer, isDirect should be " + usingDirectBuffer);
+ throw new IllegalArgumentException("Invalid buffer [" + i +
+ "], isDirect should be " + usingDirectBuffer);
}
validInputs++;
}
if (validInputs < decoder.getNumDataUnits()) {
- throw new IllegalArgumentException(
- "No enough valid inputs are provided, not recoverable");
+ throw new IllegalArgumentException("No enough valid inputs are provided ("
+ + validInputs + " vs. " + decoder.getNumDataUnits()
+ + "), not recoverable");
}
}
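
For context on the decoder contract exercised by checkInputBuffers above: the inputs array is data+parity slots long, unavailable slots stay null, and at least numDataUnits non-null inputs of decodeLength bytes are required, otherwise the "No enough valid inputs" message shown in the patch is thrown. A hypothetical sketch of that layout for rs-3-2 (class, names and values are illustrative only, not part of the patch):

import java.nio.ByteBuffer;

final class DecodeShapeSketch {
  // With rs-3-2, the decoder receives 5 input slots (3 data + 2 parity).
  // Missing or unneeded slots stay null; at least 3 non-null inputs of the
  // decode length are needed for the stripe to be recoverable.
  static void layoutExample(int chunkSize) {
    ByteBuffer[] inputs = new ByteBuffer[5];
    inputs[0] = ByteBuffer.allocate(chunkSize); // data index 0, available
    inputs[1] = null;                           // data index 1, missing
    inputs[2] = ByteBuffer.allocate(chunkSize); // data index 2, available
    inputs[3] = ByteBuffer.allocate(chunkSize); // parity index 3, selected
    inputs[4] = null;                           // parity index 4, not needed
    int[] erasedIndexes = {1};                  // indexes asked of the decoder
    ByteBuffer[] outputs = {ByteBuffer.allocate(chunkSize)};
    // decoder.decode(inputs, erasedIndexes, outputs); // actual call in the patch
  }

  private DecodeShapeSketch() {
  }
}
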