You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/03/23 16:31:36 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new fc6ac0c  HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)
fc6ac0c is described below

commit fc6ac0c48c078e0baf560d2bbe03b7bdb70985fc
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Wed Mar 23 16:31:12 2022 +0000

    HDDS-6448. EC: Reconstructed Input Streams should free resources after reading to end of block (#3197)
---
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 19 ++++++---
 .../client/io/ECBlockReconstructedInputStream.java | 25 ++++++++++-
 .../io/ECBlockReconstructedStripeInputStream.java  | 34 ++++++++++++---
 .../ozone/client/rpc/read/ECStreamTestUtil.java    | 19 +++++++--
 .../read/TestECBlockReconstructedInputStream.java  | 49 ++++++++++++++++++++++
 .../TestECBlockReconstructedStripeInputStream.java | 37 ++++++++++------
 6 files changed, 155 insertions(+), 28 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 06e6844..405182f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -356,16 +356,25 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
 
   @Override
   public synchronized void close() {
-    for (BlockExtendedInputStream stream : blockStreams) {
-      if (stream != null) {
+    closeStreams();
+    closed = true;
+  }
+
+  protected synchronized void closeStreams() {
+    for (int i = 0; i < blockStreams.length; i++) {
+      if (blockStreams[i] != null) {
         try {
-          stream.close();
+          blockStreams[i].close();
+          blockStreams[i] = null;
         } catch (IOException e) {
-          LOG.error("Failed to close stream {}", stream, e);
+          LOG.error("Failed to close stream {}", blockStreams[i], e);
         }
       }
     }
-    closed = true;
+    // If the streams have been closed outside of a close() call, then it may
+    // be due to freeing resources. If they are reopened, then we will need to
+    // seek the stream to its expected position when the next read is attempted.
+    seeked = true;
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
index 034a3a2..fe93b2e 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedInputStream.java
@@ -38,6 +38,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   private ByteBuffer[] bufs;
   private final ByteBufferPool byteBufferPool;
   private boolean closed = false;
+  private boolean unBuffered = false;
 
   private long position = 0;
 
@@ -73,10 +74,14 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
     ensureNotClosed();
-    allocateBuffers();
     if (!hasRemaining()) {
       return EOF;
     }
+    allocateBuffers();
+    if (unBuffered) {
+      seek(getPos());
+      unBuffered = false;
+    }
     int totalRead = 0;
     while (buf.hasRemaining() && getRemaining() > 0) {
       ByteBuffer b = selectNextBuffer();
@@ -88,6 +93,14 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
       long read = readBufferToDest(b, buf);
       totalRead += read;
     }
+    if (!hasRemaining()) {
+      // We have reached the end of the block. While the block is still open
+      // and could be seeked back, it is most likely the block will be closed.
+      // KeyInputStream does not call close on the block until all blocks in the
+      // key have been read, so releasing the resources here helps to avoid
+      // excessive memory usage.
+      freeBuffers();
+    }
     return totalRead;
   }
 
@@ -131,6 +144,8 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void unbuffer() {
     stripeReader.unbuffer();
+    freeBuffers();
+    unBuffered = true;
   }
 
   @Override
@@ -141,13 +156,18 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void close() throws IOException {
     stripeReader.close();
+    freeBuffers();
+    closed = true;
+  }
+
+  private void freeBuffers() {
     if (bufs != null) {
       for (int i = 0; i < bufs.length; i++) {
         byteBufferPool.putBuffer(bufs[i]);
         bufs[i] = null;
       }
+      bufs = null;
     }
-    closed = true;
   }
 
   @Override
@@ -174,6 +194,7 @@ public class ECBlockReconstructedInputStream extends BlockExtendedInputStream {
   }
 
   private void readAndSeekStripe(int offset) throws IOException {
+    allocateBuffers();
     readStripe();
     if (offset == 0) {
       return;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 2b8d231..d5ec6db 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -138,7 +138,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    *
    * @param dns A list of DatanodeDetails that are known to be bad.
    */
-  public void addFailedDatanodes(List<DatanodeDetails> dns) {
+  public synchronized void addFailedDatanodes(List<DatanodeDetails> dns) {
     if (initialized) {
       throw new RuntimeException("Cannot add failed datanodes after the " +
           "reader has been initialized");
@@ -154,7 +154,7 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
     }
   }
 
-  protected void init() throws InsufficientLocationsException {
+  private void init() throws InsufficientLocationsException {
     if (decoder == null) {
       decoder = CodecUtil.createRawDecoderWithFallback(getRepConfig());
     }
@@ -258,13 +258,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
    * @throws IOException
    */
   public synchronized int readStripe(ByteBuffer[] bufs) throws IOException {
-    if (!initialized) {
-      init();
-    }
     int toRead = (int)Math.min(getRemaining(), getStripeSize());
     if (toRead == 0) {
       return EOF;
     }
+    if (!initialized) {
+      init();
+    }
     validateBuffers(bufs);
     while (true) {
       try {
@@ -305,6 +305,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
       flipInputs();
     }
     setPos(getPos() + toRead);
+    if (remaining() == 0) {
+      // If we reach the end of the block (i.e. remaining is zero) we free
+      // the underlying streams and buffers. This is because KeyInputStream,
+      // which reads from the EC streams, does not close the blocks until it
+      // has read all blocks in the key.
+      freeAllResourcesWithoutClosing();
+    }
     return toRead;
   }
 
@@ -578,6 +585,16 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
   @Override
   public synchronized void close() {
     super.close();
+    freeBuffers();
+  }
+
+  @Override
+  public synchronized void unbuffer() {
+    super.unbuffer();
+    freeBuffers();
+  }
+
+  private void freeBuffers() {
     // Inside this class, we only allocate buffers to read parity into. Data
     // is reconstructed or read into a set of buffers passed in from the calling
     // class. Therefore we only need to ensure we free the parity buffers here.
@@ -591,6 +608,13 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
         }
       }
     }
+    initialized = false;
+  }
+
+  private void freeAllResourcesWithoutClosing() throws IOException {
+    LOG.debug("Freeing all resources while leaving the block open");
+    freeBuffers();
+    closeStreams();
   }
 
   @Override
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
index b8b9977..0503dbe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/ECStreamTestUtil.java
@@ -41,10 +41,13 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SplittableRandom;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Utility class providing methods useful in EC tests.
@@ -218,7 +221,8 @@ public final class ECStreamTestUtil {
   public static class TestBlockInputStreamFactory implements
       BlockInputStreamFactory {
 
-    private List<TestBlockInputStream> blockStreams = new ArrayList<>();
+    private Map<Integer, TestBlockInputStream> blockStreams =
+        new LinkedHashMap<>();
     private List<ByteBuffer> blockStreamData;
     // List of EC indexes that should fail immediately on read
     private List<Integer> failIndexes = new ArrayList<>();
@@ -227,7 +231,16 @@ public final class ECStreamTestUtil {
 
     public synchronized
         List<ECStreamTestUtil.TestBlockInputStream> getBlockStreams() {
-      return blockStreams;
+      return blockStreams.values().stream().collect(Collectors.toList());
+    }
+
+    public synchronized Set<Integer> getStreamIndexes() {
+      return blockStreams.keySet();
+    }
+
+    public synchronized ECStreamTestUtil.TestBlockInputStream getBlockStream(
+        int ecIndex) {
+      return blockStreams.get(ecIndex);
     }
 
     public synchronized void setBlockStreamData(List<ByteBuffer> bufs) {
@@ -256,7 +269,7 @@ public final class ECStreamTestUtil {
       if (failIndexes.contains(repInd)) {
         stream.setShouldError(true);
       }
-      blockStreams.add(stream);
+      blockStreams.put(repInd, stream);
       return stream;
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
index d625b76..ee3f50a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedInputStream.java
@@ -142,6 +142,55 @@ public class TestECBlockReconstructedInputStream {
         b.clear();
         long read = stream.read(b);
         Assert.assertEquals(-1, read);
+        // Seek back to zero and read again to ensure the buffers are
+        // re-allocated after being freed at the end of block.
+        stream.seek(0);
+        read = stream.read(b);
+        Assert.assertEquals(readBufferSize, read);
+        dataGenerator = new SplittableRandom(randomSeed);
+        ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+      }
+    }
+  }
+
+
+  @Test
+  public void testReadDataWithUnbuffer() throws IOException {
+    // Random buffer size below 16kb + 5, so reads rarely align to stripes
+    int readBufferSize = random.nextInt(1024 * 16 + 5);
+    // 3 stripes and a partial chunk
+    int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+        + repConfig.getEcChunkSize() - 1;
+    ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
+        repConfig.getEcChunkSize() * 4);
+    ECStreamTestUtil.randomFill(dataBufs, repConfig.getEcChunkSize(),
+        dataGenerator, blockLength);
+    ByteBuffer[] parity = generateParity(dataBufs, repConfig);
+    addDataStreamsToFactory(dataBufs, parity);
+
+    Map<DatanodeDetails, Integer> dnMap =
+        ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
+    try (ECBlockReconstructedStripeInputStream stripeStream
+             = createStripeInputStream(dnMap, blockLength)) {
+      try (ECBlockReconstructedInputStream stream =
+               new ECBlockReconstructedInputStream(repConfig, bufferPool,
+                   stripeStream)) {
+        ByteBuffer b = ByteBuffer.allocate(readBufferSize);
+        int totalRead = 0;
+        dataGenerator = new SplittableRandom(randomSeed);
+        while (totalRead < blockLength) {
+          int expectedRead = Math.min(blockLength - totalRead, readBufferSize);
+          long read = stream.read(b);
+          totalRead += read;
+          Assert.assertEquals(expectedRead, read);
+          ECStreamTestUtil.assertBufferMatches(b, dataGenerator);
+          b.clear();
+          stream.unbuffer();
+        }
+        // Next read should be EOF
+        b.clear();
+        long read = stream.read(b);
+        Assert.assertEquals(-1, read);
       }
     }
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
index a5ea3a4..0eff8f9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestECBlockReconstructedStripeInputStream.java
@@ -36,8 +36,11 @@ import org.junit.Test;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.SplittableRandom;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -474,6 +477,11 @@ public class TestECBlockReconstructedStripeInputStream {
     }
   }
 
+  private Integer getRandomStreamIndex(Set<Integer> set) {
+    return set.stream().skip(new Random().nextInt(set.size()))
+        .findFirst().orElse(null);
+  }
+
   @Test
   public void testErrorReadingBlockContinuesReading() throws IOException {
     // Generate the input data for 3 full stripes and generate the parity.
@@ -487,15 +495,8 @@ public class TestECBlockReconstructedStripeInputStream {
         .randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
     ByteBuffer[] parity = generateParity(dataBufs, repConfig);
 
-    List<List<Integer>> failLists = new ArrayList<>();
-    failLists.add(indexesToList(0, 1));
-    // These will be the first parity read and then the next parity read as a
-    // replacement
-    failLists.add(indexesToList(2, 3));
-    // First parity and then the data block
-    failLists.add(indexesToList(2, 0));
-
-    for (List<Integer> failList : failLists) {
+    for (int k = 0; k < 5; k++) {
+      Set<Integer> failed = new HashSet<>();
       streamFactory = new TestBlockInputStreamFactory();
       addDataStreamsToFactory(dataBufs, parity);
 
@@ -519,8 +520,11 @@ public class TestECBlockReconstructedStripeInputStream {
           Assert.assertEquals(stripeSize(), read);
           clearBuffers(bufs);
           if (i == 0) {
-            streamFactory.getBlockStreams().get(failList.remove(0))
+            Integer failStream =
+                getRandomStreamIndex(streamFactory.getStreamIndexes());
+            streamFactory.getBlockStream(failStream)
                 .setShouldError(true);
+            failed.add(failStream);
           }
         }
         // The next read is a partial stripe
@@ -531,10 +535,17 @@ public class TestECBlockReconstructedStripeInputStream {
         Assert.assertEquals(0, bufs[2].remaining());
         Assert.assertEquals(0, bufs[2].position());
 
-        // seek back to zero, make another block fail. The next read should
-        // error as there are not enough blocks to read.
+        // seek back to zero and read a stripe to re-open the streams
         ecb.seek(0);
-        streamFactory.getBlockStreams().get(failList.remove(0))
+        clearBuffers(bufs);
+        ecb.readStripe(bufs);
+        // Now fail another random stream and the read should fail with
+        // insufficient locations
+        Set<Integer> currentStreams =
+            new HashSet<>(streamFactory.getStreamIndexes());
+        currentStreams.removeAll(failed);
+        Integer failStream = getRandomStreamIndex(currentStreams);
+        streamFactory.getBlockStream(failStream)
             .setShouldError(true);
         try {
           clearBuffers(bufs);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org