Posted to commits@ozone.apache.org by so...@apache.org on 2021/11/11 11:17:19 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new f85ac37 HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)
f85ac37 is described below
commit f85ac376e67daaa7191abb3de902c16c913440f9
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Thu Nov 11 11:16:58 2021 +0000
HDDS-5950. EC: Implement seek on ECBlockReconstructedStripeInputStream (#2818)
---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 33 +++--
.../io/ECBlockReconstructedStripeInputStream.java | 47 ++++++-
.../TestECBlockReconstructedStripeInputStream.java | 144 ++++++++++++++++++++-
3 files changed, 204 insertions(+), 20 deletions(-)
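
Before the diffs, the shape of the change: the lazy-seek logic in ECBlockInputStream is factored out into a protected seekStreamIfNecessary() helper, and ECBlockReconstructedStripeInputStream now overrides seek() to accept only stripe-aligned positions. A minimal standalone sketch of that alignment rule, assuming a hypothetical checkStripeAligned helper (not the committed code):

    import java.io.IOException;

    public class StripeSeekRule {
      // Reject any position that is not a multiple of the stripe size,
      // mirroring the check the new seek() override performs.
      static void checkStripeAligned(long pos, long stripeSize)
          throws IOException {
        if (pos % stripeSize != 0) {
          throw new IOException("Requested position " + pos
              + " does not align with a stripe offset");
        }
      }

      public static void main(String[] args) throws IOException {
        long stripeSize = 3L * 1024 * 1024;             // e.g. 3 x 1 MiB chunks
        checkStripeAligned(2 * stripeSize, stripeSize); // fine
        checkStripeAligned(10, stripeSize);             // throws IOException
      }
    }
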
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 52b370b..e6e7ccc 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -282,19 +282,8 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
return blockInfo.getBlockID();
}
- /**
- * Read the most allowable amount of data from the current stream. This
- * ensures we don't read past the end of an EC cell or the overall block
- * group length.
- * @param stream Stream to read from
- * @param strategy The ReaderStrategy to read data into
- * @return
- * @throws IOException
- */
- private int readFromStream(BlockExtendedInputStream stream,
- ByteReaderStrategy strategy)
- throws IOException {
- long partialPosition = position % ecChunkSize;
+ protected void seekStreamIfNecessary(BlockExtendedInputStream stream,
+ long partialChunkSize) throws IOException {
if (seeked) {
// Seek on the underlying streams is performed lazily, as there is a
// possibility a read after a seek may only read a small amount of data.
@@ -302,7 +291,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
// but in the usual case, where there are no seeks at all, we don't need
// to do this extra work.
long basePosition = (position / stripeSize) * (long)ecChunkSize;
- long streamPosition = basePosition + partialPosition;
+ long streamPosition = basePosition + partialChunkSize;
if (streamPosition != stream.getPos()) {
// This ECBlockInputStream has been seeked, so the underlying
// block stream is no longer at the correct position. Therefore we need
@@ -310,6 +299,22 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
stream.seek(streamPosition);
}
}
+ }
+
+ /**
+ * Read the maximum allowable amount of data from the current stream. This
+ * ensures we don't read past the end of an EC cell or the overall block
+ * group length.
+ * @param stream Stream to read from
+ * @param strategy The ReaderStrategy to read data into
+ * @return The number of bytes read into the strategy
+ * @throws IOException If an error occurs reading from the stream
+ */
+ private int readFromStream(BlockExtendedInputStream stream,
+ ByteReaderStrategy strategy)
+ throws IOException {
+ long partialPosition = position % ecChunkSize;
+ seekStreamIfNecessary(stream, partialPosition);
long ecLimit = ecChunkSize - partialPosition;
// Free space in the buffer to read into
long bufLimit = strategy.getTargetLength();
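
The lazy-seek arithmetic above maps a logical position in the EC block group to an offset within a single replica: each full stripe consumed contributes exactly one chunk per data replica. A worked example of that mapping, assuming 1 MiB chunks and 3 data blocks (the class name is hypothetical):

    public class LazySeekMath {
      public static void main(String[] args) {
        long ecChunkSize = 1024L * 1024;           // assumed 1 MiB chunk
        long stripeSize = 3 * ecChunkSize;         // assumed 3 data blocks
        long position = 7 * ecChunkSize;           // logical block-group offset
        long partialChunkSize = position % ecChunkSize;            // 0
        long basePosition = (position / stripeSize) * ecChunkSize; // 2 MiB
        long streamPosition = basePosition + partialChunkSize;
        // Two full stripes (6 MiB) are behind us, so each data replica has
        // served one 1 MiB chunk per stripe: the stream belongs at 2 MiB.
        System.out.println(streamPosition == 2 * ecChunkSize);     // true
      }
    }
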
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 674c1c6..54e2de0 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -212,11 +212,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
validateBuffers(bufs);
assignBuffers(bufs);
clearParityBuffers();
+ // Set the read limits on the buffers so we do not read any unexpected
+ // garbage data from the end of the block.
+ setBufferReadLimits(bufs, toRead);
loadDataBuffersFromStream();
padBuffers(toRead);
flipInputs();
decodeStripe();
- unPadBuffers(bufs, toRead);
+ // After decoding, reset the buffer limits again, as padding changed them,
+ // so the client reads the correct amount of data.
+ setBufferReadLimits(bufs, toRead);
setPos(getPos() + toRead);
return toRead;
}
@@ -233,11 +238,11 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
int dataNum = getRepConfig().getData();
int parityNum = getRepConfig().getParity();
int chunkSize = getRepConfig().getEcChunkSize();
- int fullChunks = toRead / chunkSize;
- if (fullChunks == dataNum) {
+ if (toRead >= getStripeSize()) {
// There is no padding to do - we are reading a full stripe.
return;
}
+ int fullChunks = toRead / chunkSize;
// The size of each chunk is governed by the size of the first chunk.
// The parity always matches the first chunk size.
int paritySize = Math.min(toRead, chunkSize);
@@ -264,22 +269,32 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
- private void unPadBuffers(ByteBuffer[] bufs, int toRead) {
+ private void setBufferReadLimits(ByteBuffer[] bufs, int toRead) {
int chunkSize = getRepConfig().getEcChunkSize();
int fullChunks = toRead / chunkSize;
- int remainingLength = toRead % chunkSize;
if (fullChunks == getRepConfig().getData()) {
// We are reading a full stripe, no concerns over padding.
return;
}
if (fullChunks == 0){
+ bufs[0].limit(toRead);
// All buffers except the first contain no data.
for (int i = 1; i < bufs.length; i++) {
bufs[i].position(0);
bufs[i].limit(0);
}
+ // If we have less than 1 chunk, then the parity buffers are the size
+ // of the first chunk.
+ for (int i = getRepConfig().getData();
+ i < getRepConfig().getRequiredNodes(); i++) {
+ ByteBuffer b = decoderInputBuffers[i];
+ if (b != null) {
+ b.limit(toRead);
+ }
+ }
} else {
+ int remainingLength = toRead % chunkSize;
// The first partial has the remaining length
bufs[fullChunks].limit(remainingLength);
// All others have a zero limit
@@ -354,10 +369,21 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
for (int i = 0; i < dataIndexes.size(); i++) {
BlockExtendedInputStream stream =
getOrOpenStream(i, dataIndexes.get(i));
+ seekStreamIfNecessary(stream, 0);
ByteBuffer b = decoderInputBuffers[dataIndexes.get(i)];
while (b.hasRemaining()) {
int read = stream.read(b);
if (read == EOF) {
+ // We should not reach EOF, as the block should have enough data to
+ // fill the buffer. If the block does not, then it indicates the block
+ // is not as long as it should be, based on the block length stored in
+ // OM. Therefore if there is any remaining space in the buffer, we
+ // should throw an exception.
+ if (b.hasRemaining()) {
+ throw new IOException("Expected to read " + b.remaining() +
+ " bytes from block " + getBlockID() + " EC index " + (i + 1) +
+ " but reached EOF");
+ }
break;
}
}
@@ -412,4 +438,15 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
"Use readStripe() instead");
}
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ if (pos % getStripeSize() != 0) {
+ // As this reader can only return full stripes, we can only seek to
+ // offsets at the start of a stripe.
+ throw new IOException("Requested position " + pos
+ + " does not align with a stripe offset");
+ }
+ super.seek(pos);
+ }
+
}
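
setBufferReadLimits (renamed from unPadBuffers) now runs both before reading, so no garbage past the block end is pulled in, and after decoding, to restore the client-visible limits. A standalone sketch of the limit rules for the data buffers, with parity handling omitted and all names illustrative (a simplification, not the committed code):

    import java.nio.ByteBuffer;

    public class StripeLimitsSketch {
      static void setLimits(ByteBuffer[] dataBufs, int toRead, int chunkSize) {
        int fullChunks = toRead / chunkSize;
        if (fullChunks == dataBufs.length) {
          return;                         // full stripe: nothing to trim
        }
        if (fullChunks == 0) {
          dataBufs[0].limit(toRead);      // only the first buffer holds data
          for (int i = 1; i < dataBufs.length; i++) {
            dataBufs[i].position(0);
            dataBufs[i].limit(0);
          }
        } else {
          dataBufs[fullChunks].limit(toRead % chunkSize); // partial chunk
          for (int i = fullChunks + 1; i < dataBufs.length; i++) {
            dataBufs[i].position(0);
            dataBufs[i].limit(0);
          }
        }
      }

      public static void main(String[] args) {
        int chunkSize = 4;
        ByteBuffer[] bufs = new ByteBuffer[3];
        for (int i = 0; i < bufs.length; i++) {
          bufs[i] = ByteBuffer.allocate(chunkSize);
        }
        setLimits(bufs, 6, chunkSize);    // one full chunk plus 2 bytes
        System.out.println(bufs[0].limit() + " " + bufs[1].limit()
            + " " + bufs[2].limit());     // prints: 4 2 0
      }
    }
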
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index 172aafe..bf44297 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -54,7 +54,6 @@ import java.util.function.Function;
*/
public class TestECBlockReconstructedStripeInputStream {
-
private static final int ONEMB = 1024 * 1024;
private ECReplicationConfig repConfig;
@@ -323,6 +322,144 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ @Test
+ public void testErrorThrownIfBlockNotLongEnough() throws IOException {
+ int blockLength = repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
+ // First buffer has only the blockLength, the other two will have no data.
+ dataBufs[0].limit(blockLength);
+ dataBufs[1].limit(0);
+ dataBufs[2].limit(0);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ // Set the parity buffer limit to be less than the block length
+ parity[0].limit(blockLength - 1);
+ parity[1].limit(blockLength - 1);
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ // We have a length that is less than a single chunk, so blocks 2 and 3
+ // are padding and will not be present. Block 1 is lost and needs to be
+ // recovered from the parity and padded blocks 2 and 3.
+ Map<DatanodeDetails, Integer> dnMap = createIndexMap(4, 5);
+ OmKeyLocationInfo keyInfo =
+ createKeyInfo(repConfig, blockLength, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
+ null, null, streamFactory)) {
+ try {
+ ecb.readStripe(bufs);
+ Assert.fail("Read should have thrown an exception");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().matches("^Expected\\sto\\sread.+"));
+ }
+ }
+ }
+
+ @Test
+ public void testSeek() throws IOException {
+ // Generate input data for 3 full stripes plus a partial stripe, and
+ // generate the parity.
+ int chunkSize = repConfig.getEcChunkSize();
+ int partialStripeSize = chunkSize * 2 - 1;
+ int dataLength = stripeSize() * 3 + partialStripeSize;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
+ dataBufs[1].limit(4 * chunkSize - 1);
+ dataBufs[2].limit(3 * chunkSize);
+ for (ByteBuffer b : dataBufs) {
+ randomFill(b);
+ }
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+
+ List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
+ // Two data missing
+ locations.add(createIndexMap(1, 4, 5));
+ // One data missing
+ locations.add(createIndexMap(1, 2, 4, 5));
+ // Two data missing including first
+ locations.add(createIndexMap(2, 4, 5));
+ // One data and one parity missing
+ locations.add(createIndexMap(2, 3, 4));
+
+ for (Map<DatanodeDetails, Integer> dnMap : locations) {
+ streamFactory = new TestBlockInputStreamFactory();
+ addDataStreamsToFactory(dataBufs, parity);
+
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ stripeSize() * 3 + partialStripeSize, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ ByteBuffer[] bufs = allocateByteBuffers(repConfig);
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+ true, null, null, streamFactory)) {
+ // Read Stripe 1
+ int read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ validateContents(dataBufs[j], bufs[j], 0, chunkSize);
+ }
+ Assert.assertEquals(stripeSize(), read);
+ Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+
+ // Seek to 0 and read again
+ clearBuffers(bufs);
+ ecb.seek(0);
+ read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ validateContents(dataBufs[j], bufs[j], 0, chunkSize);
+ }
+ Assert.assertEquals(stripeSize(), read);
+ Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
+
+ // Seek to the last stripe
+ clearBuffers(bufs);
+ ecb.seek(stripeSize() * 3);
+ read = ecb.readStripe(bufs);
+ validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
+ validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
+ Assert.assertEquals(0, bufs[2].remaining());
+ Assert.assertEquals(partialStripeSize, read);
+ Assert.assertEquals(0, ecb.getRemaining());
+
+ // Seek back to the start of stripe 3
+ clearBuffers(bufs);
+ ecb.seek(stripeSize() * 2);
+ read = ecb.readStripe(bufs);
+ for (int j = 0; j < bufs.length; j++) {
+ validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
+ }
+ Assert.assertEquals(stripeSize(), read);
+ Assert.assertEquals(partialStripeSize, ecb.getRemaining());
+ }
+ }
+ }
+
+ @Test
+ public void testSeekToPartialOffsetFails() {
+ Map<DatanodeDetails, Integer> dnMap = createIndexMap(1, 4, 5);
+ OmKeyLocationInfo keyInfo = createKeyInfo(repConfig,
+ stripeSize() * 3, dnMap);
+ streamFactory.setCurrentPipeline(keyInfo.getPipeline());
+
+ try (ECBlockReconstructedStripeInputStream ecb =
+ new ECBlockReconstructedStripeInputStream(repConfig, keyInfo,
+ true, null, null, streamFactory)) {
+ try {
+ ecb.seek(10);
+ Assert.fail("Seek should have thrown an exception");
+ } catch (IOException e) {
+ Assert.assertEquals("Requested position 10 does not align " +
+ "with a stripe offset", e.getMessage());
+ }
+ }
+ }
+
private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
List<ByteBuffer> dataStreams = new ArrayList<>();
for (ByteBuffer b : data) {
@@ -596,6 +733,11 @@ public class TestECBlockReconstructedStripeInputStream {
return data.position();
}
+ @Override
+ public void seek(long pos) {
+ data.position((int)pos);
+ }
+
}
}
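
The new testErrorThrownIfBlockNotLongEnough exercises the premature-EOF guard added to loadDataBuffersFromStream: hitting EOF with space still remaining in the buffer means the block is shorter than the length recorded in OM, and the read must fail rather than return short data. A minimal standalone sketch of that fill-or-throw pattern, with assumed names (not the committed code):

    import java.io.ByteArrayInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    public class FillOrThrow {
      // Fill the buffer completely, or fail loudly if the source ends early.
      static void fillOrThrow(InputStream in, ByteBuffer b) throws IOException {
        byte[] tmp = new byte[1024];
        while (b.hasRemaining()) {
          int read = in.read(tmp, 0, Math.min(tmp.length, b.remaining()));
          if (read == -1) {
            throw new EOFException("Expected to read " + b.remaining()
                + " more bytes but reached EOF");
          }
          b.put(tmp, 0, read);
        }
      }

      public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8);
        // The source holds only 5 bytes, so filling 8 bytes must fail.
        fillOrThrow(new ByteArrayInputStream(new byte[5]), buf); // throws
      }
    }
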
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org