You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/07/29 09:54:42 UTC
[hive] branch master updated: HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ea59d0b6871 HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)
ea59d0b6871 is described below
commit ea59d0b6871672d5c03d0e6031bf537d6044ed25
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Fri Jul 29 11:54:31 2022 +0200
HIVE-26428: Limit usage of LLAP BPWrapper to threads of IO threadpools (Adam Szita, reviewed by Laszlo Pinter)
---
.../hive/llap/cache/LowLevelLrfuCachePolicy.java | 149 +++++++++++----------
.../hive/llap/daemon/impl/LlapPooledIOThread.java | 29 ++++
.../hadoop/hive/llap/io/api/impl/LlapIoImpl.java | 8 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 71 +++++-----
4 files changed, 155 insertions(+), 102 deletions(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index 813a6abe93e..4c40c3102e1 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.MetadataCache.LlapMetadataBuffer;
@@ -215,84 +216,96 @@ public final class LowLevelLrfuCachePolicy extends ProactiveEvictingCachePolicy.
if (proactiveEvictionEnabled && !instantProactiveEviction) {
buffer.removeProactiveEvictionMark();
}
- BPWrapper bpWrapper = threadLocalBPWrapper.get();
- // This will only block in a very very rare scenario only.
- bpWrapper.lock.lock();
- try {
- final LlapCacheableBuffer[] cacheableBuffers = bpWrapper.buffers;
- if (bpWrapper.count < maxQueueSize) {
- cacheableBuffers[bpWrapper.count] = buffer;
- ++bpWrapper.count;
- }
- if (bpWrapper.count <= maxQueueSize / 2) {
- // case too early to flush
- return;
- }
+ if (Thread.currentThread() instanceof LlapPooledIOThread) {
+ BPWrapper bpWrapper = threadLocalBPWrapper.get();
- if (bpWrapper.count == maxQueueSize) {
- // case we have to flush thus block on heap lock
- bpWrapper.flush();
- return;
+ // This will only block in a very very rare scenario only.
+ bpWrapper.lock.lock();
+ try {
+ final LlapCacheableBuffer[] cacheableBuffers = bpWrapper.buffers;
+ if (bpWrapper.count < maxQueueSize) {
+ cacheableBuffers[bpWrapper.count] = buffer;
+ ++bpWrapper.count;
+ }
+ if (bpWrapper.count <= maxQueueSize / 2) {
+ // case too early to flush
+ return;
+ }
+
+ if (bpWrapper.count == maxQueueSize) {
+ // case we have to flush thus block on heap lock
+ bpWrapper.flush();
+ return;
+ }
+ bpWrapper.tryFlush(); //case 50% < queue usage < 100%, flush is preferred but not required yet
+ } finally {
+ bpWrapper.lock.unlock();
+ }
+ } else {
+ heapLock.lock();
+ try {
+ doNotifyUnderHeapLock(buffer);
+ } finally {
+ heapLock.unlock();
}
- bpWrapper.tryFlush(); //case 50% < queue usage < 100%, flush is preferred but not required yet
- } finally {
- bpWrapper.lock.unlock();
}
}
private void doNotifyUnderHeapLock(int count, LlapCacheableBuffer[] cacheableBuffers) {
- LlapCacheableBuffer buffer;
for (int i = 0; i < count; i++) {
- buffer = cacheableBuffers[i];
- long time = timer.incrementAndGet();
- if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
- LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
- }
- // First, update buffer priority - we have just been using it.
- buffer.priority = (buffer.lastUpdate == -1) ? F0
- : touchPriority(time, buffer.lastUpdate, buffer.priority);
- buffer.lastUpdate = time;
- // Then, if the buffer was in the list, remove it.
- if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
- listLock.lock();
- removeFromListAndUnlock(buffer);
- }
- // The only concurrent change that can happen when we hold the heap lock is list removal;
- // we have just ensured the item is not in the list, so we have a definite state now.
- if (buffer.indexInHeap >= 0) {
- // The buffer has lived in the heap all along. Restore heap property.
- heapifyDownUnderLock(buffer, time);
- } else if (heapSize == heap.length) {
- // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
- LlapCacheableBuffer demoted = heap[0];
- listLock.lock();
- try {
- assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock.
- demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
- demoted.prev = null;
- if (listHead != null) {
- demoted.next = listHead;
- listHead.prev = demoted;
- listHead = demoted;
- } else {
- listHead = demoted;
- listTail = demoted;
- demoted.next = null;
- }
- } finally {
- listLock.unlock();
+ doNotifyUnderHeapLock(cacheableBuffers[i]);
+ }
+ }
+
+ private void doNotifyUnderHeapLock(LlapCacheableBuffer buffer) {
+ long time = timer.incrementAndGet();
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Touching {} at {}", buffer, time);
+ }
+ // First, update buffer priority - we have just been using it.
+ buffer.priority = (buffer.lastUpdate == -1) ? F0
+ : touchPriority(time, buffer.lastUpdate, buffer.priority);
+ buffer.lastUpdate = time;
+ // Then, if the buffer was in the list, remove it.
+ if (buffer.indexInHeap == LlapCacheableBuffer.IN_LIST) {
+ listLock.lock();
+ removeFromListAndUnlock(buffer);
+ }
+ // The only concurrent change that can happen when we hold the heap lock is list removal;
+ // we have just ensured the item is not in the list, so we have a definite state now.
+ if (buffer.indexInHeap >= 0) {
+ // The buffer has lived in the heap all along. Restore heap property.
+ heapifyDownUnderLock(buffer, time);
+ } else if (heapSize == heap.length) {
+ // The buffer is not in the (full) heap. Demote the top item of the heap into the list.
+ LlapCacheableBuffer demoted = heap[0];
+ listLock.lock();
+ try {
+ assert demoted.indexInHeap == 0; // Noone could have moved it, we have the heap lock.
+ demoted.indexInHeap = LlapCacheableBuffer.IN_LIST;
+ demoted.prev = null;
+ if (listHead != null) {
+ demoted.next = listHead;
+ listHead.prev = demoted;
+ listHead = demoted;
+ } else {
+ listHead = demoted;
+ listTail = demoted;
+ demoted.next = null;
}
- // Now insert the new buffer in its place and restore heap property.
- buffer.indexInHeap = 0;
- heapifyDownUnderLock(buffer, time);
- } else {
- // Heap is not full, add the buffer to the heap and restore heap property up.
- assert heapSize < heap.length : heap.length + " < " + heapSize;
- buffer.indexInHeap = heapSize;
- heapifyUpUnderLock(buffer, time);
- ++heapSize;
+ } finally {
+ listLock.unlock();
}
+ // Now insert the new buffer in its place and restore heap property.
+ buffer.indexInHeap = 0;
+ heapifyDownUnderLock(buffer, time);
+ } else {
+ // Heap is not full, add the buffer to the heap and restore heap property up.
+ assert heapSize < heap.length : heap.length + " < " + heapSize;
+ buffer.indexInHeap = heapSize;
+ heapifyUpUnderLock(buffer, time);
+ ++heapSize;
}
}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java
new file mode 100644
index 00000000000..6acb2ec7279
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapPooledIOThread.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+/**
+ * Thread subtype that tags the worker threads of the LLAP IO thread pools. The LRFU cache policy checks
+ * {@code Thread.currentThread() instanceof LlapPooledIOThread} so that its ThreadLocal-based features are only used on
+ * these pooled threads and not on ephemeral ones (e.g. TezTR). It adds no behavior beyond {@link Thread}.
+ */
+public class LlapPooledIOThread extends Thread {
+ public LlapPooledIOThread(Runnable task) {
+ super(task);
+ }
+}
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 65bbc4b4cff..ecf8d2575c9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -36,7 +36,9 @@ import org.apache.hadoop.hive.llap.cache.LlapCacheHydration;
import org.apache.hadoop.hive.llap.cache.MemoryLimitedPathCache;
import org.apache.hadoop.hive.llap.cache.PathCache;
import org.apache.hadoop.hive.llap.cache.ProactiveEvictingCachePolicy;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -237,14 +239,16 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch>, LlapIoDebugDump {
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
executor = new StatsRecordingThreadPool(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
+ new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true)
+ .setThreadFactory(r -> new LlapPooledIOThread(r)).build());
tracePool = IoTrace.createTracePool(conf);
if (isEncodeEnabled) {
int encodePoolMultiplier = HiveConf.getIntVar(conf, ConfVars.LLAP_IO_ENCODE_THREADPOOL_MULTIPLIER);
int encodeThreads = numThreads * encodePoolMultiplier;
encodeExecutor = new StatsRecordingThreadPool(encodeThreads, encodeThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
- new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-OrcEncode-%d").setDaemon(true).build());
+ new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-OrcEncode-%d").setDaemon(true)
+ .setThreadFactory(r -> new LlapPooledIOThread(r)).build());
} else {
encodeExecutor = null;
}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
index 17f49782caa..8c38ac10cdf 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelLrfuCachePolicy.java
@@ -44,6 +44,7 @@ import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapPooledIOThread;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Assume;
import org.junit.Test;
@@ -272,40 +273,56 @@ public class TestLowLevelLrfuCachePolicy {
}
@Test
- public void testBPWrapperFlush() {
- int heapSize = 20;
- LOG.info("Testing bp wrapper flush logic");
- ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
- EvictionTracker et = new EvictionTracker();
- Configuration conf = new Configuration();
- conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 10);
- LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
- LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
- LlapDaemonCacheMetrics.create("test", "1"));
- lrfu.setEvictionListener(et);
-
- // Test with 4 buffers: they should all remain in BP wrapper and not go to heap upon insertion.
- // .. but after purging, they need to show up as 4 evicted bytes.
- for (int i = 0; i < 4; ++i) {
- LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(mm, lrfu, et, buffer));
- inserted.add(buffer);
- }
- assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 0}, lrfu.metrics.getUsageStats());
- assertEquals(4, mm.purge());
- assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
+ public void testBPWrapperFlush() throws Exception {
+ // LlapPooledIOThread type of thread is needed in order to verify BPWrapper functionality as it is deliberately
+ // turned off for other threads, potentially ephemeral in nature.
+ // Assertion failures thrown inside that thread only reach its uncaught exception handler and cannot fail this
+ // JUnit test by themselves, so record them here and rethrow them on the test thread after join().
+ final Throwable[] failure = new Throwable[1];
+ LlapPooledIOThread thread = new LlapPooledIOThread(() -> {
+ int heapSize = 20;
+ LOG.info("Testing bp wrapper flush logic");
+ ArrayList<LlapDataBuffer> inserted = new ArrayList<LlapDataBuffer>(heapSize);
+ EvictionTracker et = new EvictionTracker();
+ Configuration conf = new Configuration();
+ conf.setInt(HiveConf.ConfVars.LLAP_LRFU_BP_WRAPPER_SIZE.varname, 10);
+ LowLevelLrfuCachePolicy lrfu = new LowLevelLrfuCachePolicy(1, heapSize, conf);
+ LowLevelCacheMemoryManager mm = new LowLevelCacheMemoryManager(heapSize, lrfu,
+ LlapDaemonCacheMetrics.create("test", "1"));
+ lrfu.setEvictionListener(et);
+
+ // Test with 4 buffers: they should all remain in BP wrapper and not go to heap upon insertion.
+ // .. but after purging, they need to show up as 4 evicted bytes.
+ for (int i = 0; i < 4; ++i) {
+ LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+ assertTrue(cache(mm, lrfu, et, buffer));
+ inserted.add(buffer);
+ }
+ assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 0}, lrfu.metrics.getUsageStats());
+ assertEquals(4, mm.purge());
+ assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
- // Testing with 8 buffers: on the 6th buffer BP wrapper content should be flushed into heap, next 2 buffers won't
- for (int i = 0; i < 8; ++i) {
- LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
- assertTrue(cache(mm, lrfu, et, buffer));
- inserted.add(buffer);
- }
- assertArrayEquals(new long[] {6, 0, 0, 0, 0, 0, 0, 2, 2, 2, 0}, lrfu.metrics.getUsageStats());
- assertEquals(8, mm.purge());
- assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
+ // Testing with 8 buffers: on the 6th buffer BP wrapper content should be flushed into heap, next 2 buffers won't
+ for (int i = 0; i < 8; ++i) {
+ LlapDataBuffer buffer = LowLevelCacheImpl.allocateFake();
+ assertTrue(cache(mm, lrfu, et, buffer));
+ inserted.add(buffer);
+ }
+ assertArrayEquals(new long[] {6, 0, 0, 0, 0, 0, 0, 2, 2, 2, 0}, lrfu.metrics.getUsageStats());
+ assertEquals(8, mm.purge());
+ assertArrayEquals(new long[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, lrfu.metrics.getUsageStats());
- assertTrue(et.evicted.containsAll(inserted));
+ assertTrue(et.evicted.containsAll(inserted));
+ });
+ // The handler runs on the worker thread before it terminates; join()/isAlive() make its write visible here.
+ thread.setUncaughtExceptionHandler((failedThread, error) -> failure[0] = error);
+ thread.start();
+ thread.join(30000);
+ if (thread.isAlive()) {
+ throw new AssertionError("BPWrapper flush test thread did not finish within 30 seconds");
+ }
+ if (failure[0] != null) {
+ throw new AssertionError("BPWrapper flush test failed in LlapPooledIOThread", failure[0]);
+ }
}
@Test