You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/07/29 09:54:42 UTC

[hive] branch master updated: HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ea59d0b6871 HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)
ea59d0b6871 is described below

commit ea59d0b6871672d5c03d0e6031bf537d6044ed25
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Fri Jul 29 11:54:31 2022 +0200

    HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)
---
 .../hive/llap/cache/LowLevelLrfuCachePolicy.java   | 149 +++++++++++----------
 .../hive/llap/daemon/impl/LlapPooledIOThread.java  |  29 ++++
 .../hadoop/hive/llap/io/api/impl/LlapIoImpl.java   |   8 +-
 .../llap/cache/TestLowLevelLrfuCachePolicy.java    |  71 +++++-----
 4 files changed, 155 insertions(+), 102 deletions(-)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 813a6abe93e..4c40c3102e1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
@@ -215,84 +216,96 @@ public final class LowLevelLrfuCachePolicy extends ProactiveEvictingCachePolicy.
     if (proactiveEvictionEnabled && !instantProactiveEviction) {
       buffer.removeProactiveEvictionMark();
     }
-    BPWrapper bpWrapper = threadLocalBPWrapper.get();
 
-    // This will only block in a very very rare scenario only.
-    bpWrapper.lock.lock();
-    try {
-      final LlapCacheableBuffer[] cacheableBuffers = bpWrapper.buffers;
-      if (bpWrapper.count < maxQueueSize) {
-        cacheableBuffers[bpWrapper.count] = buffer;
-        ++bpWrapper.count;
-      }
-      if (bpWrapper.count <= maxQueueSize / 2) {
-        // case too early to flush
-        return;
-      }
+    if (Thread.currentThread() instanceof LlapPooledIOThread) {
+      BPWrapper bpWrapper = threadLocalBPWrapper.get();
 
-      if (bpWrapper.count == maxQueueSize) {
-        // case we have to flush thus block on heap lock
-        bpWrapper.flush();
-        return;
+      // This will block only in a very rare scenario.
+      bpWrapper.lock.lock();
+      try {
+        final LlapCacheableBuffer[] cacheableBuffers = bpWrapper.buffers;
+        if (bpWrapper.count < maxQueueSize) {
+          cacheableBuffers[bpWrapper.count] = buffer;
+          ++bpWrapper.count;
+        }
+        if (bpWrapper.count <= maxQueueSize / 2) {
+          // case too early to flush
+          return;
+        }
+
+        if (bpWrapper.count == maxQueueSize) {
+          // case we have to flush thus block on heap lock
+          bpWrapper.flush();
+          return;
+        }
+        bpWrapper.tryFlush(); //case 50% < queue usage < 100%, flush is preferred but not required yet
+      } finally {
+        bpWrapper.lock.unlock();
+      }
+    } else {
+      heapLock.lock();
+      try {
+        doNotifyUnderHeapLock(buffer);
+      } finally {
+        heapLock.unlock();
       }
-      bpWrapper.tryFlush(); //case 50% < queue usage < 100%, flush is preferred but not required yet
-    } finally {
-      bpWrapper.lock.unlock();
     }
   }
 
   private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuffers) {
-    LlapCacheableBuffer buffer;
     for (int i = 0; i < count; i++) {
-      buffer = cacheableBuffers[i];
-      long time = timer.incrementAndGet();
-      if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
-        LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
-      }
-      // First, update buffer priority - we have just been using it.
-      buffer.priority = (buffer.lastUpdate == -1) ? F0
-          : touchPriority(time, buffer.lastUpdate, buffer.priority);
-      buffer.lastUpdate = time;
-      // Then, if the buffer was in the list, remove it.
-      if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
-        listLock.lock();
-        removeFromListAndUnlock(buffer);
-      }
-      // The only concurrent change that can happen when we hold the heap lock is list removal;
-      // we have just ensured the item is not in the list, so we have a definite state now.
-      if (buffer.indexInHeap >= 0) {
-        // The buffer has lived in the heap all along. Restore heap property.
-        heapifyDownUnderLock(buffer, time);
-      } else if (heapSize == heap.length) {
-        // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
-        LlapCacheableBuffer demoted = heap[0];
-        listLock.lock();
-        try {
-          assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock.
-          demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
-          demoted.prev = null;
-          if (listHead != null) {
-            demoted.next = listHead;
-            listHead.prev = demoted;
-            listHead = demoted;
-          } else {
-            listHead = demoted;
-            listTail = demoted;
-            demoted.next = null;
-          }
-        } finally {
-          listLock.unlock();
+      doNotifyUnderHeapLock(cacheableBuffers[i]);
+    }
+  }
+
+  private void doNotifyUnderHeapLock(LlapCacheableBuffer buffer) {
+    long time = timer.incrementAndGet();
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+    }
+    // First, update buffer priority - we have just been using it.
+    buffer.priority = (buffer.lastUpdate == -1) ? F0
+        : touchPriority(time, buffer.lastUpdate, buffer.priority);
+    buffer.lastUpdate = time;
+    // Then, if the buffer was in the list, remove it.
+    if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
+      listLock.lock();
+      removeFromListAndUnlock(buffer);
+    }
+    // The only concurrent change that can happen when we hold the heap lock is list removal;
+    // we have just ensured the item is not in the list, so we have a definite state now.
+    if (buffer.indexInHeap >= 0) {
+      // The buffer has lived in the heap all along. Restore heap property.
+      heapifyDownUnderLock(buffer, time);
+    } else if (heapSize == heap.length) {
+      // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
+      LlapCacheableBuffer demoted = heap[0];
+      listLock.lock();
+      try {
+        assert demoted.indexInHeap == 0; // No one could have moved it, we have the heap lock.
+        demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
+        demoted.prev = null;
+        if (listHead != null) {
+          demoted.next = listHead;
+          listHead.prev = demoted;
+          listHead = demoted;
+        } else {
+          listHead = demoted;
+          listTail = demoted;
+          demoted.next = null;
         }
-        // Now insert the new buffer in its place and restore heap property.
-        buffer.indexInHeap = 0;
-        heapifyDownUnderLock(buffer, time);
-      } else {
-        // Heap is not full, add the buffer to the heap and restore heap property up.
-        assert heapSize < heap.length : heap.length + " < " + heapSize;
-        buffer.indexInHeap = heapSize;
-        heapifyUpUnderLock(buffer, time);
-        ++heapSize;
+      } finally {
+        listLock.unlock();
       }
+      // Now insert the new buffer in its place and restore heap property.
+      buffer.indexInHeap = 0;
+      heapifyDownUnderLock(buffer, time);
+    } else {
+      // Heap is not full, add the buffer to the heap and restore heap property up.
+      assert heapSize < heap.length : heap.length + " < " + heapSize;
+      buffer.indexInHeap = heapSize;
+      heapifyUpUnderLock(buffer, time);
+      ++heapSize;
     }
   }
 
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java
new file mode 100644
index 00000000000..6acb2ec7279
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+/**
+ * Marker class for threads created for the purpose of doing IO work in LLAP. Currently used by LRFU cache policy to
+ * distinguish from ephemeral threads (e.g. TezTR) and decide whether ThreadLocal-based features are applicable or not.
+ */
+public class LlapPooledIOThread extends Thread {
+
+  public LlapPooledIOThread(Runnable runnable) {
+    super(runnable);
+  }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 65bbc4b4cff..ecf8d2575c9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -36,7 +36,9 @@ import org.apache.hadoop.hive.llap.cache.LlapCacheHydration;
 import org.apache.hadoop.hive.llap.cache.MemoryLimitedPathCache;
 import org.apache.hadoop.hive.llap.cache.PathCache;
 import org.apache.hadoop.hive.llap.cache.ProactiveEvictingCachePolicy;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
 import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -237,14 +239,16 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
     executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
         new LinkedBlockingQueue<Runnable>(),
-        new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
+        new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true)
+            .setThreadFactory(r -> new LlapPooledIOThread(r)).build());
     tracePool = IoTrace.createTracePool(conf);
     if (isEncodeEnabled) {
       int encodePoolMultiplier = HiveConf.getIntVar(conf, ConfVars.LLAP_IO_ENCODE_THREADPOOL_MULTIPLIER);
       int encodeThreads = numThreads * encodePoolMultiplier;
       encodeExecutor = new StatsRecordingThreadPool(encodeThreads, encodeThreads, 0L, TimeUnit.MILLISECONDS,
           new LinkedBlockingQueue<Runnable>(),
-          new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-OrcEncode-%d").setDaemon(true).build());
+          new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-OrcEncode-%d").setDaemon(true)
+              .setThreadFactory(r -> new LlapPooledIOThread(r)).build());
     } else {
       encodeExecutor = null;
     }
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 17f49782caa..8c38ac10cdf 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.junit.Assume;
 import org.junit.Test;
