You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2021/11/16 14:30:13 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 52f05fc  HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)
52f05fc is described below

commit 52f05fc34927a6d02fcae01814b5e92e8a7d7f14
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Nov 16 14:29:52 2021 +0000

    HDDS-5951. EC: ECBlockReconstructedStripeInputStream should handle block read failures and continue reading (#2831)
---
 .../hadoop/ozone/client/io/ECBlockInputStream.java |  22 ++--
 .../io/ECBlockReconstructedStripeInputStream.java  | 144 ++++++++++++++-------
 .../TestECBlockReconstructedStripeInputStream.java |  96 +++++++++++++-
 3 files changed, 198 insertions(+), 64 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index e6e7ccc..07333d2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -111,9 +111,9 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshFunction = refreshFunction;
     this.maxLocations = repConfig.getData() + repConfig.getParity();
-    this.dataLocations =
-        new DatanodeDetails[repConfig.getData() + repConfig.getParity()];
-    this.blockStreams = new BlockExtendedInputStream[repConfig.getData()];
+    this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
+    this.blockStreams =
+        new BlockExtendedInputStream[repConfig.getRequiredNodes()];
 
     this.stripeSize = (long)ecChunkSize * repConfig.getData();
     setBlockLocations(this.blockInfo.getPipeline());
@@ -152,9 +152,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
    * stream if it has not been opened already.
    * @return BlockInput stream to read from.
    */
-  protected BlockExtendedInputStream getOrOpenStream(
-      int streamIndex, int locationIndex) {
-    BlockExtendedInputStream stream = blockStreams[streamIndex];
+  protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
+    BlockExtendedInputStream stream = blockStreams[locationIndex];
     if (stream == null) {
       // To read an EC block, we create a STANDALONE pipeline that contains the
       // single location for the block index we want to read. The EC blocks are
@@ -180,7 +179,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
           blkInfo, pipeline,
           blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
           refreshFunction);
-      blockStreams[streamIndex] = stream;
+      blockStreams[locationIndex] = stream;
     }
     return stream;
   }
@@ -258,8 +257,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     int totalRead = 0;
     while(strategy.getTargetLength() > 0 && remaining() > 0) {
       int currentIndex = currentStreamIndex();
-      BlockExtendedInputStream stream =
-          getOrOpenStream(currentIndex, currentIndex);
+      BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
       int read = readFromStream(stream, strategy);
       totalRead += read;
       position += read;
@@ -380,10 +378,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
           "EOF encountered at pos: " + pos + " for block: "
               + blockInfo.getBlockID());
     }
