You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/03/23 16:31:36 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new fc6ac0c HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)
fc6ac0c is described below
commit fc6ac0c48c078e0baf560d2bbe03b7bdb70985fc
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Mar 23 16:31:12 2022 +0000
HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)
---
.../hadoop/ozone/client/io/ECBlockInputStream.java | 19 ++++++---
.../client/io/ECBlockReconstructedInputStream.java | 25 ++++++++++-
.../io/ECBlockReconstructedStripeInputStream.java | 34 ++++++++++++---
.../ozone/client/rpc/read/ECStreamTestUtil.java | 19 +++++++--
.../read/TestECBlockReconstructedInputStream.java | 49 ++++++++++++++++++++++
.../TestECBlockReconstructedStripeInputStream.java | 37 ++++++++++------
6 files changed, 155 insertions(+), 28 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 06e6844..405182f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -356,16 +356,25 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
@Override
public synchronized void close() {
- for (BlockExtendedInputStream stream : blockStreams) {
- if (stream != null) {
+ closeStreams();
+ closed = true;
+ }
+
+ protected synchronized void closeStreams() {
+ for (int i = 0; i < blockStreams.length; i++) {
+ if (blockStreams[i] != null) {
try {
- stream.close();
+ blockStreams[i].close();
+ blockStreams[i] = null;
} catch (IOException e) {
- LOG.error("Failed to close stream {}", stream, e);
+ LOG.error("Failed to close stream {}", blockStreams[i], e);
}
}
}
- closed = true;
+ // If the streams have been closed outside of a close() call, then it may
+ // be due to freeing resources. If they are reopened, then we will need to
+ // seek the stream to its expected position when the next read is attempted.
+ seeked = true;
}
@Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 034a3a2..fe93b2e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -38,6 +38,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
private ByteBuffer[] bufs;
private final ByteBufferPool byteBufferPool;
private boolean closed = false;
+ private boolean unBuffered = false;
private long position = 0;
@@ -73,10 +74,14 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
@Override
public synchronized int read(ByteBuffer buf) throws IOException {
ensureNotClosed();
- allocateBuffers();
if (!hasRemaining()) {
return EOF;
}
+ allocateBuffers();
+ if (unBuffered) {
+ seek(getPos());
+ unBuffered = false;
+ }
int totalRead = 0;
while (buf.hasRemaining() && getRemaining() > 0) {
ByteBuffer b = selectNextBuffer();
@@ -88,6 +93,14 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
long read = readBufferToDest(b, buf);
totalRead += read;
}
+ if (!hasRemaining()) {
+ // We have reached the end of the block. While the block is still open
+ // and could be seeked back, it is most likely the block will be closed.
+ // KeyInputStream does not call close on the block until all blocks in the
+ // key have been read, so releasing the resources here helps to avoid
+ // excessive memory usage.
+ freeBuffers();
+ }
return totalRead;
}
@@ -131,6 +144,8 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
@Override
public synchronized void unbuffer() {
stripeReader.unbuffer();
+ freeBuffers();
+ unBuffered = true;
}
@Override
@@ -141,13 +156,18 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
@Override
public synchronized void close() throws IOException {
stripeReader.close();
+ freeBuffers();
+ closed = true;
+ }
+
+ private void freeBuffers() {
if (bufs != null) {
for (int i = 0; i < bufs.length; i++) {
byteBufferPool.putBuffer(bufs[i]);
bufs[i] = null;
}
+ bufs = null;
}
- closed = true;
}
@Override
@@ -174,6 +194,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
}
private void readAndSeekStripe(int offset) throws IOException {
+ allocateBuffers();
readStripe();
if (offset == 0) {
return;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 2b8d231..d5ec6db 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -138,7 +138,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
*
* @param dns A list of DatanodeDetails that are known to be bad.
*/
- public void addFailedDatanodes(List<DatanodeDetails> dns) {
+ public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
if (initialized) {
throw new RuntimeException("Cannot add failed datanodes after the " +
"reader has been initialized");
@@ -154,7 +154,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
- protected void init() throws InsufficientLocationsException {
+ private void init() throws InsufficientLocationsException {
if (decoder == null) {
decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
}
@@ -258,13 +258,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
* @throws IOException
*/
public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
- if (!initialized) {
- init();
- }
int toRead = (int)Math.min(getRemaining(), getStripeSize());
if (toRead == 0) {
return EOF;
}
+ if (!initialized) {
+ init();
+ }
validateBuffers(bufs);
while (true) {
try {
@@ -305,6 +305,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
flipInputs();
}
setPos(getPos() + toRead);
+ if (remaining() == 0) {
+ // If we reach the end of the block (i.e. remaining is zero) we free
+ // the underlying streams and buffers. This is because KeyInputStream,
+ // which reads from the EC streams, does not close the blocks until it
+ // has read all blocks in the key.
+ freeAllResourcesWithoutClosing();
+ }
return toRead;
}
@@ -578,6 +585,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
@Override
public synchronized void close() {
super.close();
+ freeBuffers();
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ super.unbuffer();
+ freeBuffers();
+ }
+
+ private void freeBuffers() {
// Inside this class, we only allocate buffers to read parity into. Data
// is reconstructed or read into a set of buffers passed in from the calling
// class. Therefore we only need to ensure we free the parity buffers here.
@@ -591,6 +608,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
}
}
}
+ initialized = false;
+ }
+
+ private void freeAllResourcesWithoutClosing() throws IOException {
+ LOG.debug("Freeing all resources while leaving the block open");
+ freeBuffers();
+ closeStreams();
}
@Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index b8b9977..0503dbe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -41,10 +41,13 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SplittableRandom;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
* Utility class providing methods useful in EC tests.
@@ -218,7 +221,8 @@ public final class ECStreamTestUtil {
public static class TestBlockInputStreamFactory implements
BlockInputStreamFactory {
- private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+ private Map<Integer, TestBlockInputStream> blockStreams =
+ new LinkedHashMap<>();
private List<ByteBuffer> blockStreamData;
// List of EC indexes that should fail immediately on read
private List<Integer> failIndexes = new ArrayList<>();
@@ -227,7 +231,16 @@ public final class ECStreamTestUtil {
public synchronized
List<ECStreamTestUtil.TestBlockInputStream> getBlockStreams() {
- return blockStreams;
+ return blockStreams.values().stream().collect(Collectors.toList());
+ }
+
+ public synchronized Set<Integer> getStreamIndexes() {
+ return blockStreams.keySet();
+ }
+
+ public synchronized ECStreamTestUtil.TestBlockInputStream getBlockStream(
+ int ecIndex) {
+ return blockStreams.get(ecIndex);
}
public synchronized void setBlockStreamData(List<ByteBuffer> bufs) {
@@ -256,7 +269,7 @@ public final class ECStreamTestUtil {
if (failIndexes.contains(repInd)) {
stream.setShouldError(true);
}
- blockStreams.add(stream);
+ blockStreams.put(repInd, stream);
return stream;
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index d625b76..ee3f50a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -142,6 +142,55 @@ public class TestECBlockReconstructedInputStream {
b.clear();
long read = stream.read(b);
Assert.assertEquals(-1, read);
+ // Seek back to zero and read again to ensure the buffers are
+ // re-allocated after being freed at the end of block.
+ stream.seek(0);
+ read = stream.read(b);
+ Assert.assertEquals(readBufferSize, read);
+ dataGenerator = new SplittableRandom(randomSeed);
+ ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+ }
+ }
+ }
+
+
+ @Test
+ public void testReadDataWithUnbuffer() throws IOException {
+ // Random read buffer size (< 16kb + 5 bytes); rarely aligns with stripes
+ int readBufferSize = random.nextInt(1024 * 16 + 5);
+ // 3 stripes and a partial chunk
+ int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ + repConfig.getEcChunkSize() - 1;
+ ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+ repConfig.getEcChunkSize() * 4);
+ ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+ dataGenerator, blockLength);
+ ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+ addDataStreamsToFactory(dataBufs, parity);
+
+ Map<DatanodeDetails, Integer> dnMap =
+ ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+ try (ECBlockReconstructedStripeInputStream stripeStream
+ = createStripeInputStream(dnMap, blockLength)) {
+ try (ECBlockReconstructedInputStream stream =
+ new ECBlockReconstructedInputStream(repConfig, bufferPool,
+ stripeStream)) {
+ ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+ int totalRead = 0;
+ dataGenerator = new SplittableRandom(randomSeed);
+ while (totalRead < blockLength) {
+ int expectedRead = Math.min(blockLength - totalRead, readBufferSize);
+ long read = stream.read(b);
+ totalRead += read;
+ Assert.assertEquals(expectedRead, read);
+ ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+ b.clear();
+ stream.unbuffer();
+ }
+ // Next read should be EOF
+ b.clear();
+ long read = stream.read(b);
+ Assert.assertEquals(-1, read);
}
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index a5ea3a4..0eff8f9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -36,8 +36,11 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -474,6 +477,11 @@ public class TestECBlockReconstructedStripeInputStream {
}
}
+ private Integer getRandomStreamIndex(Set<Integer> set) {
+ return set.stream().skip(new Random().nextInt(set.size()))
+ .findFirst().orElse(null);
+ }
+
@Test
public void testErrorReadingBlockContinuesReading() throws IOException {
// Generate the input data for 3 full stripes and generate the parity.
@@ -487,15 +495,8 @@ public class TestECBlockReconstructedStripeInputStream {
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
- List<List<Integer>> failLists = new ArrayList<>();
- failLists.add(indexesToList(0, 1));
- // These will be the first parity read and then the next parity read as a
- // replacement
- failLists.add(indexesToList(2, 3));
- // First parity and then the data block
- failLists.add(indexesToList(2, 0));
-
- for (List<Integer> failList : failLists) {
+ for (int k = 0; k < 5; k++) {
+ Set<Integer> failed = new HashSet<>();
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
@@ -519,8 +520,11 @@ public class TestECBlockReconstructedStripeInputStream {
Assert.assertEquals(stripeSize(), read);
clearBuffers(bufs);
if (i == 0) {
- streamFactory.getBlockStreams().get(failList.remove(0))
+ Integer failStream =
+ getRandomStreamIndex(streamFactory.getStreamIndexes());
+ streamFactory.getBlockStream(failStream)
.setShouldError(true);
+ failed.add(failStream);
}
}
// The next read is a partial stripe
@@ -531,10 +535,17 @@ public class TestECBlockReconstructedStripeInputStream {
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
- // seek back to zero, make another block fail. The next read should
- // error as there are not enough blocks to read.
+ // seek back to zero and read a stripe to re-open the streams
ecb.seek(0);
- streamFactory.getBlockStreams().get(failList.remove(0))
+ clearBuffers(bufs);
+ ecb.readStripe(bufs);
+ // Now fail another random stream and the read should fail with
+ // insufficient locations
+ Set<Integer> currentStreams =
+ new HashSet<>(streamFactory.getStreamIndexes());
+ currentStreams.removeAll(failed);
+ Integer failStream = getRandomStreamIndex(currentStreams);
+ streamFactory.getBlockStream(failStream)
.setShouldError(true);
try {
clearBuffers(bufs);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org