@@ -272,40 +273,46 @@ public class TestLowLevelLrfuCachePolicy {
   }
 
   @Test
-  public void testBPWrapperFlush() {
-    int heapSize = 20;
-    LOG.info("Testing bp wrapper flush logic");
-    ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
-    EvictionTracker et = new EvictionTracker();
-    Configuration conf = new Configuration();
-    conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 10);
-    LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
-    LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
-        LlapDaemonCacheMetrics.create("test", "1"));
-    lrfu.setEvictionListener(et);
-
-    // Test with 4 buffers: they should all remain in BP wrapper and not go to heap upon insertion.
-    // .. but after purging, they need to show up as 4 evicted bytes.
-    for (int i = 0; i < 4; ++i) {
-      LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
-      assertTrue(cache(mm, lrfu, et, buffer));
-      inserted.add(buffer);
-    }
-    assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 0}, lrfu.metrics.getUsageStats());
-    assertEquals(4, mm.purge());
-    assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
+  public void testBPWrapperFlush() throws Exception {
+    // A LlapPooledIOThread is needed to verify the BPWrapper functionality, because it is deliberately
+    // turned off for all other threads, which are potentially ephemeral in nature.
+    LlapPooledIOThread thread = new LlapPooledIOThread(() -> {
+      int heapSize = 20;
+      LOG.info("Testing bp wrapper flush logic");
+      ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
+      EvictionTracker et = new EvictionTracker();
+      Configuration conf = new Configuration();
+      conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 10);
+      LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
+      LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
+          LlapDaemonCacheMetrics.create("test", "1"));
+      lrfu.setEvictionListener(et);
+
+      // Test with 4 buffers: they should all remain in BP wrapper and not go to heap upon insertion.
+      // .. but after purging, they need to show up as 4 evicted bytes.
+      for (int i = 0; i < 4; ++i) {
+        LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+        assertTrue(cache(mm, lrfu, et, buffer));
+        inserted.add(buffer);
+      }
+      assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 0}, lrfu.metrics.getUsageStats());
+      assertEquals(4, mm.purge());
+      assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
 
-    // Testing with 8 buffers: on the 6th buffer BP wrapper content should be flushed into heap, next 2 buffers won't
-    for (int i = 0; i < 8; ++i) {
-      LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
-      assertTrue(cache(mm, lrfu, et, buffer));
-      inserted.add(buffer);
-    }
-    assertArrayEquals(new long[] {6, 0, 0, 0, 0, 0, 0, 2, 2, 2, 0}, lrfu.metrics.getUsageStats());
-    assertEquals(8, mm.purge());
-    assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
+      // Testing with 8 buffers: on the 6th buffer BP wrapper content should be flushed into heap, next 2 buffers won't
+      for (int i = 0; i < 8; ++i) {
+        LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+        assertTrue(cache(mm, lrfu, et, buffer));
+        inserted.add(buffer);
+      }
+      assertArrayEquals(new long[] {6, 0, 0, 0, 0, 0, 0, 2, 2, 2, 0}, lrfu.metrics.getUsageStats());
+      assertEquals(8, mm.purge());
+      assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
 
-    assertTrue(et.evicted.containsAll(inserted));
+      assertTrue(et.evicted.containsAll(inserted));
+    });
+    thread.start();
+    thread.join(30000);
   }
 
   @Test