-    if (position != pos) {
-      position = pos;
-      seeked = true;
-    }
+    position = pos;
+    seeked = true;
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 54e2de0..1fdb7a2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -36,8 +36,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.function.Function;
 
 /**
@@ -94,6 +97,8 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   private int[] missingIndexes;
   // The blockLocation indexes to use to read data into the dataBuffers.
   private List<Integer> dataIndexes = new ArrayList<>();
+  // Data Indexes we have tried to read from, and failed for some reason
+  private Set<Integer> failedDataIndexes = new HashSet<>();
 
   private final RawErasureDecoder decoder;
 
@@ -109,19 +114,19 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     decoder = CodecRegistry.getInstance()
         .getCodecFactory(repConfig.getCodec().toString())
         .createDecoder(repConfig);
+
+    // The EC decoder needs an array data+parity long, with missing or not
+    // needed indexes set to null.
+    decoderInputBuffers = new ByteBuffer[getRepConfig().getRequiredNodes()];
   }
 
   protected void init() throws InsufficientLocationsException {
     if (!hasSufficientLocations()) {
-      throw new InsufficientLocationsException("There are not enough " +
+      throw new InsufficientLocationsException("There are insufficient " +
           "datanodes to read the EC block");
     }
-
+    dataIndexes.clear();
     ECReplicationConfig repConfig = getRepConfig();
-    // The EC decoder needs an array data+parity long, with missing or not
-    // needed indexes set to null.
-    decoderInputBuffers = new ByteBuffer[
-        getRepConfig().getData() + getRepConfig().getParity()];
     DatanodeDetails[] locations = getDataLocations();
     setMissingIndexesAndDataLocations(locations);
     List<Integer> parityIndexes =
@@ -130,9 +135,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     dataIndexes.addAll(parityIndexes);
     // The decoder inputs originally start as all nulls. Then we populate the
     // pieces we have data for. The parity buffers are reused for the block
-    // so we can allocated them now.
-    for (Integer i : parityIndexes) {
-      decoderInputBuffers[i] = allocateBuffer(repConfig);
+    // so we can allocate them now. On re-init, we reuse any parity buffers
+    // already allocated.
+    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
+      if (parityIndexes.contains(i)) {
+        if (decoderInputBuffers[i] == null) {
+          decoderInputBuffers[i] = allocateBuffer(repConfig);
+        }
+      } else {
+        decoderInputBuffers[i] = null;
+      }
     }
     decoderOutputBuffers = new ByteBuffer[missingIndexes.length];
     initialized = true;
@@ -150,9 +162,10 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
     List<Integer> missingInd = new ArrayList<>();
     for (int i = 0; i < repConfig.getData(); i++) {
-      if (locations[i] == null && i < expectedDataBlocks) {
+      if ((locations[i] == null || failedDataIndexes.contains(i))
+          && i < expectedDataBlocks) {
         missingInd.add(i);
-      } else if (locations[i] != null) {
+      } else if (locations[i] != null && !failedDataIndexes.contains(i)) {
         dataIndexes.add(i);
       }
     }
@@ -171,6 +184,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     for (int i = 0; i < repConfig.getData(); i++) {
       if (isMissingIndex(i)) {
         decoderOutputBuffers[recoveryIndex++] = bufs[i];
+        decoderInputBuffers[i] = null;
       } else {
         decoderInputBuffers[i] = bufs[i];
       }
@@ -210,12 +224,28 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
       return EOF;
     }
     validateBuffers(bufs);
-    assignBuffers(bufs);
-    clearParityBuffers();
-    // Set the read limits on the buffers so we do not read any garbage data
-    // from the end of the block that is unexpected.
-    setBufferReadLimits(bufs, toRead);
-    loadDataBuffersFromStream();
+    while(true) {
+      try {
+        assignBuffers(bufs);
+        clearParityBuffers();
+        // Set the read limits on the buffers so we do not read any garbage data
+        // from the end of the block that is unexpected.
+        setBufferReadLimits(bufs, toRead);
+        loadDataBuffersFromStream();
+        break;
+      } catch (IOException e) {
+        // Re-init now the bad block has been excluded. If we have run out of
+        // locations, init will throw an InsufficientLocationsException.
+        init();
+        // seek to the current position so it rewinds any blocks we read
+        // already.
+        seek(getPos());
+        // Reset the input positions back to zero
+        for (ByteBuffer b : bufs) {
+          b.position(0);
+        }
+      }
+    }
     padBuffers(toRead);
     flipInputs();
     decodeStripe();
@@ -277,7 +307,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
       return;
     }
 
-    if (fullChunks == 0){
+    if (fullChunks == 0) {
       bufs[0].limit(toRead);
       // All buffers except the first contain no data.
       for (int i = 1; i < bufs.length; i++) {
@@ -319,8 +349,9 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   /**
-   * Take the parity indexes which are available, shuffle them and truncate the
-   * list to the number of required parity chunks.
+   * Take the parity indexes which are already used, and the others which are
+   * available, and select random indexes to meet numRequired. The resulting
+   * list is sorted in ascending order of the indexes.
    * @param locations The list of locations for all blocks in the block group/
    * @param numRequired The number of parity chunks needed for reconstruction
    * @return A list of indexes indicating which parity locations to read.
@@ -328,19 +359,27 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   private List<Integer> selectParityIndexes(
       DatanodeDetails[] locations, int numRequired) {
     List<Integer> indexes = new ArrayList<>();
+    List<Integer> selected = new ArrayList<>();
     ECReplicationConfig repConfig = getRepConfig();
-    for (int i = repConfig.getData();
-         i < repConfig.getParity() + repConfig.getData(); i++) {
-      if (locations[i] != null) {
+    for (int i = repConfig.getData(); i < repConfig.getRequiredNodes(); i++) {
+      if (locations[i] != null && !failedDataIndexes.contains(i)
+          && decoderInputBuffers[i] == null) {
         indexes.add(i);
       }
+      // If we are re-initializing, we want to make sure we are re-using any
+      // previously selected good parity indexes, as the block stream is already
+      // opened.
+      if (decoderInputBuffers[i] != null && !failedDataIndexes.contains(i)) {
+        selected.add(i);
+      }
     }
-    Preconditions.assertTrue(indexes.size() >= numRequired);
+    Preconditions.assertTrue(indexes.size() + selected.size() >= numRequired);
     Random rand = new Random();
-    while (indexes.size() > numRequired) {
-      indexes.remove(rand.nextInt(indexes.size()));
+    while (selected.size() < numRequired) {
+      selected.add(indexes.remove(rand.nextInt(indexes.size())));
     }
-    return indexes;
+    Collections.sort(selected);
+    return selected;
   }
 
   private ByteBuffer allocateBuffer(ECReplicationConfig repConfig) {
@@ -366,26 +405,32 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   }
 
   protected void loadDataBuffersFromStream() throws IOException {
-    for (int i = 0; i < dataIndexes.size(); i++) {
-      BlockExtendedInputStream stream =
-          getOrOpenStream(i, dataIndexes.get(i));
-      seekStreamIfNecessary(stream, 0);
-      ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
-      while (b.hasRemaining()) {
-        int read = stream.read(b);
-        if (read == EOF) {
-          // We should not reach EOF, as the block should have enough data to
-          // fill the buffer. If the block does not, then it indicates the block
-          // is not as long as it should be, based on the block length stored in
-          // OM. Therefore if there is any remaining space in the buffer, we
-          // should throw an exception.
-          if (b.hasRemaining()) {
-            throw new IOException("Expected to read " + b.remaining() +
-                " bytes from block " + getBlockID() + " EC index " + (i + 1) +
-                " but reached EOF");
+    for (int i : dataIndexes) {
+      try {
+        BlockExtendedInputStream stream = getOrOpenStream(i);
+        seekStreamIfNecessary(stream, 0);
+        ByteBuffer b = decoderInputBuffers[i];
+        while (b.hasRemaining()) {
+          int read = stream.read(b);
+          if (read == EOF) {
+            // We should not reach EOF, as the block should have enough data to
+            // fill the buffer. If the block does not, then it indicates the
+            // block is not as long as it should be, based on the block length
+            // stored in OM. Therefore if there is any remaining space in the
+            // buffer, we should throw an exception.
+            if (b.hasRemaining()) {
+              throw new IOException("Expected to read " + b.remaining() +
+                  " bytes from block " + getBlockID() + " EC index " + (i + 1) +
+                  " but reached EOF");
+            }
+            break;
           }
-          break;
         }
+      } catch (IOException e) {
+        LOG.warn("Failed to read from block {} EC index {}. Excluding the " +
+                "block", getBlockID(), i + 1, e);
+        failedDataIndexes.add(i);
+        throw e;
       }
     }
   }
@@ -422,12 +467,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     int availableLocations =
         availableDataLocations() + availableParityLocations();
     int paddedLocations = repConfig.getData() - expectedDataBlocks;
+    int failedLocations = failedDataIndexes.size();
 
-    if (availableLocations + paddedLocations >= repConfig.getData()) {
+    if (availableLocations + paddedLocations - failedLocations
+        >= repConfig.getData()) {
       return true;
     } else {
-      LOG.warn("There are insufficient locations. {} available {} padded {} " +
-          "expected", availableLocations, paddedLocations, expectedDataBlocks);
+      LOG.error("There are insufficient locations. {} available; {} padded;" +
+          " {} failed; {} expected;", availableLocations, paddedLocations,
+          failedLocations, expectedDataBlocks);
       return false;
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index bf44297..33fb33a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
 import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
+import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.erasurecode.CodecRegistry;
@@ -354,8 +355,8 @@ public class TestECBlockReconstructedStripeInputStream {
       try {
         ecb.readStripe(bufs);
         Assert.fail("Read should have thrown an exception");
-      } catch (IOException e) {
-        Assert.assertTrue(e.getMessage().matches("^Expected\\sto\\sread.+"));
+      } catch (InsufficientLocationsException e) {
+        // expected
       }
     }
   }
@@ -459,6 +460,87 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  @Test
+  public void testErrorReadingBlockContinuesReading() throws IOException {
+    // Generate input data for 3 full stripes and a partial one, plus parity.
+    int chunkSize = repConfig.getEcChunkSize();
+    int partialStripeSize = chunkSize * 2 - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        4 * chunkSize);
+    dataBufs[1].limit(4 * chunkSize - 1);
+    dataBufs[2].limit(3 * chunkSize);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    List<List<Integer>> failLists = new ArrayList<>();
+    failLists.add(indexesToList(0, 1));
+    // These will be the first parity read and then the next parity read as a
+    // replacement
+    failLists.add(indexesToList(2, 3));
+    // First parity and then the data block
+    failLists.add(indexesToList(2, 0));
+
+    for (List<Integer> failList : failLists) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+
+      // Data block index 3 is missing and needs to be recovered initially.
+      Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 2, 4, 5);
+      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+          stripeSize() * 3 + partialStripeSize, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+      try (ECBlockReconstructedStripeInputStream ecb =
+          new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+                   null, null, streamFactory)) {
+        // After reading the first stripe, make one of the streams error
+        for (int i = 0; i < 3; i++) {
+          int read = ecb.readStripe(bufs);
+          for (int j = 0; j < bufs.length; j++) {
+            validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
+          }
+          Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
+          Assert.assertEquals(stripeSize(), read);
+          clearBuffers(bufs);
+          if (i == 0) {
+            streamFactory.getBlockStreams().get(failList.remove(0))
+                .setShouldError(true);
+          }
+        }
+        // The next read is a partial stripe
+        int read = ecb.readStripe(bufs);
+        Assert.assertEquals(partialStripeSize, read);
+        validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+        validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+        Assert.assertEquals(0, bufs[2].remaining());
+        Assert.assertEquals(0, bufs[2].position());
+
+        // seek back to zero, make another block fail. The next read should
+        // error as there are not enough blocks to read.
+        ecb.seek(0);
+        streamFactory.getBlockStreams().get(failList.remove(0))
+            .setShouldError(true);
+        try {
+          clearBuffers(bufs);
+          ecb.readStripe(bufs);
+          Assert.fail("InsufficientLocationsException expected");
+        } catch (InsufficientLocationsException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  private List<Integer> indexesToList(int... indexes) {
+    List<Integer> list = new ArrayList<>();
+    for (int i : indexes) {
+      list.add(i);
+    }
+    return list;
+  }
 
   private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
     List<ByteBuffer> dataStreams = new ArrayList<>();
@@ -655,7 +737,7 @@ public class TestECBlockReconstructedStripeInputStream {
       int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
       TestBlockInputStream stream = new TestBlockInputStream(
           blockInfo.getBlockID(), blockInfo.getLength(),
-          blockStreamData.get(repInd -1));
+          blockStreamData.get(repInd - 1));
       blockStreams.add(stream);
       return stream;
     }
@@ -667,6 +749,7 @@ public class TestECBlockReconstructedStripeInputStream {
     private boolean closed = false;
     private BlockID blockID;
     private long length;
+    private boolean shouldError = false;
     private static final byte EOF = -1;
 
     TestBlockInputStream(BlockID blockId, long blockLen, ByteBuffer data) {
@@ -680,6 +763,10 @@ public class TestECBlockReconstructedStripeInputStream {
       return closed;
     }
 
+    public void setShouldError(boolean val) {
+      shouldError = val;
+    }
+
     @Override
     public BlockID getBlockID() {
       return blockID;
@@ -703,6 +790,9 @@ public class TestECBlockReconstructedStripeInputStream {
 
     @Override
     public int read(ByteBuffer buf) throws IOException {
+      if (shouldError) {
+        throw new IOException("Simulated error reading block");
+      }
       if (getRemaining() == 0) {
         return EOF;
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org