You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/06/07 19:35:44 UTC

[ozone] branch master updated: HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 039e8a8f5a HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)
039e8a8f5a is described below

commit 039e8a8f5a4c49a6ebed31839862ce1975a28036
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Jun 7 21:35:38 2022 +0200

    HDDS-6665. EC: Extend BlockReconstructedInputStreams to recover parity block buffers as well if missing (#3457)
---
 hadoop-hdds/client/pom.xml                         |   4 +
 .../io/ECBlockReconstructedStripeInputStream.java  | 460 +++++++++++++--------
 .../TestECBlockReconstructedStripeInputStream.java | 372 +++++++++++------
 .../rawcoder/ByteBufferDecodingState.java          |  17 +-
 4 files changed, 555 insertions(+), 298 deletions(-)

diff --git a/hadoop-hdds/client/pom.xml b/hadoop-hdds/client/pom.xml
index 9f2116c96f..5770f72970 100644
--- a/hadoop-hdds/client/pom.xml
+++ b/hadoop-hdds/client/pom.xml
@@ -70,6 +70,10 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>${spotbugs.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-params</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 197eba7922..6a2eb9af26 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -39,22 +40,31 @@ import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
-import java.util.Random;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.Function;
 
+import static java.util.Collections.emptySortedSet;
+import static java.util.stream.Collectors.toCollection;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.IntStream.range;
+
 /**
  * Class to read EC encoded data from blocks a stripe at a time, when some of
  * the data blocks are not available. The public API for this class is:
  *
  *     readStripe(ByteBuffer[] bufs)
+ *     recoverChunks(ByteBuffer[] bufs)
  *
  * The other inherited public APIs will throw a NotImplementedException. This is
  * because this class is intended to only read full stripes into a reusable set
@@ -88,6 +98,18 @@ import java.util.function.Function;
  * than a full stripe, the client can simply read upto remaining from each
  * buffer in turn. If there is a full stripe, each buffer should have ecChunk
  * size remaining.
+ *
+ * recoverChunks() handles the more generic case, where specific buffers, even
+ * parity ones, are to be recovered -- these should be passed to the method.
+ * The whole stripe is not important for the caller in this case.  The indexes
+ * of the buffers that need to be recovered should be set by calling
+ * setRecoveryIndexes() before any read operation.
+ *
+ * Example: with rs-3-2 there are 3 data and 2 parity replicas, indexes are [0,
+ * 1, 2, 3, 4].  If replicas 2 and 3 are missing and need to be recovered,
+ * caller should {@code setRecoveryIndexes([2, 3])}, and then can recover the
+ * part of each stripe for these replicas by calling
+ * {@code recoverChunks(bufs)}, passing two buffers.
  */
 public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
 
@@ -99,20 +121,34 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   private ByteBuffer[] decoderInputBuffers;
   // Missing chunks are recovered into these buffers.
   private ByteBuffer[] decoderOutputBuffers;
+
   // Missing indexes to be recovered into the recovered buffers. Required by the
   // EC decoder
-  private int[] missingIndexes;
-  // The blockLocation indexes to use to read data into the dataBuffers.
-  private List<Integer> dataIndexes = new ArrayList<>();
+  private final SortedSet<Integer> missingIndexes = new TreeSet<>();
+
+  // indexes for data, padding and parity blocks
+  private final SortedSet<Integer> dataIndexes;
+  private final SortedSet<Integer> paddingIndexes;
+  private final SortedSet<Integer> parityIndexes;
+  private final SortedSet<Integer> allIndexes;
+
+  // The blockLocation indexes to use to read data into the output buffers
+  private final SortedSet<Integer> selectedIndexes = new TreeSet<>();
+  // indexes of internal buffers (ones which are not requested by the caller,
+  // but needed for reconstructing missing data)
+  private final SortedSet<Integer> internalBuffers = new TreeSet<>();
   // Data Indexes we have tried to read from, and failed for some reason
-  private Set<Integer> failedDataIndexes = new HashSet<>();
-  private ByteBufferPool byteBufferPool;
+  private final Set<Integer> failedDataIndexes = new HashSet<>();
+  private final ByteBufferPool byteBufferPool;
 
   private RawErasureDecoder decoder;
 
   private boolean initialized = false;
 
-  private ExecutorService executor;
+  private final ExecutorService executor;
+
+  // for offline recovery: indexes to be recovered
+  private final Set<Integer> recoveryIndexes = new TreeSet<>();
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
@@ -125,6 +161,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
         refreshFunction, streamFactory);
     this.byteBufferPool = byteBufferPool;
     this.executor = ecReconstructExecutor;
+
+    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+    int d = repConfig.getData();
+    dataIndexes = setOfRange(0, expectedDataBlocks);
+    paddingIndexes = setOfRange(expectedDataBlocks, d);
+    parityIndexes = setOfRange(d, repConfig.getRequiredNodes());
+    allIndexes = setOfRange(0, repConfig.getRequiredNodes());
   }
 
   /**
@@ -138,7 +181,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    *
    * @param dns A list of DatanodeDetails that are known to be bad.
    */
-  public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
+  public synchronized void addFailedDatanodes(Collection<DatanodeDetails> dns) {
     if (initialized) {
       throw new RuntimeException("Cannot add failed datanodes after the " +
           "reader has been initialized");
@@ -154,92 +197,133 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     }
   }
 
+  /**
+   * Set the EC indexes that should be recovered by
+   * {@link #recoverChunks(ByteBuffer[])}.
+   */
+  public synchronized void setRecoveryIndexes(Collection<Integer> indexes) {
+    if (initialized) {
+      throw new IllegalStateException("Cannot set recovery indexes after the " +
+          "reader has been initialized");
+    }
+    Preconditions.assertNotNull(indexes, "recovery indexes");
+    recoveryIndexes.clear();
+    recoveryIndexes.addAll(indexes);
+  }
+
   private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
