You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2021/09/07 10:22:12 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17156. ABFS: Release the byte buffers held by input streams in close() (#3285)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3b1c594  HADOOP-17156. ABFS: Release the byte buffers held by input streams in close() (#3285)
3b1c594 is described below

commit 3b1c59435523418c09160db8a1e01fea9eaa2611
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Tue Sep 7 15:13:36 2021 +0530

    HADOOP-17156. ABFS: Release the byte buffers held by input streams in close() (#3285)
    
    
    Contributed By: Mukund Thakur
---
 hadoop-tools/hadoop-azure/pom.xml                  |   2 +
 .../fs/azurebfs/services/AbfsInputStream.java      |   3 +-
 .../fs/azurebfs/services/ReadBufferManager.java    |  85 ++++++++--
 .../azurebfs/services/ITestReadBufferManager.java  | 172 +++++++++++++++++++++
 4 files changed, 250 insertions(+), 12 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index a72644a..f610295 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -555,6 +555,7 @@
                     <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
                     <exclude>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</exclude>
                     <exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
+                    <exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
                   </excludes>
 
                 </configuration>
@@ -595,6 +596,7 @@
                     <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
                     <include>**/azurebfs/extensions/ITestAbfsDelegationTokens.java</include>
                     <include>**/azurebfs/ITestSmallWriteOptimization.java</include>
+                    <include>**/azurebfs/services/ITestReadBufferManager.java</include>
                   </includes>
                 </configuration>
               </execution>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 5dd5eb7..165ebc2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -668,9 +668,10 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
   @Override
   public synchronized void close() throws IOException {
+    LOG.debug("Closing {}", this);
     closed = true;
     buffer = null; // de-reference the buffer so it can be GC'ed sooner
-    LOG.debug("Closing {}", this);
+    ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index f330d79..2826a23 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Queue;
 import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
@@ -452,18 +454,23 @@ final class ReadBufferManager {
           buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
     }
     synchronized (this) {
-      inProgressList.remove(buffer);
-      if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
-        buffer.setStatus(ReadBufferStatus.AVAILABLE);
-        buffer.setLength(bytesActuallyRead);
-      } else {
-        freeList.push(buffer.getBufferindex());
-        // buffer will be deleted as per the eviction policy.
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (inProgressList.contains(buffer)) {
+        inProgressList.remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          freeList.push(buffer.getBufferindex());
+          // buffer will be deleted as per the eviction policy.
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        completedReadList.add(buffer);
       }
-
-      buffer.setStatus(result);
-      buffer.setTimeStamp(currentTimeMillis());
-      completedReadList.add(buffer);
     }
 
     //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
@@ -499,10 +506,66 @@ final class ReadBufferManager {
   }
 
   @VisibleForTesting
+  public synchronized List<ReadBuffer> getCompletedReadListCopy() {
+    return new ArrayList<>(completedReadList);
+  }
+
+  @VisibleForTesting
+  public synchronized List<Integer> getFreeListCopy() {
+    return new ArrayList<>(freeList);
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+    return new ArrayList<>(readAheadQueue);
+  }
+
+  @VisibleForTesting
+  public synchronized List<ReadBuffer> getInProgressCopiedList() {
+    return new ArrayList<>(inProgressList);
+  }
+
+  @VisibleForTesting
   void callTryEvict() {
     tryEvict();
   }
 
+
+  /**
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManager} when stream is closed.
+   * @param stream input stream.
+   */
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+    readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, completedReadList);
+    purgeList(stream, inProgressList);
+  }
+
+  /**
+   * Method to remove buffers associated with a {@link AbfsInputStream}
+   * when its close method is called.
+   * NOTE: This method is not threadsafe and must be called inside a
+   * synchronised block. See caller.
+   * @param stream associated input stream.
+   * @param list list of buffers like {@link this#completedReadList}
+   *             or {@link this#inProgressList}.
+   */
+  private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+    for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+      ReadBuffer readBuffer = it.next();
+      if (readBuffer.getStream() == stream) {
+        it.remove();
+        // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+        // list in doneReading method, we will skip adding those here again.
+        if (readBuffer.getBufferindex() != -1) {
+          freeList.push(readBuffer.getBufferindex());
+        }
+      }
+    }
+  }
+
   /**
    * Test method that can clean up the current state of readAhead buffers and
    * the lists. Will also trigger a fresh init.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
new file mode 100644
index 0000000..705cc25
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_BLOCK_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+
+public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
+
+    public ITestReadBufferManager() throws Exception {
+    }
+
+    @Test
+    public void testPurgeBufferManagerForParallelStreams() throws Exception {
+        describe("Testing purging of buffers from ReadBufferManager for "
+                + "parallel input streams");
+        final int numBuffers = 16;
+        final LinkedList<Integer> freeList = new LinkedList<>();
+        for (int i=0; i < numBuffers; i++) {
+            freeList.add(i);
+        }
+        ExecutorService executorService = Executors.newFixedThreadPool(4);
+        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+        try {
+            for (int i = 0; i < 4; i++) {
+                final String fileName = methodName.getMethodName() + i;
+                executorService.submit((Callable<Void>) () -> {
+                    byte[] fileContent = getRandomBytesArray(ONE_MB);
+                    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+                    try (FSDataInputStream iStream = fs.open(testFilePath)) {
+                        iStream.read();
+                    }
+                    return null;
+                });
+            }
+        } finally {
+            executorService.shutdown();
+        }
+
+        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+        assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
+        assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+        Assertions.assertThat(bufferManager.getFreeListCopy())
+                .describedAs("After closing all streams free list contents should match with " + freeList)
+                .hasSize(numBuffers)
+                .containsExactlyInAnyOrderElementsOf(freeList);
+
+    }
+
+    private void assertListEmpty(String listName, List<ReadBuffer> list) {
+        Assertions.assertThat(list)
+                .describedAs("After closing all streams %s should be empty", listName)
+                .hasSize(0);
+    }
+
+    @Test
+    public void testPurgeBufferManagerForSequentialStream() throws Exception {
+        describe("Testing purging of buffers in ReadBufferManager for "
+                + "sequential input streams");
+        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+        final String fileName = methodName.getMethodName();
+        byte[] fileContent = getRandomBytesArray(ONE_MB);
+        Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+        AbfsInputStream iStream1 =  null;
+        // stream1 will be closed right away.
+        try {
+            iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+            // Just reading one byte will trigger all read ahead calls.
+            iStream1.read();
+        } finally {
+            IOUtils.closeStream(iStream1);
+        }
+        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+        AbfsInputStream iStream2 = null;
+        try {
+            iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+            iStream2.read();
+            // After closing stream1, none of the buffers associated with stream1 should be present.
+            assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream1);
+            assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream1);
+            assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
+        } finally {
+            // closing the stream later.
+            IOUtils.closeStream(iStream2);
+        }
+        // After closing stream2, none of the buffers associated with stream2 should be present.
+        assertListDoesnotContainBuffersForIstream(bufferManager.getInProgressCopiedList(), iStream2);
+        assertListDoesnotContainBuffersForIstream(bufferManager.getCompletedReadListCopy(), iStream2);
+        assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
+
+        // After closing both the streams, all lists should be empty.
+        assertListEmpty("CompletedList", bufferManager.getCompletedReadListCopy());
+        assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList());
+        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+
+    }
+
+
+    private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
+                                                           AbfsInputStream inputStream) {
+        for (ReadBuffer buffer : list) {
+            Assertions.assertThat(buffer.getStream())
+                    .describedAs("Buffers associated with closed input streams shouldn't be present")
+                    .isNotEqualTo(inputStream);
+        }
+    }
+
+    private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+        Configuration conf = getRawConfiguration();
+        conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+        conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+        conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+    }
+
+    protected byte[] getRandomBytesArray(int length) {
+        final byte[] b = new byte[length];
+        new Random().nextBytes(b);
+        return b;
+    }
+
+    protected Path createFileWithContent(FileSystem fs, String fileName,
+                                         byte[] fileContent) throws IOException {
+        Path testFilePath = path(fileName);
+        try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+            oStream.write(fileContent);
+            oStream.flush();
+        }
+        return testFilePath;
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org