Posted to commits@ozone.apache.org by so...@apache.org on 2021/11/11 11:17:19 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new f85ac37  HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)
f85ac37 is described below

commit f85ac376e67daaa7191abb3de902c16c913440f9
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Thu Nov 11 11:16:58 2021 +0000

    HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)
---
 .../hadoop/ozone/client/io/ECBlockInputStream.java |  33 +++--
 .../io/ECBlockReconstructedStripeInputStream.java  |  47 ++++++-
 .../TestECBlockReconstructedStripeInputStream.java | 144 ++++++++++++++++++++-
 3 files changed, 204 insertions(+), 20 deletions(-)
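
In short, this change lets callers seek an ECBlockReconstructedStripeInputStream, but only to stripe-aligned offsets. As a minimal self-contained sketch of that contract (the geometry below is an assumed example of 3 data blocks with 1MB EC cells; the real values come from the ECReplicationConfig, and the class is illustrative rather than an Ozone API):

    import java.io.IOException;

    public class StripeAlignedSeekSketch {
      // Assumed example geometry: 3 data blocks with 1MB EC cells.
      static final long EC_CHUNK_SIZE = 1024 * 1024;
      static final long DATA_NUM = 3;
      static final long STRIPE_SIZE = EC_CHUNK_SIZE * DATA_NUM;

      // Mirrors the precondition added in seek() in this commit: reject any
      // offset that is not the start of a stripe.
      static void seek(long pos) throws IOException {
        if (pos % STRIPE_SIZE != 0) {
          throw new IOException("Requested position " + pos
              + " does not align with a stripe offset");
        }
        // A real reader would update its logical position here.
      }

      public static void main(String[] args) {
        try {
          seek(0);                // ok: first stripe
          seek(2 * STRIPE_SIZE);  // ok: start of the third stripe
          seek(10);               // rejected below
        } catch (IOException e) {
          System.out.println(e.getMessage());
        }
      }
    }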

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 52b370b..e6e7ccc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -282,19 +282,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
     return blockInfo.getBlockID();
   }
 
-  /**
-   * Read the most allowable amount of data from the current stream. This
-   * ensures we don't read past the end of an EC cell or the overall block
-   * group length.
-   * @param stream Stream to read from
-   * @param strategy The ReaderStrategy to read data into
-   * @return
-   * @throws IOException
-   */
-  private int readFromStream(BlockExtendedInputStream stream,
-      ByteReaderStrategy strategy)
-      throws IOException {
-    long partialPosition = position % ecChunkSize;
+  protected void seekStreamIfNecessary(BlockExtendedInputStream stream,
+      long partialChunkSize) throws IOException {
     if (seeked) {
       // Seek on the underlying streams is performed lazily, as there is a
       // possibility a read after a seek may only read a small amount of data.
@@ -302,7 +291,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
       // but in the usual case, where there are no seeks at all, we don't need
       // to do this extra work.
       long basePosition = (position / stripeSize) * (long)ecChunkSize;
-      long streamPosition = basePosition + partialPosition;
+      long streamPosition = basePosition + partialChunkSize;
       if (streamPosition != stream.getPos()) {
         // This ECBlockInputStream has been seeked, so the underlying
         // block stream is no longer at the correct position. Therefore we need
@@ -310,6 +299,22 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
         stream.seek(streamPosition);
       }
     }
+  }
+
+  /**
+   * Read the maximum allowable amount of data from the current stream. This
+   * ensures we don't read past the end of an EC cell or the overall block
+   * group length.
+   * @param stream Stream to read from
+   * @param strategy The ReaderStrategy to read data into
+   * @return The number of bytes read
+   * @throws IOException if reading from the stream fails
+   */
+  private int readFromStream(BlockExtendedInputStream stream,
+      ByteReaderStrategy strategy)
+      throws IOException {
+    long partialPosition = position % ecChunkSize;
+    seekStreamIfNecessary(stream, partialPosition);
     long ecLimit = ecChunkSize - partialPosition;
     // Free space in the buffer to read into
     long bufLimit = strategy.getTargetLength();
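
To make the position mapping in seekStreamIfNecessary above concrete, here is a small self-contained sketch of the same arithmetic; the chunk size and data count are assumed example values, as the real ones come from the replication config:

    public class EcSeekArithmetic {
      public static void main(String[] args) {
        // Assumed example geometry: 3 data blocks with 1MB EC cells.
        long ecChunkSize = 1024 * 1024;
        long dataNum = 3;
        long stripeSize = ecChunkSize * dataNum;

        // A logical position 3.5MB into the block group: one full stripe
        // plus half an EC cell.
        long position = stripeSize + ecChunkSize / 2;

        // Offset within the current EC cell.
        long partialChunkSize = position % ecChunkSize;
        // Each full stripe consumes exactly one chunk on every data block,
        // so the stripe count maps to a per-block base offset.
        long basePosition = (position / stripeSize) * ecChunkSize;
        long streamPosition = basePosition + partialChunkSize;

        // One full stripe -> 1MB into the block, plus 0.5MB into the cell.
        System.out.println(streamPosition); // 1572864, i.e. 1.5MB
      }
    }
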
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 674c1c6..54e2de0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -212,11 +212,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     validateBuffers(bufs);
     assignBuffers(bufs);
     clearParityBuffers();
+    // Set the read limits on the buffers so we do not read any unexpected
+    // garbage data from the end of the block.
+    setBufferReadLimits(bufs, toRead);
     loadDataBuffersFromStream();
     padBuffers(toRead);
     flipInputs();
     decodeStripe();
-    unPadBuffers(bufs, toRead);
+    // After padding and decoding, reset the buffer limits again so the
+    // client reads the correct amount of data.
+    setBufferReadLimits(bufs, toRead);
     setPos(getPos() + toRead);
     return toRead;
   }
@@ -233,11 +238,11 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     int dataNum = getRepConfig().getData();
     int parityNum = getRepConfig().getParity();
     int chunkSize = getRepConfig().getEcChunkSize();
-    int fullChunks = toRead / chunkSize;
-    if (fullChunks == dataNum) {
+    if (toRead >= getStripeSize()) {
       // There is no padding to do - we are reading a full stripe.
       return;
     }
+    int fullChunks = toRead / chunkSize;
     // The size of each chunk is governed by the size of the first chunk.
     // The parity always matches the first chunk size.
     int paritySize = Math.min(toRead, chunkSize);
@@ -264,22 +269,32 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     }
   }
 
-  private void unPadBuffers(ByteBuffer[] bufs, int toRead) {
+  private void setBufferReadLimits(ByteBuffer[] bufs, int toRead) {
     int chunkSize = getRepConfig().getEcChunkSize();
     int fullChunks = toRead / chunkSize;
-    int remainingLength = toRead % chunkSize;
     if (fullChunks == getRepConfig().getData()) {
       // We are reading a full stripe, no concerns over padding.
       return;
     }
 
     if (fullChunks == 0){
+      bufs[0].limit(toRead);
       // All buffers except the first contain no data.
       for (int i = 1; i < bufs.length; i++) {
         bufs[i].position(0);
         bufs[i].limit(0);
       }
+      // With less than one full chunk, the parity buffers match the size of
+      // the first (partial) chunk, so limit them to toRead as well.
+      for (int i = getRepConfig().getData();
+           i < getRepConfig().getRequiredNodes(); i++) {
+        ByteBuffer b = decoderInputBuffers[i];
+        if (b != null) {
+          b.limit(toRead);
+        }
+      }
     } else {
+      int remainingLength = toRead % chunkSize;
       // The first partial has the remaining length
       bufs[fullChunks].limit(remainingLength);
       // All others have a zero limit
@@ -354,10 +369,21 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     for (int i = 0; i < dataIndexes.size(); i++) {
       BlockExtendedInputStream stream =
           getOrOpenStream(i, dataIndexes.get(i));
+      seekStreamIfNecessary(stream, 0);
       ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
       while (b.hasRemaining()) {
         int read = stream.read(b);
         if (read == EOF) {
+          // We should not reach EOF, as the block should have enough data to
+          // fill the buffer. If it does not, the block is shorter than the
+          // block length stored in OM indicates it should be. Therefore, if
+          // there is any remaining space in the buffer, throw an exception.
+          if (b.hasRemaining()) {
+            throw new IOException("Expected to read " + b.remaining() +
+                " bytes from block " + getBlockID() + " EC index " + (i + 1) +
+                " but reached EOF");
+          }
           break;
         }
       }
@@ -412,4 +438,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
         "Use readStripe() instead");
   }
 
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos % getStripeSize() != 0) {
+      // As this reader can only return full stripes, seeks must align with
+      // the start of a stripe.
+      throw new IOException("Requested position " + pos
+          + " does not align with a stripe offset");
+    }
+    super.seek(pos);
+  }
+
 }
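
The setBufferReadLimits logic above is easiest to follow with small numbers. Below is a self-contained sketch of the same limit handling using plain ByteBuffers; the 3-data-block layout and 10-byte chunk size are assumed purely for illustration:

    import java.nio.ByteBuffer;

    public class BufferReadLimitSketch {
      public static void main(String[] args) {
        int chunkSize = 10;  // assumed tiny EC cell for illustration
        int dataNum = 3;
        int toRead = 25;     // partial stripe: 2 full chunks plus 5 bytes

        ByteBuffer[] bufs = new ByteBuffer[dataNum];
        for (int i = 0; i < dataNum; i++) {
          bufs[i] = ByteBuffer.allocate(chunkSize);
        }

        int fullChunks = toRead / chunkSize;  // 2
        if (fullChunks < dataNum) {
          if (fullChunks == 0) {
            // Less than one chunk: only the first buffer holds data.
            bufs[0].limit(toRead);
            for (int i = 1; i < dataNum; i++) {
              bufs[i].position(0);
              bufs[i].limit(0);
            }
          } else {
            // The first partial chunk holds the remainder; later buffers
            // hold no data at all.
            bufs[fullChunks].limit(toRead % chunkSize);  // 5
            for (int i = fullChunks + 1; i < dataNum; i++) {
              bufs[i].position(0);
              bufs[i].limit(0);
            }
          }
        }

        // Limits are now 10, 10, 5: exactly toRead bytes visible in total.
        for (ByteBuffer b : bufs) {
          System.out.println(b.limit());
        }
      }
    }
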
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 172aafe..bf44297 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -54,7 +54,6 @@ import java.util.function.Function;
  */
 public class TestECBlockReconstructedStripeInputStream {
 
-
   private static final int ONEMB = 1024 * 1024;
 
   private ECReplicationConfig repConfig;
@@ -323,6 +322,144 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  @Test
+  public void testErrorThrownIfBlockNotLongEnough() throws IOException {
+    int blockLength = repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+    // First buffer has only the blockLength, the other two will have no data.
+    dataBufs[0].limit(blockLength);
+    dataBufs[1].limit(0);
+    dataBufs[2].limit(0);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    // Set the parity buffer limit to be less than the block length
+    parity[0].limit(blockLength - 1);
+    parity[1].limit(blockLength - 1);
+
+    ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+    // We have a length that is less than a single chunk, so blocks 2 and 3
+    // are padding and will not be present. Block 1 is lost and needs to be
+    // recovered from the parity and the padded blocks 2 and 3.
+    Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+    OmKeyLocationInfo keyInfo =
+        createKeyInfo(repConfig, blockLength, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+    try (ECBlockReconstructedStripeInputStream ecb =
+             new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+                 null, null, streamFactory)) {
+      try {
+        ecb.readStripe(bufs);
+        Assert.fail("Read should have thrown an exception");
+      } catch (IOException e) {
+        Assert.assertTrue(e.getMessage().matches("^Expected\\sto\\sread.+"));
+      }
+    }
+  }
+
+  @Test
+  public void testSeek() throws IOException {
+    // Generate the input data for 3 full stripes plus a partial stripe,
+    // and generate the parity from it.
+    int chunkSize = repConfig.getEcChunkSize();
+    int partialStripeSize = chunkSize * 2 - 1;
+    int dataLength = stripeSize() * 3 + partialStripeSize;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
+    dataBufs[1].limit(4 * chunkSize - 1);
+    dataBufs[2].limit(3 * chunkSize);
+    for (ByteBuffer b : dataBufs) {
+      randomFill(b);
+    }
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+    List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+    // Two data missing
+    locations.add(createIndexMap(1, 4, 5));
+    // One data missing
+    locations.add(createIndexMap(1, 2, 4, 5));
+    // Two data missing including first
+    locations.add(createIndexMap(2, 4, 5));
+    // One data and one parity missing
+    locations.add(createIndexMap(2, 3, 4));
+
+    for (Map<DatanodeDetails, Integer> dnMap : locations) {
+      streamFactory = new TestBlockInputStreamFactory();
+      addDataStreamsToFactory(dataBufs, parity);
+
+      OmKeyLocationInfo keyInfo = createKeyInfo(repConfig, dataLength, dnMap);
+      streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+      ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+      try (ECBlockReconstructedStripeInputStream ecb =
+               new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+                   true, null, null, streamFactory)) {
+        // Read Stripe 1
+        int read = ecb.readStripe(bufs);
+        for (int j = 0; j < bufs.length; j++) {
+          validateContents(dataBufs[j], bufs[j], 0, chunkSize);
+        }
+        Assert.assertEquals(stripeSize(), read);
+        Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+
+        // Seek to 0 and read again
+        clearBuffers(bufs);
+        ecb.seek(0);
+        read = ecb.readStripe(bufs);
+        for (int j = 0; j < bufs.length; j++) {
+          validateContents(dataBufs[j], bufs[j], 0, chunkSize);
+        }
+        Assert.assertEquals(stripeSize(), read);
+        Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+
+        // Seek to the last stripe
+        clearBuffers(bufs);
+        ecb.seek(stripeSize() * 3);
+        read = ecb.readStripe(bufs);
+        validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+        validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+        Assert.assertEquals(0, bufs[2].remaining());
+        Assert.assertEquals(partialStripeSize, read);
+        Assert.assertEquals(0, ecb.getRemaining());
+
+        // Seek to the start of stripe 3
+        clearBuffers(bufs);
+        ecb.seek(stripeSize() * 2);
+        read = ecb.readStripe(bufs);
+        for (int j = 0; j < bufs.length; j++) {
+          validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
+        }
+        Assert.assertEquals(stripeSize(), read);
+        Assert.assertEquals(partialStripeSize, ecb.getRemaining());
+      }
+    }
+  }
+
+  @Test
+  public void testSeekToPartialOffsetFails() {
+    Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 4, 5);
+    OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+        stripeSize() * 3, dnMap);
+    streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+    try (ECBlockReconstructedStripeInputStream ecb =
+             new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+                 true, null, null, streamFactory)) {
+      try {
+        ecb.seek(10);
+        Assert.fail("Seek should have thrown an exception");
+      } catch (IOException e) {
+        Assert.assertEquals("Requested position 10 does not align " +
+            "with a stripe offset", e.getMessage());
+      }
+    }
+  }
+
   private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
     List<ByteBuffer> dataStreams = new ArrayList<>();
     for (ByteBuffer b : data) {
@@ -596,6 +733,11 @@ public class TestECBlockReconstructedStripeInputStream {
       return data.position();
     }
 
+    @Override
+    public void seek(long pos) {
+      data.position((int)pos);
+    }
+
   }
 
 }
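
Finally, the seek override added to the test stream above simply repositions the backing ByteBuffer. A minimal self-contained sketch of that in-memory pattern (this class is illustrative, not an Ozone API):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Illustrative in-memory stream: seek maps straight onto
    // ByteBuffer.position(), just like the test helper above.
    class InMemorySeekableStream {
      private final ByteBuffer data;

      InMemorySeekableStream(ByteBuffer data) {
        this.data = data;
      }

      public long getPos() {
        return data.position();
      }

      public void seek(long pos) throws IOException {
        if (pos < 0 || pos > data.limit()) {
          throw new IOException("Position " + pos + " out of range");
        }
        data.position((int) pos);
      }

      public int read(ByteBuffer buf) {
        if (!data.hasRemaining()) {
          return -1;  // EOF
        }
        int count = Math.min(buf.remaining(), data.remaining());
        for (int i = 0; i < count; i++) {
          buf.put(data.get());
        }
        return count;
      }

      public static void main(String[] args) throws IOException {
        ByteBuffer src = ByteBuffer.wrap("0123456789".getBytes());
        InMemorySeekableStream s = new InMemorySeekableStream(src);
        s.seek(4);
        ByteBuffer dst = ByteBuffer.allocate(3);
        s.read(dst);
        System.out.println(new String(dst.array())); // prints "456"
      }
    }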

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org