-    if (decoderInputBuffers == null) {
-      // The EC decoder needs an array data+parity long, with missing or not
-      // needed indexes set to null.
-      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
-    }
     if (!hasSufficientLocations()) {
       throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-    dataIndexes.clear();
-    ECReplicationConfig repConfig = getRepConfig();
-    DatanodeDetails[] locations = getDataLocations();
-    setMissingIndexesAndDataLocations(locations);
-    List<Integer> parityIndexes =
-        selectParityIndexes(locations, missingIndexes.length);
-    // We read from the selected parity blocks, so add them to the data indexes.
-    dataIndexes.addAll(parityIndexes);
+    allocateInternalBuffers();
+    if (!isOfflineRecovery()) {
+      decoderOutputBuffers = new ByteBuffer[missingIndexes.size()];
+    }
+    initialized = true;
+  }
+
+  private void allocateInternalBuffers() {
     // The decoder inputs originally start as all nulls. Then we populate the
-    // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now. On re-init, we reuse any parity buffers
+    // pieces we have data for. The internal buffers are reused for the block,
+    // so we can allocate them now. On re-init, we reuse any internal buffers
     // already allocated.
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (parityIndexes.contains(i)) {
-        if (decoderInputBuffers[i] == null) {
-          decoderInputBuffers[i] = allocateBuffer(repConfig);
-        }
-      } else {
-        decoderInputBuffers[i] = null;
+    final int minIndex = isOfflineRecovery() ? 0 : getRepConfig().getData();
+    for (int i = minIndex; i < getRepConfig().getRequiredNodes(); i++) {
+      boolean internalInput = selectedIndexes.contains(i)
+          || paddingIndexes.contains(i);
+      boolean hasBuffer = decoderInputBuffers[i] != null;
+
+      if (internalInput && !hasBuffer) {
+        allocateInternalBuffer(i);
+      } else if (!internalInput && hasBuffer) {
+        releaseInternalBuffer(i);
       }
     }
-    decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
-    initialized = true;
   }
 
-  /**
-   * Determine which indexes are missing, taking into account the length of the
-   * block. For a block shorter than a full EC stripe, it is expected that
-   * some of the data locations will not be present.
-   * Populates the missingIndex and dataIndexes instance variables.
-   * @param locations Available locations for the block group
-   */
-  private void setMissingIndexesAndDataLocations(DatanodeDetails[] locations) {
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    List<Integer> missingInd = new ArrayList<>();
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if ((locations[i] == null || failedDataIndexes.contains(i))
-          && i < expectedDataBlocks) {
-        missingInd.add(i);
-      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
-        dataIndexes.add(i);
+  private void allocateInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.add(index),
+        () -> "Buffer " + index + " already tracked as internal input");
+    decoderInputBuffers[index] =
+        byteBufferPool.getBuffer(false, getRepConfig().getEcChunkSize());
+  }
+
+  private void releaseInternalBuffer(int index) {
+    Preconditions.assertTrue(internalBuffers.remove(index),
+        () -> "Buffer " + index + " not tracked as internal input");
+    byteBufferPool.putBuffer(decoderInputBuffers[index]);
+    decoderInputBuffers[index] = null;
+  }
+
+  private void markMissingLocationsAsFailed() {
+    DatanodeDetails[] locations = getDataLocations();
+    for (int i = 0; i < locations.length; i++) {
+      if (locations[i] == null && failedDataIndexes.add(i)) {
+        LOG.debug("Marked index={} as failed", i);
       }
     }
-    missingIndexes = missingInd.stream().mapToInt(Integer::valueOf).toArray();
+  }
+
+  private boolean isOfflineRecovery() {
+    return !recoveryIndexes.isEmpty();
   }
 
   private void assignBuffers(ByteBuffer[] bufs) {
-    ECReplicationConfig repConfig = getRepConfig();
-    Preconditions.assertTrue(bufs.length == repConfig.getData());
-    int recoveryIndex = 0;
-    // Here bufs come from the caller and will be filled with data read from
-    // the blocks or recovered. Therefore, if the index is missing, we assign
-    // the buffer to the decoder outputs, where data is recovered via EC
-    // decoding. Otherwise the buffer is set to the input. Note, it may be a
-    // buffer which needs padded.
-    for (int i = 0; i < repConfig.getData(); i++) {
-      if (isMissingIndex(i)) {
-        decoderOutputBuffers[recoveryIndex++] = bufs[i];
-        decoderInputBuffers[i] = null;
-      } else {
-        decoderInputBuffers[i] = bufs[i];
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
+
+    if (isOfflineRecovery()) {
+      decoderOutputBuffers = bufs;
+    } else {
+      int recoveryIndex = 0;
+      // Here bufs come from the caller and will be filled with data read from
+      // the blocks or recovered. Therefore, if the index is missing, we assign
+      // the buffer to the decoder outputs, where data is recovered via EC
+      // decoding. Otherwise the buffer is set to the input. Note, it may be a
+      // buffer which needs to be padded.
+      for (int i = 0; i < bufs.length; i++) {
+        if (isMissingIndex(i)) {
+          decoderOutputBuffers[recoveryIndex++] = bufs[i];
+          if (internalBuffers.contains(i)) {
+            releaseInternalBuffer(i);
+          } else {
+            decoderInputBuffers[i] = null;
+          }
+        } else {
+          decoderInputBuffers[i] = bufs[i];
+        }
       }
     }
   }
 
+  private int getExpectedBufferCount() {
+    return isOfflineRecovery()
+        ? recoveryIndexes.size()
+        : getRepConfig().getData();
+  }
+
   private boolean isMissingIndex(int ind) {
-    for (int i : missingIndexes) {
-      if (i == ind) {
-        return true;
-      }
-    }
-    return false;
+    return missingIndexes.contains(ind);
+  }
+
+  /**
+   * This method should be passed an array of ByteBuffers with the same number
+   * of entries as the previously set recovery indexes. Each ByteBuffer
+   * should be at position 0 and have EC ChunkSize bytes remaining.
+   * After returning, the buffers will contain the data for the parts to be
+   * recovered in the block's next stripe. The buffers will be returned
+   * "ready to read" with their position set to zero and the limit set
+   * according to how much data they contain.
+   *
+   * @param bufs An array of ByteBuffers into which recovered data is written
+   * @return The number of bytes read
+   */
+  public synchronized int recoverChunks(ByteBuffer[] bufs) throws IOException {
+    Preconditions.assertTrue(isOfflineRecovery());
+    return read(bufs);
   }
 
   /**
@@ -258,6 +342,11 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    * @throws IOException
    */
   public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
+    return read(bufs);
+  }
+
+  @VisibleForTesting
+  synchronized int read(ByteBuffer[] bufs) throws IOException {
     int toRead = (int)Math.min(getRemaining(), getStripeSize());
     if (toRead == 0) {
       return EOF;
@@ -269,10 +358,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     while (true) {
       try {
         assignBuffers(bufs);
-        clearParityBuffers();
+        clearInternalBuffers();
         // Set the read limits on the buffers so we do not read any garbage data
         // from the end of the block that is unexpected.
-        setBufferReadLimits(bufs, toRead);
+        setBufferReadLimits(toRead);
         loadDataBuffersFromStream();
         break;
       } catch (IOException e) {
@@ -291,13 +380,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
         throw new IOException("Interrupted waiting for reads to complete", ie);
       }
     }
-    if (missingIndexes.length > 0) {
+    if (!missingIndexes.isEmpty()) {
       padBuffers(toRead);
       flipInputs();
       decodeStripe();
       // Reset the buffer positions and limits to remove any padding added
       // before EC Decode.
-      setBufferReadLimits(bufs, toRead);
+      setBufferReadLimits(toRead);
     } else {
       // If we have no missing indexes, then the buffers will be at their
       // limits after reading so we need to flip them to ensure they are ready
@@ -316,7 +405,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   private void validateBuffers(ByteBuffer[] bufs) {
-    Preconditions.assertTrue(bufs.length == getRepConfig().getData());
+    Preconditions.assertTrue(bufs.length == getExpectedBufferCount());
     int chunkSize = getRepConfig().getEcChunkSize();
     for (ByteBuffer b : bufs) {
       Preconditions.assertTrue(b.remaining() == chunkSize);
@@ -358,38 +447,39 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     }
   }
 
-  private void setBufferReadLimits(ByteBuffer[] bufs, int toRead) {
+  private void setBufferReadLimits(int toRead) {
     int chunkSize = getRepConfig().getEcChunkSize();
     int fullChunks = toRead / chunkSize;
-    if (fullChunks == getRepConfig().getData()) {
+    int data = getRepConfig().getData();
+    if (fullChunks == data) {
       // We are reading a full stripe, no concerns over padding.
       return;
     }
 
-    if (fullChunks == 0) {
-      bufs[0].limit(toRead);
-      // All buffers except the first contain no data.
-      for (int i = 1; i < bufs.length; i++) {
-        bufs[i].position(0);
-        bufs[i].limit(0);
-      }
-      // If we have less than 1 chunk, then the parity buffers are the size
-      // of the first chunk.
-      for (int i = getRepConfig().getData();
-           i < getRepConfig().getRequiredNodes(); i++) {
-        ByteBuffer b = decoderInputBuffers[i];
-        if (b != null) {
-          b.limit(toRead);
-        }
+    int partialLength = toRead % chunkSize;
+    setReadLimits(partialLength, fullChunks, decoderInputBuffers, allIndexes);
+    setReadLimits(partialLength, fullChunks, decoderOutputBuffers,
+        missingIndexes);
+  }
+
+  private void setReadLimits(int partialChunkSize, int fullChunks,
+      ByteBuffer[] buffers, Collection<Integer> indexes) {
+    int data = getRepConfig().getData();
+    Preconditions.assertTrue(buffers.length == indexes.size(),
+        "Mismatch: %d buffers but %d indexes", buffers.length, indexes.size());
+    Iterator<ByteBuffer> iter = Arrays.asList(buffers).iterator();
+    for (int i : indexes) {
+      ByteBuffer buf = iter.next();
+      if (buf == null) {
+        continue;
       }
-    } else {
-      int remainingLength = toRead % chunkSize;
-      // The first partial has the remaining length
-      bufs[fullChunks].limit(remainingLength);
-      // All others have a zero limit
-      for (int i = fullChunks + 1; i < bufs.length; i++) {
-        bufs[i].position(0);
-        bufs[i].limit(0);
+      if (i == fullChunks) {
+        buf.limit(partialChunkSize);
+      } else if (fullChunks < i && i < data) {
+        buf.position(0);
+        buf.limit(0);
+      } else if (data <= i && fullChunks == 0) {
+        buf.limit(partialChunkSize);
       }
     }
   }
@@ -411,41 +501,38 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    * Take the parity indexes which are already used, and the others which are
    * available, and select random indexes to meet numRequired. The resulting
    * list is sorted in ascending order of the indexes.
-   * @param locations The list of locations for all blocks in the block group/
-   * @param numRequired The number of parity chunks needed for reconstruction
-   * @return A list of indexes indicating which parity locations to read.
    */
   @SuppressWarnings("java:S2245") // no need for secure random
-  private List<Integer> selectParityIndexes(
-      DatanodeDetails[] locations, int numRequired) {
-    List<Integer> indexes = new ArrayList<>();
-    List<Integer> selected = new ArrayList<>();
-    ECReplicationConfig repConfig = getRepConfig();
-    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
-      if (locations[i] != null && !failedDataIndexes.contains(i)
-          && decoderInputBuffers[i] == null) {
-        indexes.add(i);
-      }
-      // If we are re-initializing, we want to make sure we are re-using any
-      // previously selected good parity indexes, as the block stream is already
-      // opened.
-      if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+  private SortedSet<Integer> selectInternalInputs(
+      SortedSet<Integer> available, long count) {
+
+    LOG.debug("Selecting {} internal inputs from {}", count, available);
+
+    if (count <= 0) {
+      return emptySortedSet();
+    }
+
+    if (available.size() == count) {
+      return available;
+    }
+
+    SortedSet<Integer> selected = new TreeSet<>();
+    for (int i : available) {
+      if (decoderInputBuffers[i] != null) {
+        // If we are re-initializing, we want to make sure we are re-using any
+        // previously selected good indexes (data or parity), as the block
+        // stream is already opened.
         selected.add(i);
       }
     }
-    Preconditions.assertTrue(indexes.size() + selected.size() >= numRequired);
-    Random rand = new Random();
-    while (selected.size() < numRequired) {
-      selected.add(indexes.remove(rand.nextInt(indexes.size())));
-    }
-    Collections.sort(selected);
-    return selected;
-  }
+    List<Integer> candidates = new ArrayList<>(available);
+    candidates.removeAll(selected);
+    Collections.shuffle(candidates);
+    selected.addAll(candidates.stream()
+        .limit(count - selected.size())
+        .collect(toSet()));
 
-  private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
-    ByteBuffer buf = byteBufferPool.getBuffer(
-        false, repConfig.getEcChunkSize());
-    return buf;
+    return selected;
   }
 
   private void flipInputs() {
@@ -456,9 +543,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     }
   }
 
-  private void clearParityBuffers() {
-    for (int i = getRepConfig().getData();
-         i < getRepConfig().getRequiredNodes(); i++) {
+  private void clearInternalBuffers() {
+    for (int i : internalBuffers) {
       if (decoderInputBuffers[i] != null) {
         decoderInputBuffers[i].clear();
       }
@@ -469,9 +555,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
       throws IOException, InterruptedException {
     Queue<ImmutablePair<Integer, Future<Void>>> pendingReads
         = new ArrayDeque<>();
-    for (int i : dataIndexes) {
+    for (int i : selectedIndexes) {
+      ByteBuffer buf = decoderInputBuffers[i];
       pendingReads.add(new ImmutablePair<>(i, executor.submit(() -> {
-        readIntoBuffer(i, decoderInputBuffers[i]);
+        readIntoBuffer(i, buf);
         return null;
       })));
     }
@@ -544,12 +631,24 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    * @throws IOException
    */
   private void decodeStripe() throws IOException {
-    decoder.decode(decoderInputBuffers, missingIndexes, decoderOutputBuffers);
+    int[] erasedIndexes = missingIndexes.stream()
+        .mapToInt(Integer::valueOf)
+        .toArray();
+    decoder.decode(decoderInputBuffers, erasedIndexes, decoderOutputBuffers);
     flipInputs();
   }
 
   @Override
   public synchronized boolean hasSufficientLocations() {
+    if (decoderInputBuffers == null) {
+      // The EC decoder needs an array data+parity long, with missing or not
+      // needed indexes set to null.
+      decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
+    }
+
+    markMissingLocationsAsFailed();
+    selectIndexes();
+
     // The number of locations needed is a function of the EC Chunk size. If the
     // block length is <= the chunk size, we should only have one data location.
     // If it is greater than the chunk size but less than chunk_size * 2, then
@@ -559,22 +658,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     // be all zeros.
     // Then we need a total of dataNum blocks available across the available
     // data, parity and padding blocks.
-    ECReplicationConfig repConfig = getRepConfig();
-    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
-    int availableLocations =
-        availableDataLocations(expectedDataBlocks) + availableParityLocations();
-    int paddedLocations = repConfig.getData() - expectedDataBlocks;
-    int failedLocations = failedDataIndexes.size();
-
-    if (availableLocations + paddedLocations - failedLocations
-        >= repConfig.getData()) {
-      return true;
-    } else {
-      LOG.error("There are insufficient locations. {} available; {} padded;" +
-          " {} failed; {} expected;", availableLocations, paddedLocations,
-          failedLocations, expectedDataBlocks);
-      return false;
-    }
+
+    return selectedIndexes.size() >= dataIndexes.size();
   }
 
   @Override
@@ -596,18 +681,12 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   private void freeBuffers() {
-    // Inside this class, we only allocate buffers to read parity into. Data
-    // is reconstructed or read into a set of buffers passed in from the calling
-    // class. Therefore we only need to ensure we free the parity buffers here.
+    // free any internal buffers
     if (decoderInputBuffers != null) {
-      for (int i = getRepConfig().getData();
-           i < getRepConfig().getRequiredNodes(); i++) {
-        ByteBuffer buf = decoderInputBuffers[i];
-        if (buf != null) {
-          byteBufferPool.putBuffer(buf);
-          decoderInputBuffers[i] = null;
-        }
+      for (int i : new ArrayList<>(internalBuffers)) {
+        releaseInternalBuffer(i);
       }
+      internalBuffers.clear();
     }
     initialized = false;
   }
@@ -629,4 +708,63 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     super.seek(pos);
   }
 
+  /**
+   * Determine which indexes are missing, taking into account the length of the
+   * block. For a block shorter than a full EC stripe, it is expected that
+   * some data locations will not be present.
+   * Populates the missingIndexes and selectedIndexes instance variables.
+   */
+  private void selectIndexes() {
+    SortedSet<Integer> candidates;
+    int required;
+
+    missingIndexes.clear();
+    selectedIndexes.clear();
+
+    if (isOfflineRecovery()) {
+      if (!paddingIndexes.isEmpty()) {
+        paddingIndexes.forEach(i ->
+            Preconditions.assertTrue(!recoveryIndexes.contains(i)));
+      }
+
+      missingIndexes.addAll(recoveryIndexes);
+
+      // data locations available for reading
+      SortedSet<Integer> availableIndexes = new TreeSet<>();
+      availableIndexes.addAll(dataIndexes);
+      availableIndexes.addAll(parityIndexes);
+      availableIndexes.removeAll(failedDataIndexes);
+      availableIndexes.removeAll(recoveryIndexes);
+
+      // choose from all available
+      candidates = availableIndexes;
+      required = dataIndexes.size();
+    } else {
+      missingIndexes.addAll(failedDataIndexes);
+      missingIndexes.retainAll(dataIndexes);
+
+      SortedSet<Integer> dataAvailable = new TreeSet<>(dataIndexes);
+      dataAvailable.removeAll(failedDataIndexes);
+
+      SortedSet<Integer> parityAvailable = new TreeSet<>(parityIndexes);
+      parityAvailable.removeAll(failedDataIndexes);
+
+      selectedIndexes.addAll(dataAvailable);
+
+      // choose from parity
+      candidates = parityAvailable;
+      required = dataIndexes.size() - dataAvailable.size();
+    }
+
+    SortedSet<Integer> internal = selectInternalInputs(candidates, required);
+    selectedIndexes.addAll(internal);
+  }
+
+  private static SortedSet<Integer> setOfRange(
+      int startInclusive, int endExclusive) {
+
+    return range(startInclusive, endExclusive)
+        .boxed().collect(toCollection(TreeSet::new));
+  }
+
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
index c461e019dd..4f12034b9b 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockReconstructedStripeInputStream.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.client.io;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -25,14 +26,18 @@ import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.ozone.client.io.ECStreamTestUtil.TestBlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECStreamTestUtil.TestBlockInputStream;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +48,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
 import static org.apache.hadoop.ozone.client.io.ECStreamTestUtil.generateParity;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Test for the ECBlockReconstructedStripeInputStream.
@@ -61,7 +70,21 @@ public class TestECBlockReconstructedStripeInputStream {
   private ExecutorService ecReconstructExecutor =
       Executors.newFixedThreadPool(3);
 
-  @Before
+  static List<Set<Integer>> recoveryCases() { // TODO better name
+    List<Set<Integer>> params = new ArrayList<>();
+    params.add(emptySet()); // non-recovery
+    for (int i = 0; i < 5; i++) {
+      params.add(singleton(i));
+    }
+    params.add(ImmutableSet.of(0, 1)); // only data
+    params.add(ImmutableSet.of(1, 4)); // data and parity
+    params.add(ImmutableSet.of(3, 4)); // only parity
+    params.add(ImmutableSet.of(2, 3)); // data and parity
+    params.add(ImmutableSet.of(2, 4)); // data and parity
+    return params;
+  }
+
+  @BeforeEach
   public void setup() {
     repConfig = new ECReplicationConfig(3, 2,
         ECReplicationConfig.EcCodec.RS, ONEMB);
@@ -71,7 +94,7 @@ public class TestECBlockReconstructedStripeInputStream {
     dataGen = new SplittableRandom(randomSeed);
   }
 
-  @After
+  @AfterEach
   public void teardown() {
     ecReconstructExecutor.shutdownNow();
   }
@@ -82,50 +105,72 @@ public class TestECBlockReconstructedStripeInputStream {
     BlockLocationInfo keyInfo = ECStreamTestUtil
         .createKeyInfo(repConfig, 1, ONEMB);
     try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
-      Assert.assertTrue(ecb.hasSufficientLocations());
+      Assertions.assertTrue(ecb.hasSufficientLocations());
     }
     // Two Chunks, but missing data block 2.
     Map<DatanodeDetails, Integer> dnMap
         = ECStreamTestUtil.createIndexMap(1, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
-    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
-      Assert.assertTrue(ecb.hasSufficientLocations());
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+      Assertions.assertTrue(ecb.hasSufficientLocations());
+      Collection<Integer> idxs = dnMap.values();
+      for (int i : idxs) {
+        ecb.setRecoveryIndexes(singleton(i - 1));
+        Assertions.assertTrue(ecb.hasSufficientLocations());
+      }
+
+      // trying to recover all
+      ecb.setRecoveryIndexes(toBufferIndexes(idxs));
+      Assertions.assertFalse(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, but missing data block 2 and 3.
     dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
-    try (ECBlockInputStream ecb =  createInputStream(keyInfo)) {
-      Assert.assertTrue(ecb.hasSufficientLocations());
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+      Assertions.assertTrue(ecb.hasSufficientLocations());
       // Set a failed location
       List<DatanodeDetails> failed = new ArrayList<>();
       failed.add(keyInfo.getPipeline().getFirstNode());
-      ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
-      Assert.assertFalse(ecb.hasSufficientLocations());
+      ecb.addFailedDatanodes(failed);
+      Assertions.assertFalse(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, but missing data block 2 and 3 and parity 1.
     dnMap = ECStreamTestUtil.createIndexMap(1, 4);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
     try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
-      Assert.assertFalse(ecb.hasSufficientLocations());
+      Assertions.assertFalse(ecb.hasSufficientLocations());
     }
 
     // Three Chunks, all available but fail 3
     dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
-    try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
-      Assert.assertTrue(ecb.hasSufficientLocations());
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+      Assertions.assertTrue(ecb.hasSufficientLocations());
       // Set a failed location
       List<DatanodeDetails> failed = new ArrayList<>();
-      for (DatanodeDetails dn : dnMap.keySet()) {
-        failed.add(dn);
-        if (failed.size() == 3) {
-          break;
-        }
+      for (Map.Entry<DatanodeDetails, Integer> entry : dnMap.entrySet()) {
+        failed.add(entry.getKey());
+        boolean expected = failed.size() < 3;
+
+        ecb.addFailedDatanodes(singleton(entry.getKey()));
+        Assertions.assertEquals(expected, ecb.hasSufficientLocations());
+      }
+    }
+
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+      List<Integer> recover = new ArrayList<>();
+      for (int i : Arrays.asList(4, 3, 2, 1, 0)) {
+        recover.add(i);
+        ecb.setRecoveryIndexes(recover);
+        boolean expected = recover.size() < 3;
+        Assertions.assertEquals(expected, ecb.hasSufficientLocations());
       }
-      ((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
-      Assert.assertFalse(ecb.hasSufficientLocations());
     }
 
     // One chunk, indexes 2 and 3 are padding, but still reported in the
@@ -134,12 +179,14 @@ public class TestECBlockReconstructedStripeInputStream {
     dnMap = ECStreamTestUtil.createIndexMap(2, 3);
     keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB, dnMap);
     try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
-      Assert.assertFalse(ecb.hasSufficientLocations());
+      Assertions.assertFalse(ecb.hasSufficientLocations());
     }
   }
 
-  @Test
-  public void testReadFullStripesWithPartial() throws IOException {
+  @ParameterizedTest
+  @MethodSource("recoveryCases")
+  void testReadFullStripesWithPartial(Set<Integer> recoveryIndexes)
+      throws IOException {
     // Generate the input data for 3 full stripes and generate the parity.
     int chunkSize = repConfig.getEcChunkSize();
     int partialStripeSize = chunkSize * 2 - 1;
@@ -149,59 +196,65 @@ public class TestECBlockReconstructedStripeInputStream {
 
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
-    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
-    // Two data missing
-    locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
-    // One data missing
-    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
-    // Two data missing including first
-    locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
-    // One data and one parity missing
-    locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
-    // No missing indexes
-    locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
 
-    for (Map<DatanodeDetails, Integer> dnMap : locations) {
-      streamFactory = new TestBlockInputStreamFactory();
-      addDataStreamsToFactory(dataBufs, parity);
+    streamFactory = new TestBlockInputStreamFactory();
+    addDataStreamsToFactory(dataBufs, parity);
 
-      BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
-          stripeSize() * 3 + partialStripeSize, dnMap);
-      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
+        stripeSize() * 3 + partialStripeSize, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
 
-      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
-      dataGen = new SplittableRandom(randomSeed);
-      try (ECBlockReconstructedStripeInputStream ecb =
-          createInputStream(keyInfo)) {
-        // Read 3 full stripes
-        for (int i = 0; i < 3; i++) {
-          int read = ecb.readStripe(bufs);
-          for (int j = 0; j < bufs.length; j++) {
-            ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
-          }
-          Assert.assertEquals(stripeSize(), read);
+    List<Integer> outputIndexes = getOutputIndexes(recoveryIndexes);
 
-          // Check the underlying streams have read 1 chunk per read:
-          for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
-            Assert.assertEquals(chunkSize * (i + 1),
-                bis.getPos());
+    ByteBuffer[] bufs = allocateByteBuffers(
+        outputIndexes.size(), repConfig.getEcChunkSize());
+
+    dataGen = new SplittableRandom(randomSeed);
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+
+      ecb.setRecoveryIndexes(recoveryIndexes);
+
+      // Read 3 full stripes
+      for (int i = 0; i < 3; i++) {
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(stripeSize(), read);
+
+        int output = 0;
+        for (int j = 0; j < repConfig.getRequiredNodes(); j++) {
+          if (outputIndexes.contains(j)) {
+            ECStreamTestUtil.assertBufferMatches(bufs[output++], dataGen);
           }
-          Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
-          clearBuffers(bufs);
         }
-        // The next read is a partial stripe
-        int read = ecb.readStripe(bufs);
-        Assert.assertEquals(partialStripeSize, read);
-        ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
-        ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
-        Assert.assertEquals(0, bufs[2].remaining());
-        Assert.assertEquals(0, bufs[2].position());
 
-        // A further read should give EOF
+        // Check the underlying streams have read 1 chunk per read:
+        for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+          Assertions.assertEquals(chunkSize * (i + 1),
+              bis.getPos());
+        }
+        Assertions.assertEquals(stripeSize() * (i + 1), ecb.getPos());
         clearBuffers(bufs);
-        read = ecb.readStripe(bufs);
-        Assert.assertEquals(-1, read);
       }
+      // The next read is a partial stripe
+      int read = ecb.read(bufs);
+      Assertions.assertEquals(partialStripeSize, read);
+      int output = 0;
+      for (int j = 0; j < 2; j++) {
+        if (outputIndexes.contains(j)) {
+          ECStreamTestUtil.assertBufferMatches(bufs[output++], dataGen);
+        }
+      }
+      if (outputIndexes.contains(2)) {
+        Assertions.assertEquals(0, bufs[output].remaining());
+        Assertions.assertEquals(0, bufs[output].position());
+      }
+
+      // A further read should give EOF
+      clearBuffers(bufs);
+      read = ecb.read(bufs);
+      Assertions.assertEquals(-1, read);
     }
   }
 
@@ -226,22 +279,62 @@ public class TestECBlockReconstructedStripeInputStream {
     dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
         createInputStream(keyInfo)) {
-      int read = ecb.readStripe(bufs);
-      Assert.assertEquals(blockLength, read);
+      int read = ecb.read(bufs);
+      Assertions.assertEquals(blockLength, read);
+      ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
+      Assertions.assertEquals(0, bufs[1].remaining());
+      Assertions.assertEquals(0, bufs[1].position());
+      Assertions.assertEquals(0, bufs[2].remaining());
+      Assertions.assertEquals(0, bufs[2].position());
+      // Check the underlying streams have been advanced by 1 blockLength:
+      for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
+        Assertions.assertEquals(blockLength, bis.getPos());
+      }
+      Assertions.assertEquals(ecb.getPos(), blockLength);
+      clearBuffers(bufs);
+      // A further read should give EOF
+      read = ecb.read(bufs);
+      Assertions.assertEquals(-1, read);
+    }
+  }
+
+  @Test
+  void recoverPartialStripe() throws IOException {
+    int ecChunkSize = repConfig.getEcChunkSize();
+    int blockLength = ecChunkSize - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    ECStreamTestUtil
+        .randomFill(dataBufs, ecChunkSize, dataGen, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    // We have a length that is less than a single chunk, so blocks 2 and 3 are
+    // padding and will not be present. Parity blocks are lost and need to be
+    // recovered from block 1 and padded blocks 2 and 3.
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 4, 5);
+    ByteBuffer[] bufs = allocateByteBuffers(2, ecChunkSize);
+    BlockLocationInfo keyInfo =
+        ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    dataGen = new SplittableRandom(randomSeed);
+    try (ECBlockReconstructedStripeInputStream ecb =
+             createInputStream(keyInfo)) {
+      ecb.setRecoveryIndexes(Arrays.asList(3, 4));
+
+      int read = ecb.read(bufs);
+      Assertions.assertEquals(blockLength, read);
       ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
-      Assert.assertEquals(0, bufs[1].remaining());
-      Assert.assertEquals(0, bufs[1].position());
-      Assert.assertEquals(0, bufs[2].remaining());
-      Assert.assertEquals(0, bufs[2].position());
+      ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
       // Check the underlying streams have been advanced by 1 blockLength:
       for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
-        Assert.assertEquals(blockLength, bis.getPos());
+        Assertions.assertEquals(blockLength, bis.getPos());
       }
-      Assert.assertEquals(ecb.getPos(), blockLength);
+      Assertions.assertEquals(ecb.getPos(), blockLength);
       clearBuffers(bufs);
       // A further read should give EOF
-      read = ecb.readStripe(bufs);
-      Assert.assertEquals(-1, read);
+      read = ecb.read(bufs);
+      Assertions.assertEquals(-1, read);
     }
   }
 
@@ -268,21 +361,21 @@ public class TestECBlockReconstructedStripeInputStream {
     dataGen = new SplittableRandom(randomSeed);
     try (ECBlockReconstructedStripeInputStream ecb =
         createInputStream(keyInfo)) {
-      int read = ecb.readStripe(bufs);
-      Assert.assertEquals(blockLength, read);
+      int read = ecb.read(bufs);
+      Assertions.assertEquals(blockLength, read);
       ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
       ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
-      Assert.assertEquals(0, bufs[2].remaining());
-      Assert.assertEquals(0, bufs[2].position());
+      Assertions.assertEquals(0, bufs[2].remaining());
+      Assertions.assertEquals(0, bufs[2].position());
       // Check the underlying streams have been advanced by 1 chunk:
       for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
-        Assert.assertEquals(chunkSize, bis.getPos());
+        Assertions.assertEquals(chunkSize, bis.getPos());
       }
-      Assert.assertEquals(ecb.getPos(), blockLength);
+      Assertions.assertEquals(ecb.getPos(), blockLength);
       clearBuffers(bufs);
       // A further read should give EOF
-      read = ecb.readStripe(bufs);
-      Assert.assertEquals(-1, read);
+      read = ecb.read(bufs);
+      Assertions.assertEquals(-1, read);
     }
   }
 
@@ -325,20 +418,20 @@ public class TestECBlockReconstructedStripeInputStream {
       dataGen = new SplittableRandom(randomSeed);
       try (ECBlockReconstructedStripeInputStream ecb =
           createInputStream(keyInfo)) {
-        int read = ecb.readStripe(bufs);
-        Assert.assertEquals(blockLength, read);
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(blockLength, read);
         ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
         ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
         ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
         // Check the underlying streams have been advanced by 1 chunk:
         for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
-          Assert.assertEquals(0, bis.getRemaining());
+          Assertions.assertEquals(0, bis.getRemaining());
         }
-        Assert.assertEquals(ecb.getPos(), blockLength);
+        Assertions.assertEquals(ecb.getPos(), blockLength);
         clearBuffers(bufs);
         // A further read should give EOF
-        read = ecb.readStripe(bufs);
-        Assert.assertEquals(-1, read);
+        read = ecb.read(bufs);
+        Assertions.assertEquals(-1, read);
       }
     }
   }
@@ -368,8 +461,8 @@ public class TestECBlockReconstructedStripeInputStream {
     try (ECBlockReconstructedStripeInputStream ecb =
         createInputStream(keyInfo)) {
       try {
-        ecb.readStripe(bufs);
-        Assert.fail("Read should have thrown an exception");
+        ecb.read(bufs);
+        Assertions.fail("Read should have thrown an exception");
       } catch (InsufficientLocationsException e) {
         // expected
       }
@@ -413,43 +506,43 @@ public class TestECBlockReconstructedStripeInputStream {
           createInputStream(keyInfo)) {
 
         // Read Stripe 1
-        int read = ecb.readStripe(bufs);
+        int read = ecb.read(bufs);
         for (int j = 0; j < bufs.length; j++) {
           validateContents(dataBufs[j], bufs[j], 0, chunkSize);
         }
-        Assert.assertEquals(stripeSize(), read);
-        Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+        Assertions.assertEquals(stripeSize(), read);
+        Assertions.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
 
         // Seek to 0 and read again
         clearBuffers(bufs);
         ecb.seek(0);
-        ecb.readStripe(bufs);
+        ecb.read(bufs);
         for (int j = 0; j < bufs.length; j++) {
           validateContents(dataBufs[j], bufs[j], 0, chunkSize);
         }
-        Assert.assertEquals(stripeSize(), read);
-        Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+        Assertions.assertEquals(stripeSize(), read);
+        Assertions.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
 
         // Seek to the last stripe
         // Seek to the last stripe
         clearBuffers(bufs);
         ecb.seek(stripeSize() * 3);
-        read = ecb.readStripe(bufs);
+        read = ecb.read(bufs);
         validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
         validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
-        Assert.assertEquals(0, bufs[2].remaining());
-        Assert.assertEquals(partialStripeSize, read);
-        Assert.assertEquals(0, ecb.getRemaining());
+        Assertions.assertEquals(0, bufs[2].remaining());
+        Assertions.assertEquals(partialStripeSize, read);
+        Assertions.assertEquals(0, ecb.getRemaining());
 
         // seek to the start of stripe 3
         clearBuffers(bufs);
         ecb.seek(stripeSize() * (long)2);
-        read = ecb.readStripe(bufs);
+        read = ecb.read(bufs);
         for (int j = 0; j < bufs.length; j++) {
           validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
         }
-        Assert.assertEquals(stripeSize(), read);
-        Assert.assertEquals(partialStripeSize, ecb.getRemaining());
+        Assertions.assertEquals(stripeSize(), read);
+        Assertions.assertEquals(partialStripeSize, ecb.getRemaining());
       }
     }
   }
@@ -466,9 +559,9 @@ public class TestECBlockReconstructedStripeInputStream {
         createInputStream(keyInfo)) {
       try {
         ecb.seek(10);
-        Assert.fail("Seek should have thrown an exception");
+        Assertions.fail("Seek should have thrown an exception");
       } catch (IOException e) {
-        Assert.assertEquals("Requested position 10 does not align " +
+        Assertions.assertEquals("Requested position 10 does not align " +
             "with a stripe offset", e.getMessage());
       }
     }
@@ -509,12 +602,12 @@ public class TestECBlockReconstructedStripeInputStream {
           createInputStream(keyInfo)) {
         // After reading the first stripe, make one of the streams error
         for (int i = 0; i < 3; i++) {
-          int read = ecb.readStripe(bufs);
+          int read = ecb.read(bufs);
           for (int j = 0; j < bufs.length; j++) {
             validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
           }
-          Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
-          Assert.assertEquals(stripeSize(), read);
+          Assertions.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+          Assertions.assertEquals(stripeSize(), read);
           clearBuffers(bufs);
           if (i == 0) {
             Integer failStream =
@@ -525,17 +618,17 @@ public class TestECBlockReconstructedStripeInputStream {
           }
         }
         // The next read is a partial stripe
-        int read = ecb.readStripe(bufs);
-        Assert.assertEquals(partialStripeSize, read);
+        int read = ecb.read(bufs);
+        Assertions.assertEquals(partialStripeSize, read);
         validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
         validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
-        Assert.assertEquals(0, bufs[2].remaining());
-        Assert.assertEquals(0, bufs[2].position());
+        Assertions.assertEquals(0, bufs[2].remaining());
+        Assertions.assertEquals(0, bufs[2].position());
 
         // seek back to zero and read a stripe to re-open the streams
         ecb.seek(0);
         clearBuffers(bufs);
-        ecb.readStripe(bufs);
+        ecb.read(bufs);
         // Now fail another random stream and the read should fail with
         // insufficient locations
         Set<Integer> currentStreams =
@@ -546,8 +639,8 @@ public class TestECBlockReconstructedStripeInputStream {
             .setShouldError(true);
         try {
           clearBuffers(bufs);
-          ecb.readStripe(bufs);
-          Assert.fail("InsufficientLocationsException expected");
+          ecb.read(bufs);
+          Assertions.fail("InsufficientLocationsException expected");
         } catch (InsufficientLocationsException e) {
           // expected
         }
@@ -555,7 +648,7 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
-  @Test(expected = InsufficientLocationsException.class)
+  @Test
   public void testAllLocationsFailOnFirstRead() throws IOException {
     // This test simulates stale nodes. When the nodes are stale, but not yet
     // dead, the locations will still be given to the client and it will try to
@@ -587,7 +680,8 @@ public class TestECBlockReconstructedStripeInputStream {
     ByteBuffer[] bufs = allocateByteBuffers(repConfig);
     try (ECBlockReconstructedStripeInputStream ecb =
              createInputStream(keyInfo)) {
-      ecb.readStripe(bufs);
+      assertThrows(InsufficientLocationsException.class,
+          () -> ecb.read(bufs));
     }
   }
 
@@ -624,17 +718,17 @@ public class TestECBlockReconstructedStripeInputStream {
       ecb.addFailedDatanodes(failed);
 
       // Read full stripe
-      int read = ecb.readStripe(bufs);
+      int read = ecb.read(bufs);
       for (int j = 0; j < bufs.length; j++) {
         ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
       }
-      Assert.assertEquals(stripeSize(), read);
+      Assertions.assertEquals(stripeSize(), read);
 
       // Now ensure that streams with repIndexes 1, 2 and 3 have not been
       // created in the stream factory, indicating we did not read them.
       List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
       for (TestBlockInputStream stream : streams) {
-        Assert.assertTrue(stream.getEcReplicaIndex() > 2);
+        Assertions.assertTrue(stream.getEcReplicaIndex() > 2);
       }
     }
   }
@@ -675,9 +769,9 @@ public class TestECBlockReconstructedStripeInputStream {
   private void validateContents(ByteBuffer src, ByteBuffer data, int offset,
       int count) {
     byte[] srcArray = src.array();
-    Assert.assertEquals(count, data.remaining());
+    Assertions.assertEquals(count, data.remaining());
     for (int i = offset; i < offset + count; i++) {
-      Assert.assertEquals("Element " + i, srcArray[i], data.get());
+      Assertions.assertEquals(srcArray[i], data.get(), "Element " + i);
     }
     data.flip();
   }
@@ -711,11 +805,29 @@ public class TestECBlockReconstructedStripeInputStream {
   }
 
   private ByteBuffer[] allocateByteBuffers(ECReplicationConfig rConfig) {
-    ByteBuffer[] bufs = new ByteBuffer[repConfig.getData()];
+    return allocateByteBuffers(rConfig.getData(), rConfig.getEcChunkSize());
+  }
+
+  private ByteBuffer[] allocateByteBuffers(int count, int capacity) {
+    ByteBuffer[] bufs = new ByteBuffer[count];
     for (int i = 0; i < bufs.length; i++) {
-      bufs[i] = ByteBuffer.allocate(rConfig.getEcChunkSize());
+      bufs[i] = ByteBuffer.allocate(capacity);
     }
     return bufs;
   }
 
+  private List<Integer> getOutputIndexes(Set<Integer> recoveryIndexes) {
+    return recoveryIndexes.isEmpty()
+        ? Arrays.asList(0, 1, 2) // no recovery requested: indexes 0, 1, 2
+        : new ArrayList<>(recoveryIndexes); // else a copy of the given set
+  }
+
+  private static Set<Integer> toBufferIndexes(Collection<Integer> dnIdxs) {
+    return dnIdxs.stream()
+        .mapToInt(Integer::intValue)
+        .map(i -> i - 1) // shift down by one; presumably 1-based -> 0-based
+        .boxed()
+        .collect(toSet());
+  }
+
 }
diff --git a/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java b/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
index 7b9ebf6ad2..3d6de14a47 100644
--- a/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
+++ b/hadoop-hdds/erasurecode/src/main/java/org/apache/ozone/erasurecode/rawcoder/ByteBufferDecodingState.java
@@ -95,26 +95,29 @@ class ByteBufferDecodingState extends DecodingState {
   void checkInputBuffers(ByteBuffer[] buffers) {
     int validInputs = 0;
 
-    for (ByteBuffer buffer : buffers) {
+    for (int i = 0; i < buffers.length; i++) {
+      ByteBuffer buffer = buffers[i];
+
       if (buffer == null) {
         continue;
       }
 
       if (buffer.remaining() != decodeLength) {
-        throw new IllegalArgumentException(
-            "Invalid buffer, not of length " + decodeLength);
+        throw new IllegalArgumentException("Invalid buffer [" + i +
+            "], not of length " + decodeLength);
       }
       if (buffer.isDirect() != usingDirectBuffer) {
-        throw new IllegalArgumentException(
-            "Invalid buffer, isDirect should be " + usingDirectBuffer);
+        throw new IllegalArgumentException("Invalid buffer [" + i +
+            "], isDirect should be " + usingDirectBuffer);
       }
 
       validInputs++;
     }
 
     if (validInputs < decoder.getNumDataUnits()) {
-      throw new IllegalArgumentException(
-          "No enough valid inputs are provided, not recoverable");
+      throw new IllegalArgumentException("No enough valid inputs are provided ("
+          + validInputs + " vs. " + decoder.getNumDataUnits()
+          + "), not recoverable");
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org