Posted to common-issues@hadoop.apache.org by "virajjasani (via GitHub)" <gi...@apache.org> on 2023/06/24 07:01:51 UTC

[GitHub] [hadoop] virajjasani commented on a diff in pull request #5754: HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache

virajjasani commented on code in PR #5754:
URL: https://github.com/apache/hadoop/pull/5754#discussion_r1240621453


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
       throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {

Review Comment:
   Done, added on L305. And yes, I agree it can be helpful; in fact, I added it for testing purposes anyway.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -247,9 +305,46 @@ private Entry getEntry(int blockNumber) {
       throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToHeadOfLinkedList(entry);
     return entry;
   }
 
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {
+      if (head == null) {
+        head = entry;
+        tail = entry;
+      }
+      if (entry != head) {
+        Entry prev = entry.getPrevious();
+        Entry nxt = entry.getNext();
+        if (prev != null) {
+          prev.setNext(nxt);
+        }
+        if (nxt != null) {
+          nxt.setPrevious(prev);
+        }
+        entry.setPrevious(null);
+        entry.setNext(head);
+        head.setPrevious(entry);
+        head = entry;
+      }
+      if (tail != null) {
+        while (tail.getNext() != null) {
+          tail = tail.getNext();
+        }
+      }

Review Comment:
   Sure, let's say:
   
   head -> 1, tail -> 2
   
   New block: 3
   So we need to make: 3 -> 1 -> 2
   i.e. new head -> 3, tail -> 2
   
   New block: 4
   Updated list: 4 -> 3 -> 1 -> 2
   
   Now let's say the input stream accesses block 2, so block 2 needs to be moved to the head.
   The new list should be: 2 -> 4 -> 3 -> 1
   
   We change head to 2 and also update the next pointer of block 1.
   However, if we don't update tail (L322-L326), we will not be able to move tail to block 1. See the sketch below.
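   
   To make this concrete in code, here is a minimal, self-contained sketch of the same head/tail bookkeeping on a doubly linked list. The `Node`, `addToHead`, and `moveToHead` names are hypothetical, not the actual `SingleFilePerBlockCache.Entry` API; the sketch also moves the tail directly to its predecessor rather than walking it forward with the `while` loop at L322-L326, but both end with the tail at block 1 in the example above.
   
   ```java
   // Hypothetical sketch of the LRU linked-list bookkeeping; not the
   // actual SingleFilePerBlockCache implementation.
   final class LruListSketch {
     static final class Node {
       final int block;
       Node prev, next;
       Node(int block) { this.block = block; }
     }
   
     Node head, tail;
   
     /** Insert a brand-new node at the head (blocks 3 and 4 above). */
     void addToHead(Node n) {
       n.next = head;
       if (head != null) {
         head.prev = n;
       }
       head = n;
       if (tail == null) {
         tail = n;  // first element is both head and tail
       }
     }
   
     /** Move an existing node (block 2 above) to the head. */
     void moveToHead(Node n) {
       if (n == head) {
         return;  // already the most recently used entry
       }
       // Unlink n from its current position.
       if (n.prev != null) {
         n.prev.next = n.next;
       }
       if (n.next != null) {
         n.next.prev = n.prev;
       }
       // If n was the tail, the tail moves back to its predecessor
       // (block 1 above); this is what the tail update achieves.
       if (n == tail) {
         tail = n.prev;
       }
       // Relink n at the head.
       n.prev = null;
       n.next = head;
       head.prev = n;
       head = n;
     }
   }
   ```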
   



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();

Review Comment:
   Yeah, that would still be okay, given that eviction would take place anyway.
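   
   For illustration, a self-contained sketch of the pattern being discussed (hypothetical names, not the actual cache class): the add and the eviction check each take the write lock separately, so the map can transiently exceed the limit between the two critical sections, but the eviction pass of the next put brings it back within bounds.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   // Hypothetical sketch of the two-critical-section pattern; names
   // are illustrative, not the actual SingleFilePerBlockCache.
   final class TwoPhasePutSketch {
     private static final int MAX_BLOCKS = 2;
     private final Map<Integer, byte[]> blocks = new HashMap<>();
     private final ReentrantReadWriteLock blocksLock = new ReentrantReadWriteLock();
   
     void put(int blockNumber, byte[] data) {
       // Phase 1: add the entry (analogue of addToHeadOfLinkedList).
       blocksLock.writeLock().lock();
       try {
         blocks.put(blockNumber, data);
       } finally {
         blocksLock.writeLock().unlock();
       }
       // Another thread may interleave here, so size can briefly exceed
       // MAX_BLOCKS; that is fine because every put runs the check below.
       blocksLock.writeLock().lock();
       try {
         if (blocks.size() > MAX_BLOCKS) {
           // The real code evicts the LRU tail; removing an arbitrary
           // key keeps this sketch short.
           Integer victim = blocks.keySet().iterator().next();
           blocks.remove(victim);
         }
       } finally {
         blocksLock.writeLock().unlock();
       }
     }
   }
   ```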



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java:
##########
@@ -299,9 +395,62 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    addToHeadOfLinkedList(entry);
+    blocksLock.writeLock().lock();
+    try {
+      if (blocks.size() > maxBlocksCount && !closed.get()) {
+        Entry elementToPurge = tail;
+        tail = tail.getPrevious();
+        if (tail == null) {
+          tail = head;
+        }
+        tail.setNext(null);
+        elementToPurge.setPrevious(null);
+        deleteBlockFileAndEvictCache(elementToPurge);
+      }
+    } finally {
+      blocksLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Delete cache file as part of the block cache LRU eviction.
+   *
+   * @param elementToPurge Block entry to evict.
+   */
+  private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+    boolean lockAcquired =
+        elementToPurge.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+            PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+    if (!lockAcquired) {
+      LOG.error("Cache file {} deletion would not be attempted as write lock could not"

Review Comment:
   Since we are already using 5s elsewhere (PREFETCH_WRITE_LOCK_TIMEOUT), I used it here as well, but I'm happy to change it in the future if we run into problems with it. Does that sound good?
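   
   For reference, here is a minimal sketch of the timed write-lock pattern that `takeLock` with `PREFETCH_WRITE_LOCK_TIMEOUT` corresponds to, using a plain `ReentrantReadWriteLock`; the class and method names are hypothetical, and the 5 seconds simply mirrors the existing constant.
   
   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   // Hypothetical sketch of a bounded write-lock acquisition; 5 seconds
   // mirrors PREFETCH_WRITE_LOCK_TIMEOUT, the rest is illustrative.
   final class TimedEvictionSketch {
     private final ReentrantReadWriteLock entryLock = new ReentrantReadWriteLock();
   
     /** Returns false if the lock could not be acquired within the timeout. */
     boolean tryDeleteCacheFile() throws InterruptedException {
       // Bounded wait: give up instead of blocking forever if a reader
       // (e.g. an in-flight block read) holds the entry lock.
       if (!entryLock.writeLock().tryLock(5, TimeUnit.SECONDS)) {
         return false;  // caller logs and skips deletion, as in the diff
       }
       try {
         // ... delete the block's cache file here ...
         return true;
       } finally {
         entryLock.writeLock().unlock();
       }
     }
   }
   ```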



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

