You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/12/17 02:59:12 UTC
[4/4] hive git commit: HIVE-15147 : LLAP: use LLAP cache for
non-columnar formats in a somewhat general way (Sergey Shelukhin)
HIVE-15147 : LLAP: use LLAP cache for non-columnar formats in a somewhat general way (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/682a3c7b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/682a3c7b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/682a3c7b
Branch: refs/heads/master-15147
Commit: 682a3c7b46aec9e43275551698fa6ba9c7ac5d7c
Parents: 7f46c8d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Dec 16 18:57:28 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Dec 16 18:57:28 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 7 +
.../apache/hadoop/hive/llap/io/api/LlapIo.java | 3 +-
.../llap/IncrementalObjectSizeEstimator.java | 2 +-
.../hadoop/hive/llap/cache/BuddyAllocator.java | 3 +-
.../hive/llap/cache/EvictionDispatcher.java | 11 +-
.../hadoop/hive/llap/cache/FileCache.java | 107 ++
.../hive/llap/cache/FileCacheCleanupThread.java | 104 ++
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 7 +
.../hive/llap/cache/LowLevelCacheImpl.java | 218 +--
.../hive/llap/cache/SerDeLowLevelCacheImpl.java | 716 ++++++++++
.../hive/llap/io/api/impl/LlapInputFormat.java | 348 +----
.../hive/llap/io/api/impl/LlapIoImpl.java | 26 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 335 +++++
.../llap/io/decode/ColumnVectorProducer.java | 11 +-
.../llap/io/decode/EncodedDataConsumer.java | 7 +
.../io/decode/GenericColumnVectorProducer.java | 201 +++
.../llap/io/decode/LlapTextInputFormat.java | 33 +
.../llap/io/decode/OrcColumnVectorProducer.java | 12 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 95 +-
.../hive/llap/io/decode/ReadPipeline.java | 2 +-
.../llap/io/encoded/SerDeEncodedDataReader.java | 1248 ++++++++++++++++++
.../llap/io/metadata/ConsumerFileMetadata.java | 31 +
.../io/metadata/ConsumerStripeMetadata.java | 35 +
.../hive/llap/io/metadata/OrcFileMetadata.java | 8 +-
.../hive/llap/io/metadata/OrcMetadataCache.java | 2 +-
.../llap/io/metadata/OrcStripeMetadata.java | 13 +-
orc/src/java/org/apache/orc/OrcUtils.java | 83 ++
.../org/apache/orc/impl/PhysicalWriter.java | 1 -
.../org/apache/orc/impl/RecordReaderImpl.java | 5 +
.../org/apache/orc/impl/TreeReaderFactory.java | 1 +
.../java/org/apache/orc/impl/WriterImpl.java | 99 +-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 1 +
.../hadoop/hive/ql/exec/FetchOperator.java | 2 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 4 +-
.../hive/ql/io/CombineHiveRecordReader.java | 55 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 37 +-
.../io/LlapWrappableInputFormatInterface.java | 6 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 10 +
.../hadoop/hive/ql/io/orc/WriterImpl.java | 20 +-
.../hive/ql/io/orc/encoded/CacheChunk.java | 4 +
.../orc/encoded/EncodedTreeReaderFactory.java | 2 +-
.../hive/ql/io/orc/encoded/StreamUtils.java | 13 +-
.../ql/io/rcfile/stats/PartialScanMapper.java | 5 +-
ql/src/test/queries/clientpositive/llap_text.q | 62 +
.../test/results/clientpositive/llap_text.q.out | 502 +++++++
.../hadoop/hive/common/io/DiskRangeList.java | 6 +
.../common/io/encoded/EncodedColumnBatch.java | 17 +
47 files changed, 3898 insertions(+), 622 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..9806105 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2900,6 +2900,13 @@ public class HiveConf extends Configuration {
LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
"Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
"cases of file overwrites. This is supported on HDFS."),
+ LLAP_IO_ENCODE_ALLOC_SIZE("hive.llap.io.encode.alloc.size", "256Kb", new SizeValidator(),
+ "Allocation size for the buffers used to cache encoded data from non-ORC files. Must\n" +
+ "be a power of two between " + LLAP_ALLOCATOR_MIN_ALLOC + " and\n" +
+ LLAP_ALLOCATOR_MAX_ALLOC + "."),
+ LLAP_IO_ENCODE_SLICE_ROW_COUNT("hive.llap.io.encode.slice.row.count", 100000,
+ "Row count to use to separate cache slices when caching encoded data from row-based\n" +
+ "inputs into LLAP cache."),
LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
"Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
LLAP_AUTO_ALLOW_UBER("hive.llap.auto.allow.uber", false,
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index d82757f..e5ab601 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -18,10 +18,11 @@
package org.apache.hadoop.hive.llap.io.api;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
public interface LlapIo<T> {
- InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat);
+ InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde); // serde: presumably the source's deserializer for non-ORC inputs — TODO confirm
void close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index 3efbcc2..ff6e7ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel;
* during the actual estimation. TODO: clean up
*/
public class IncrementalObjectSizeEstimator {
- private static final JavaDataModel memoryModel = JavaDataModel.get();
+ public static final JavaDataModel memoryModel = JavaDataModel.get();
private enum FieldType { PRIMITIVE_ARRAY, OBJECT_ARRAY, COLLECTION, MAP, OTHER };
public static HashMap<Class<?>, ObjectEstimator> createEstimators(Object rootObj) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index d9d407d..8d7f0d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -611,7 +611,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
assert data != null;
int headerIx = buffer.byteBuffer.position() >>> minAllocLog2,
freeListIx = freeListFromHeader(headers[headerIx]);
- assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2);
+ assert freeListIx == (31 - Integer.numberOfLeadingZeros(buffer.allocSize) - minAllocLog2)
+ : buffer.allocSize + " " + freeListIx;
while (true) {
FreeList freeList = freeLists[freeListIx];
int bHeaderIx = headerIx ^ (1 << freeListIx);
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index b6fd3e3..2d3197c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -27,11 +27,16 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
*/
public final class EvictionDispatcher implements EvictionListener {
private final LowLevelCache dataCache;
+ private final SerDeLowLevelCacheImpl serdeCache;
private final OrcMetadataCache metadataCache;
+ private final EvictionAwareAllocator allocator;
- public EvictionDispatcher(LowLevelCache dataCache, OrcMetadataCache metadataCache) {
+ public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
+ OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
this.dataCache = dataCache;
this.metadataCache = metadataCache;
+ this.serdeCache = serdeCache;
+ this.allocator = allocator; // frees evicted data buffers once both caches have been notified
}
@Override
@@ -40,7 +45,11 @@ public final class EvictionDispatcher implements EvictionListener {
}
public void notifyEvicted(LlapDataBuffer buffer) {
+ // We don't know which cache owns this buffer, so notify both; a cache that cannot find the
+ // buffer in its own structures is expected to no-op. Then return the memory to the allocator.
dataCache.notifyEvicted(buffer);
+ serdeCache.notifyEvicted(buffer);
+ allocator.deallocateEvicted(buffer);
}
public void notifyEvicted(OrcFileMetadata buffer) {
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
new file mode 100644
index 0000000..44b71c9
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCache.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Function;
+
+/** Per-file sub-cache wrapper (used by LowLevelCacheImpl, etc.) whose refcount gates removal. */
+class FileCache<T> {
+ private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2; // terminal / transient states
+ private final T cache; // per-file structure built by the caller-supplied factory
+ private final AtomicInteger refCount = new AtomicInteger(0); // >= 0: number of active users
+
+ private FileCache(T value) { // instances are only created by getOrAddFileSubCache
+ this.cache = value;
+ }
+
+ public T getCache() {
+ return cache;
+ }
+
+ boolean incRef() { // takes a ref; returns false once this cache has been evicted
+ while (true) {
+ int value = refCount.get();
+ if (value == EVICTED_REFCOUNT) return false;
+ if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
+ assert value >= 0;
+ if (refCount.compareAndSet(value, value + 1)) return true;
+ }
+ }
+
+ void decRef() { // releases a ref taken by incRef()
+ int value = refCount.decrementAndGet();
+ if (value < 0) {
+ throw new AssertionError("Unexpected refCount " + value);
+ }
+ }
+
+ boolean startEvicting() { // "tryWriteLock": succeeds only if the caller holds the sole ref
+ while (true) {
+ int value = refCount.get();
+ if (value != 1) return false;
+ if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+ }
+ }
+
+ void commitEvicting() { // EVICTING -> EVICTED; subsequent incRef() calls return false
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+ assert result;
+ }
+
+ void abortEvicting() { // EVICTING -> 0, which also releases the caller's ref
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+ assert result;
+ }
+
+ /** Returns fileKey's sub-cache (created via createFunc if absent or evicted), already incRef-ed.
+ * All this mess is necessary because we want to be able to remove sub-caches for fully
+ * evicted files. It may actually be better to have non-nested map with object keys?
+ */
+ public static <T> FileCache<T> getOrAddFileSubCache( // caller must decRef() the result
+ ConcurrentHashMap<Object, FileCache<T>> cache, Object fileKey,
+ Function<Void, T> createFunc) {
+ FileCache<T> newSubCache = null;
+ while (true) { // Overwhelmingly executes once.
+ FileCache<T> subCache = cache.get(fileKey);
+ if (subCache != null) {
+ if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+ if (newSubCache == null) {
+ newSubCache = new FileCache<T>(createFunc.apply(null));
+ newSubCache.incRef();
+ }
+ // Found a stale value we cannot incRef; try to replace it with new value.
+ if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
+ continue; // Someone else replaced/removed a stale value, try again.
+ }
+ // No value found.
+ if (newSubCache == null) {
+ newSubCache = new FileCache<T>(createFunc.apply(null));
+ newSubCache.incRef();
+ }
+ FileCache<T> oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
+ if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+ if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+ // Someone created one in parallel and then it went stale.
+ if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
+ // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
new file mode 100644
index 0000000..17c7ee6
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/FileCacheCleanupThread.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hive.common.util.Ref;
+
+/** Low-priority daemon that, after evictions, prunes a map of FileCache-s and drops empty ones. */
+abstract class FileCacheCleanupThread<T> extends Thread {
+ private final long approxCleanupIntervalSec;
+ private final AtomicInteger newEvictions;
+ private final ConcurrentHashMap<Object, FileCache<T>> fileMap;
+
+ public FileCacheCleanupThread(String name, ConcurrentHashMap<Object, FileCache<T>> fileMap,
+ AtomicInteger newEvictions, long cleanupInterval) { // cleanupInterval: approx. seconds per round
+ super(name);
+ this.fileMap = fileMap;
+ this.newEvictions = newEvictions;
+ this.approxCleanupIntervalSec = cleanupInterval;
+ setDaemon(true);
+ setPriority(1);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ doOneCleanupRound();
+ } catch (InterruptedException ex) {
+ LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+ break;
+ }
+ }
+ }
+
+ private void doOneCleanupRound() throws InterruptedException { // waits for evictions, then one pass
+ while (true) {
+ int evictionsSinceLast = newEvictions.getAndSet(0);
+ if (evictionsSinceLast > 0) break;
+ synchronized (newEvictions) {
+ newEvictions.wait(10000);
+ }
+ }
+ // Duration is an estimate; if the size of the map changes, it can be very different.
+ long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
+ int leftToCheck = 0; // approximate
+ for (FileCache<T> fc : fileMap.values()) {
+ leftToCheck += getCacheSize(fc);
+ }
+ // Iterate through all the filecaches. This is best-effort.
+ // If these super-long-lived iterators affect the map in some bad way,
+ // we'd need to sleep once per round instead.
+ Iterator<Map.Entry<Object, FileCache<T>>> iter = fileMap.entrySet().iterator();
+ Ref<Boolean> isPastEndTime = Ref.from(false);
+ while (iter.hasNext()) {
+ FileCache<T> fc = iter.next().getValue();
+ if (!fc.incRef()) {
+ throw new AssertionError("Something other than cleanup is removing elements from map");
+ }
+ leftToCheck = cleanUpOneFileCache(fc, leftToCheck, endTime, isPastEndTime);
+ if (getCacheSize(fc) > 0) {
+ fc.decRef();
+ continue;
+ }
+ // FileCache might be empty; try to remove it. "tryWriteLock" needs ours to be the only ref.
+ if (!fc.startEvicting()) { fc.decRef(); continue; } // in use elsewhere; release our ref
+ if (getCacheSize(fc) == 0) {
+ fc.commitEvicting();
+ iter.remove();
+ } else {
+ fc.abortEvicting();
+ }
+ }
+ }
+
+ protected abstract int getCacheSize(FileCache<T> fc); // entry count; 0 makes fc removable
+
+ protected abstract int cleanUpOneFileCache(FileCache<T> fc, int leftToCheck, long endTime,
+ Ref<Boolean> isPastEndTime) throws InterruptedException; // returns updated leftToCheck
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index d1a961c..7d5c101 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -139,4 +139,11 @@ public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryB
int refCount = this.refCount.get();
return "0x" + Integer.toHexString(System.identityHashCode(this)) + "(" + refCount + ")";
}
+
+ public static String toDataString(MemoryBuffer s) { // debug: buffer plus hex of its first remaining byte
+ if (s == null || s.getByteBufferRaw().remaining() == 0) return "" + s;
+ byte b = s.getByteBufferRaw().get(s.getByteBufferRaw().position());
+ int i = b & 0xFF; // unsigned byte value; negating a negative byte printed the wrong hex digits
+ return s + " (0x" + Integer.toHexString(i) + ")";
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index ea458ca..72980ae 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -32,33 +32,45 @@ import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
-import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, LlapOomDebugDump {
private static final int DEFAULT_CLEANUP_INTERVAL = 600;
- private final EvictionAwareAllocator allocator;
+ private final Allocator allocator;
private final AtomicInteger newEvictions = new AtomicInteger(0);
private Thread cleanupThread = null;
- private final ConcurrentHashMap<Object, FileCache> cache =
- new ConcurrentHashMap<Object, FileCache>();
+ // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+ // In fact, CSLM has slow single-threaded operation, and one file is probably often read
+ // by just one (or few) threads, so a much more simple DS with locking might be better.
+ // Let's use CSLM for now, since it's available.
+ private final ConcurrentHashMap<Object,
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> cache = new ConcurrentHashMap<>();
private final LowLevelCachePolicy cachePolicy;
private final long cleanupInterval;
private final LlapDaemonCacheMetrics metrics;
private final boolean doAssumeGranularBlocks;
+ private static final Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>> CACHE_CTOR = // per-file offset -> buffer map factory
+ new Function<Void, ConcurrentSkipListMap<Long, LlapDataBuffer>>() {
+ @Override
+ public ConcurrentSkipListMap<Long, LlapDataBuffer> apply(Void input) { // input unused; FileCache passes null
+ return new ConcurrentSkipListMap<>();
+ }
+ };
+
public LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
- EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks) {
+ Allocator allocator, boolean doAssumeGranularBlocks) { // evicted buffers are freed by EvictionDispatcher, not this cache
this(metrics, cachePolicy, allocator, doAssumeGranularBlocks, DEFAULT_CLEANUP_INTERVAL);
}
@VisibleForTesting
LowLevelCacheImpl(LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy,
- EvictionAwareAllocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
-
+ Allocator allocator, boolean doAssumeGranularBlocks, long cleanupInterval) {
LlapIoImpl.LOG.info("Low level cache; cleanup interval {} sec", cleanupInterval);
this.cachePolicy = cachePolicy;
this.allocator = allocator;
@@ -69,7 +81,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
public void startThreads() {
if (cleanupInterval < 0) return;
- cleanupThread = new CleanupThread(cleanupInterval);
+ cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval); // shares file map and eviction counter
cleanupThread.start();
}
@@ -78,7 +90,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
- FileCache subCache = cache.get(fileKey);
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache = cache.get(fileKey);
if (subCache == null || !subCache.incRef()) {
long totalMissed = ranges.getTotalLength();
metrics.incrCacheRequestedBytes(totalMissed);
@@ -102,7 +114,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
metrics.incrCacheRequestedBytes(current.getLength());
// We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
DiskRangeList next = current.next;
- getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
+ getOverlappingRanges(baseOffset, current, subCache.getCache(), factory, gotAllData);
current = next;
}
} finally {
@@ -234,7 +246,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
- FileCache subCache = getOrAddFileSubCache(fileKey);
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> subCache =
+ FileCache.getOrAddFileSubCache(cache, fileKey, CACHE_CTOR);
try {
for (int i = 0; i < ranges.length; ++i) {
LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -247,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
assert buffer.declaredCachedLength == LlapDataBuffer.UNKNOWN_CACHED_LENGTH;
buffer.declaredCachedLength = ranges[i].getLength();
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
- LlapDataBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+ LlapDataBuffer oldVal = subCache.getCache().putIfAbsent(offset, buffer);
if (oldVal == null) {
// Cached successfully, add to policy.
cachePolicy.cache(buffer, priority);
@@ -287,7 +300,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
break;
}
// We found some old value but couldn't incRef it; remove it.
- subCache.cache.remove(offset, oldVal);
+ subCache.getCache().remove(offset, oldVal);
}
}
} finally {
@@ -296,38 +309,6 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
return result;
}
- /**
- * All this mess is necessary because we want to be able to remove sub-caches for fully
- * evicted files. It may actually be better to have non-nested map with object keys?
- */
- private FileCache getOrAddFileSubCache(Object fileKey) {
- FileCache newSubCache = null;
- while (true) { // Overwhelmingly executes once.
- FileCache subCache = cache.get(fileKey);
- if (subCache != null) {
- if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
- if (newSubCache == null) {
- newSubCache = new FileCache();
- newSubCache.incRef();
- }
- // Found a stale value we cannot incRef; try to replace it with new value.
- if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
- continue; // Someone else replaced/removed a stale value, try again.
- }
- // No value found.
- if (newSubCache == null) {
- newSubCache = new FileCache();
- newSubCache.incRef();
- }
- FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
- if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
- if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
- // Someone created one in parallel and then it went stale.
- if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
- // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
- }
- }
-
private static int align64(int number) {
return ((number + 63) & ~63);
}
@@ -370,134 +351,44 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public final void notifyEvicted(MemoryBuffer buffer) {
- allocator.deallocateEvicted(buffer);
newEvictions.incrementAndGet();
}
- private static class FileCache {
- private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
- // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
- // In fact, CSLM has slow single-threaded operation, and one file is probably often read
- // by just one (or few) threads, so a much more simple DS with locking might be better.
- // Let's use CSLM for now, since it's available.
- private final ConcurrentSkipListMap<Long, LlapDataBuffer> cache
- = new ConcurrentSkipListMap<Long, LlapDataBuffer>();
- private final AtomicInteger refCount = new AtomicInteger(0);
-
- boolean incRef() {
- while (true) {
- int value = refCount.get();
- if (value == EVICTED_REFCOUNT) return false;
- if (value == EVICTING_REFCOUNT) continue; // spin until it resolves; extremely rare
- assert value >= 0;
- if (refCount.compareAndSet(value, value + 1)) return true;
- }
- }
-
- void decRef() {
- int value = refCount.decrementAndGet();
- if (value < 0) {
- throw new AssertionError("Unexpected refCount " + value);
- }
- }
-
- boolean startEvicting() {
- while (true) {
- int value = refCount.get();
- if (value != 1) return false;
- if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
- }
- }
-
- void commitEvicting() {
- boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
- assert result;
- }
-
- void abortEvicting() {
- boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
- assert result;
- }
- }
-
- private final class CleanupThread extends Thread {
- private final long approxCleanupIntervalSec;
+ private static final class CleanupThread // LowLevelCacheImpl hooks for the generic FileCacheCleanupThread
+ extends FileCacheCleanupThread<ConcurrentSkipListMap<Long, LlapDataBuffer>> {
- public CleanupThread(long cleanupInterval) {
- super("Llap low level cache cleanup thread");
- this.approxCleanupIntervalSec = cleanupInterval;
- setDaemon(true);
- setPriority(1);
+ public CleanupThread(ConcurrentHashMap<Object,
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> fileMap,
+ AtomicInteger newEvictions, long cleanupInterval) {
+ super("Llap low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
}
@Override
- public void run() {
- while (true) {
- try {
- doOneCleanupRound();
- } catch (InterruptedException ex) {
- LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
- Thread.currentThread().interrupt();
- break;
- } catch (Throwable t) {
- LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
- break;
- }
- }
+ protected int getCacheSize( FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc) { // buffers cached for the file
+ return fc.getCache().size();
}
- private void doOneCleanupRound() throws InterruptedException {
- while (true) {
- int evictionsSinceLast = newEvictions.getAndSet(0);
- if (evictionsSinceLast > 0) break;
- synchronized (newEvictions) {
- newEvictions.wait(10000);
- }
- }
- // Duration is an estimate; if the size of the map changes, it can be very different.
- long endTime = System.nanoTime() + approxCleanupIntervalSec * 1000000000L;
- int leftToCheck = 0; // approximate
- for (FileCache fc : cache.values()) {
- leftToCheck += fc.cache.size();
- }
- // Iterate thru all the filecaches. This is best-effort.
- // If these super-long-lived iterators affect the map in some bad way,
- // we'd need to sleep once per round instead.
- Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
- boolean isPastEndTime = false;
- while (iter.hasNext()) {
- FileCache fc = iter.next().getValue();
- if (!fc.incRef()) {
- throw new AssertionError("Something other than cleanup is removing elements from map");
- }
- // Iterate thru the file cache. This is best-effort.
- Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.cache.entrySet().iterator();
- boolean isEmpty = true;
- while (subIter.hasNext()) {
- long time = -1;
- isPastEndTime = isPastEndTime || ((time = System.nanoTime()) >= endTime);
- Thread.sleep(((leftToCheck <= 0) || isPastEndTime)
- ? 1 : (endTime - time) / (1000000L * leftToCheck));
- if (subIter.next().getValue().isInvalid()) {
- subIter.remove();
- } else {
- isEmpty = false;
- }
- --leftToCheck;
- }
- if (!isEmpty) {
- fc.decRef();
- continue;
- }
- // FileCache might be empty; see if we can remove it. "tryWriteLock"
- if (!fc.startEvicting()) continue;
- if (fc.cache.isEmpty()) {
- fc.commitEvicting();
- iter.remove();
+ @Override
+ public int cleanUpOneFileCache( // removes invalid buffers, pacing itself toward endTime
+ FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>> fc,
+ int leftToCheck, long endTime, Ref<Boolean> isPastEndTime)
+ throws InterruptedException {
+ // Iterate through the file cache, dropping invalid buffers. This is best-effort.
+ Iterator<Map.Entry<Long, LlapDataBuffer>> subIter = fc.getCache().entrySet().iterator();
+ boolean isEmpty = true; // NOTE(review): no longer read; the caller checks emptiness via getCacheSize
+ while (subIter.hasNext()) {
+ long time = -1;
+ isPastEndTime.value = isPastEndTime.value || ((time = System.nanoTime()) >= endTime);
+ Thread.sleep(((leftToCheck <= 0) || isPastEndTime.value)
+ ? 1 : (endTime - time) / (1000000L * leftToCheck)); // ms per remaining entry; 1 ms once past endTime
+ if (subIter.next().getValue().isInvalid()) {
+ subIter.remove();
} else {
- fc.abortEvicting();
+ isEmpty = false;
}
+ --leftToCheck;
}
+ return leftToCheck; // updated count, used by the caller to pace later file caches
}
}
@@ -516,11 +407,12 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
@Override
public String debugDumpForOom() {
StringBuilder sb = new StringBuilder("File cache state ");
- for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
+ for (Map.Entry<Object, FileCache<ConcurrentSkipListMap<Long, LlapDataBuffer>>> e :
+ cache.entrySet()) {
if (!e.getValue().incRef()) continue;
try {
sb.append("\n file " + e.getKey());
- for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().cache.entrySet()) {
+ for (Map.Entry<Long, LlapDataBuffer> e2 : e.getValue().getCache().entrySet()) {
if (e2.getValue().incRef() < 0) continue;
try {
sb.append("\n [").append(e2.getKey()).append(", ")
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
new file mode 100644
index 0000000..53e3275
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SerDeLowLevelCacheImpl.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hive.common.util.Ref;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcProto.ColumnEncoding;
+
+import com.google.common.base.Function;
+
+/**
+ * Low-level cache for data read through a SerDe from non-columnar files and stored as encoded,
+ * columnar slices ({@link StripeData}), grouped per file ({@link FileData}).
+ *
+ * <p>Slice buffers are reference-counted: lookups lock the buffers they return, and
+ * {@link #decRefBuffer}/{@link #decRefBuffers} unlock them. Buffers are registered with the
+ * supplied {@link LowLevelCachePolicy} when they are put into the cache.
+ */
+public class SerDeLowLevelCacheImpl implements BufferUsageManager, LlapOomDebugDump {
+  private static final int DEFAULT_CLEANUP_INTERVAL = 600;
+  private final Allocator allocator;
+  /** Incremented on every eviction notification; shared with the cleanup thread. */
+  private final AtomicInteger newEvictions = new AtomicInteger(0);
+  private Thread cleanupThread = null;
+  /** Per-file cache entries, keyed by file key. */
+  private final ConcurrentHashMap<Object, FileCache<FileData>> cache = new ConcurrentHashMap<>();
+  private final LowLevelCachePolicy cachePolicy;
+  /** Cleanup interval in seconds; a negative value disables the cleanup thread. */
+  private final long cleanupInterval;
+  private final LlapDaemonCacheMetrics metrics;
+
+  /** Orders slices by known torn start, then by first row start, then by last row start. */
+  private static final class StripeInfoComparator implements Comparator<StripeData> {
+    @Override
+    public int compare(StripeData o1, StripeData o2) {
+      int starts = Long.compare(o1.knownTornStart, o2.knownTornStart);
+      if (starts != 0) return starts;
+      starts = Long.compare(o1.firstStart, o2.firstStart);
+      if (starts != 0) return starts;
+      assert (o1.lastStart == o2.lastStart) == (o1.lastEnd == o2.lastEnd);
+      return Long.compare(o1.lastStart, o2.lastStart);
+    }
+  }
+
+  /** All cached (or to-be-cached) slices for a single file. */
+  public static class FileData {
+    /**
+     * RW lock ensures we have a consistent view of the file data, which is important given that
+     * we generate "stripe" boundaries arbitrarily. Reading buffer data itself doesn't require
+     * that this lock is held; however, everything else in stripes list does.
+     * TODO: make more granular? We only care that each one reader sees consistent boundaries.
+     * So, we could shallow-copy the stripes list, then have individual locks inside each.
+     */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Object fileKey;
+    private final int colCount;
+    /** Slices of this file; null until slices are added. */
+    private ArrayList<StripeData> stripes;
+
+    public FileData(Object fileKey, int colCount) {
+      this.fileKey = fileKey;
+      this.colCount = colCount;
+    }
+
+    /** Appends a human-readable description of this file's data to {@code sb}. */
+    public void toString(StringBuilder sb) {
+      sb.append("File data for ").append(fileKey).append(" with ").append(colCount)
+          .append(" columns: ").append(stripes);
+    }
+
+    public int getColCount() {
+      return colCount;
+    }
+
+    /** Returns the live slice list (not a copy); may be null if no slice was added. */
+    public ArrayList<StripeData> getData() {
+      return stripes;
+    }
+
+    /** Appends a slice, creating the list on first use. Does not acquire {@code rwLock}. */
+    public void addStripe(StripeData sd) {
+      if (stripes == null) {
+        stripes = new ArrayList<>();
+      }
+      stripes.add(sd);
+    }
+
+    @Override
+    public String toString() {
+      return "[fileKey=" + fileKey + ", colCount=" + colCount + ", stripes=" + stripes + "]";
+    }
+  }
+
+  /** One cached slice of a file: its offset boundaries, row count and per-column data. */
+  public static final class StripeData {
+    // In LRR case, if we just store 2 boundaries (which could be split boundaries or reader
+    // positions), we wouldn't be able to account for torn rows correctly because the semantics
+    // of our "exact" reader positions, and inexact split boundaries, are different. We cannot
+    // even tell LRR to use exact boundaries, as there can be a mismatch in an original mid-file
+    // split wrt first row when caching - we may produce incorrect result if we adjust the split
+    // boundary, and also if we don't adjust it, depending where it falls. At best, we'd end up
+    // with spurious disk reads if we cache on row boundaries but splits include torn rows.
+    // This structure implies that when reading a split, we skip the first torn row but fully
+    // read the last torn row (as LineRecordReader does). If we want to support a different
+    // scheme, we'd need to store more offsets and make logic account for that.
+    private long knownTornStart; // This can change based on new splits.
+    private final long firstStart, lastStart, lastEnd;
+    // TODO: we can actually consider storing ALL the delta encoded row offsets - not a lot of
+    //       overhead compared to the data itself, and with row offsets, we could use columnar
+    //       blocks for inconsistent splits. We are not optimizing for inconsistent splits for now.
+
+    private final long rowCount;
+    private final OrcProto.ColumnEncoding[] encodings;
+    private LlapDataBuffer[][][] data; // column index, stream type, buffers
+
+    public StripeData(long knownTornStart, long firstStart, long lastStart, long lastEnd,
+        long rowCount, ColumnEncoding[] encodings) {
+      this.knownTornStart = knownTornStart;
+      this.firstStart = firstStart;
+      this.lastStart = lastStart;
+      this.lastEnd = lastEnd;
+      this.encodings = encodings;
+      this.rowCount = rowCount;
+      this.data = encodings == null ? null : new LlapDataBuffer[encodings.length][][];
+    }
+
+    @Override
+    public String toString() {
+      return toCoordinateString() + " with encodings [" + Arrays.toString(encodings)
+          .replace('\n', ' ') + "] and data " + SerDeLowLevelCacheImpl.toString(data);
+    }
+
+    public long getKnownTornStart() {
+      return knownTornStart;
+    }
+
+    public long getFirstStart() {
+      return firstStart;
+    }
+
+    public long getLastStart() {
+      return lastStart;
+    }
+
+    public long getLastEnd() {
+      return lastEnd;
+    }
+
+    public long getRowCount() {
+      return rowCount;
+    }
+
+    public OrcProto.ColumnEncoding[] getEncodings() {
+      return encodings;
+    }
+
+    public LlapDataBuffer[][][] getData() {
+      return data;
+    }
+
+    public String toCoordinateString() {
+      return "stripe kts " + knownTornStart + " from "
+          + firstStart + " to [" + lastStart + ", " + lastEnd + ")";
+    }
+
+    /**
+     * Creates a slice with the same boundaries and row count as {@code s}, but with empty
+     * encodings and data arrays of the same width, to be filled in with lookup results.
+     */
+    public static StripeData duplicateForResults(StripeData s) {
+      return new StripeData(s.knownTornStart, s.firstStart, s.lastStart, s.lastEnd,
+          s.rowCount, new OrcProto.ColumnEncoding[s.encodings.length]);
+    }
+
+    public void setKnownTornStart(long value) {
+      knownTornStart = value;
+    }
+  }
+
+  /** Debug representation of a column/stream/buffer array; "null" for a null array. */
+  public static String toString(LlapDataBuffer[][][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (LlapDataBuffer[][] colData : data) {
+      sb.append(colData == null ? "null" : toString(colData)).append(", ");
+    }
+    return sb.append("]").toString();
+  }
+
+  /** Debug representation of a stream/buffer array; "null" for a null array. */
+  public static String toString(LlapDataBuffer[][] data) {
+    if (data == null) return "null";
+    StringBuilder sb = new StringBuilder("[");
+    for (LlapDataBuffer[] streamData : data) {
+      if (streamData == null) {
+        sb.append("null, ");
+        continue;
+      }
+      sb.append("[");
+      for (LlapDataBuffer buffer : streamData) {
+        sb.append(LlapDataBuffer.toDataString(buffer));
+      }
+      sb.append("], ");
+    }
+    return sb.append("]").toString();
+  }
+
+  public SerDeLowLevelCacheImpl(
+      LlapDaemonCacheMetrics metrics, LowLevelCachePolicy cachePolicy, Allocator allocator) {
+    this.cachePolicy = cachePolicy;
+    this.allocator = allocator;
+    this.cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
+    this.metrics = metrics;
+    LlapIoImpl.LOG.info("SerDe low-level level cache; cleanup interval {} sec", cleanupInterval);
+  }
+
+  /** Starts the background cleanup thread, unless cleanup is disabled. */
+  public void startThreads() {
+    if (cleanupInterval < 0) return;
+    cleanupThread = new CleanupThread(cache, newEvictions, cleanupInterval);
+    cleanupThread.start();
+  }
+
+  /**
+   * Looks up the cached slices that can serve the split between {@code start} and {@code end}.
+   * A slice into which the split starts or ends in the middle cannot be used (its data is
+   * encoded and columnar), so the returned range may be narrower than the split; the uncovered
+   * part is counted as a miss. A split starting at offset 0 is treated as starting at a row
+   * boundary, unlike any other start offset.
+   *
+   * @param fileKey key the file's data was cached under
+   * @param start split start offset
+   * @param end split end offset
+   * @param includes per-column include flags; may not be longer than the cached column count
+   * @param factory unused by this implementation
+   * @param qfCounters optional counters to record hits/misses in; may be null
+   * @param gotAllData optional out-parameter; true only if all requested data was found
+   * @return a new FileData holding the usable slices with their buffers locked, or null when
+   *     nothing usable is cached for the split
+   * @throws IOException if {@code includes} is longer than the cached column count
+   */
+  public FileData getFileData(Object fileKey, long start, long end, boolean[] includes,
+      DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData)
+      throws IOException {
+    FileCache<FileData> subCache = cache.get(fileKey);
+    if (subCache == null || !subCache.incRef()) {
+      if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.CACHE_LOGGER.trace("TODO# cannot find cache for " + fileKey + " in " + cache);
+      }
+      markAllAsMissed(start, end, qfCounters, gotAllData);
+      return null;
+    }
+
+    try {
+      FileData cached = subCache.getCache();
+      cached.rwLock.readLock().lock();
+      try {
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace(
+              ("TODO# cache for " + fileKey + " is " + cached).replace('\n', ' '));
+        }
+        if (cached.stripes == null) {
+          if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+            LlapIoImpl.CACHE_LOGGER.trace("TODO# cannot find any stripes for " + fileKey);
+          }
+          markAllAsMissed(start, end, qfCounters, gotAllData);
+          return null;
+        }
+        if (includes.length > cached.colCount) {
+          throw new IOException("Includes " + DebugUtils.toString(includes) + " for "
+              + cached.colCount + " columns");
+        }
+        FileData result = new FileData(cached.fileKey, cached.colCount);
+        if (gotAllData != null) {
+          gotAllData.value = true;
+        }
+        // We will adjust start and end so that we could record the metrics; save the originals.
+        long origStart = start, origEnd = end;
+        // startIx is inclusive, endIx is exclusive.
+        int startIx = Integer.MIN_VALUE, endIx = Integer.MIN_VALUE;
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace("TODO# Looking for data between " + start + " and " + end);
+        }
+        for (int i = 0; i < cached.stripes.size() && endIx == Integer.MIN_VALUE; ++i) {
+          StripeData si = cached.stripes.get(i);
+          if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+            LlapIoImpl.CACHE_LOGGER.trace("TODO# looking at " + si.toCoordinateString());
+          }
+
+          if (startIx == i) {
+            // The start of the split was in the middle of the previous slice.
+            start = si.knownTornStart;
+          } else if (startIx == Integer.MIN_VALUE) {
+            // Determine if we need to read this slice for the split.
+            if (si.lastEnd <= start) continue; // Slice before the start of the split.
+            // Start of the split falls somewhere within or before this slice.
+            // Note the ">=" - LineRecordReader will skip the first row even if we start
+            // directly at its start, because it cannot know if it's the start or not.
+            // Unless it's 0; note that we DO give 0 special treatment here, unlike the EOF
+            // below, because zero is zero (see the method Javadoc).
+            if (start == 0 && si.firstStart == 0) {
+              startIx = i;
+            } else if (start >= si.firstStart) {
+              // If the start of the split points into the middle of the cached slice, we cannot
+              // use the cached block - it's encoded and columnar, so we cannot map the file
+              // offset to some "offset" in "middle" of the slice (but see TODO for firstStart).
+              startIx = i + 1;
+            } else {
+              // Start of the split is before this slice.
+              startIx = i; // Simple case - we will read cache from the split start offset.
+              start = si.knownTornStart;
+            }
+          }
+
+          // Determine if this (or previous) is the last slice we need to read for this split.
+          if (startIx != Integer.MIN_VALUE && endIx == Integer.MIN_VALUE) {
+            if (si.lastEnd <= end) {
+              // The entire current slice is part of the split. Note that if split end EQUALS
+              // lastEnd, the split would also read the next row, so we do need to look at the
+              // next slice, if any (although we'd probably find we cannot use it).
+              // Note also that we DO NOT treat end-of-file differently here, cause we do not
+              // know of any such thing. The caller must handle lastEnd vs end of split vs end
+              // of file match correctly in terms of how LRR handles them. See above for
+              // start-of-file.
+              if (i + 1 != cached.stripes.size()) continue;
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else if (si.lastStart <= end) {
+              // The split ends within (and would read) the last row of this slice. Exact match.
+              endIx = i + 1;
+              end = si.lastEnd;
+            } else {
+              // Either the slice comes entirely after the end of split (following a gap in
+              // cached data); or the split ends in the middle of the slice, so it's the same as
+              // in the startIx logic w.r.t. the partial match; so, we either don't want to, or
+              // cannot, use this. There's no need to distinguish these two cases for now.
+              endIx = i;
+              end = (endIx > 0) ? cached.stripes.get(endIx - 1).lastEnd : start;
+            }
+          }
+        }
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace(
+              "TODO# determined stripe indexes " + startIx + ", " + endIx);
+        }
+        if (endIx <= startIx) {
+          // No data for the split, or it fits in the middle of one or two slices. The whole
+          // split must then be read from the source, so account for all of it as a miss, as
+          // the other "return null" paths do.
+          markAllAsMissed(origStart, origEnd, qfCounters, gotAllData);
+          return null;
+        }
+        if (start > origStart || end < origEnd) {
+          if (gotAllData != null) {
+            gotAllData.value = false;
+          }
+          long totalMiss = Math.max(0, origEnd - end) + Math.max(0, start - origStart);
+          metrics.incrCacheRequestedBytes(totalMiss);
+          if (qfCounters != null) {
+            qfCounters.recordCacheMiss(totalMiss);
+          }
+        }
+
+        result.stripes = new ArrayList<>(endIx - startIx);
+        for (int stripeIx = startIx; stripeIx < endIx; ++stripeIx) {
+          getCacheDataForOneSlice(stripeIx, cached, result, gotAllData, includes, qfCounters);
+        }
+        return result;
+      } finally {
+        cached.rwLock.readLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+  /**
+   * Copies one cached slice into {@code result}, locking the buffers of every included column.
+   * A column whose buffers cannot all be locked is left out of the result entirely (neither its
+   * encoding nor its data is set) and is reported as a miss.
+   */
+  private void getCacheDataForOneSlice(int stripeIx, FileData cached, FileData result,
+      BooleanRef gotAllData, boolean[] includes, LowLevelCacheCounters qfCounters) {
+    StripeData cStripe = cached.stripes.get(stripeIx);
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("TODO# got stripe in cache " + cStripe);
+    }
+    StripeData stripe = StripeData.duplicateForResults(cStripe);
+    result.stripes.add(stripe);
+    boolean isMissed = false;
+    for (int colIx = 0; colIx < cached.colCount; ++colIx) {
+      // getFileData only rejects includes longer than colCount; absent flags mean "excluded".
+      if (colIx >= includes.length || !includes[colIx]) continue;
+      LlapDataBuffer[][] cColData = cStripe.data[colIx];
+      if (cStripe.encodings[colIx] == null || cColData == null) {
+        assert cColData == null; // No encoding => must have no data.
+        isMissed = true;
+        if (gotAllData != null) {
+          gotAllData.value = false;
+        }
+        continue;
+      }
+      if (!lockColumnData(cColData, stripeIx, colIx)) {
+        handleRemovedColumnData(cColData);
+        isMissed = true;
+        if (gotAllData != null) {
+          gotAllData.value = false;
+        }
+        continue;
+      }
+      // At this point, we have arrived at the level where we need all the data, and the
+      // arrays never change. So we will just do a shallow assignment here instead of copy.
+      stripe.encodings[colIx] = cStripe.encodings[colIx];
+      stripe.data[colIx] = cColData;
+    }
+    doMetricsStuffForOneSlice(qfCounters, stripe, isMissed);
+  }
+
+  /**
+   * Locks every buffer of one cached column. If some buffer cannot be locked, the buffers this
+   * call already locked are unlocked again and false is returned, so no reference is leaked for
+   * data that will not be handed to the caller.
+   * Note: this relies on the fact that we always evict the entire column, so if we have the
+   * column data, we assume we have all the streams we need.
+   */
+  private boolean lockColumnData(LlapDataBuffer[][] colData, int stripeIx, int colIx) {
+    for (int streamIx = 0; streamIx < colData.length; ++streamIx) {
+      LlapDataBuffer[] streamData = colData[streamIx];
+      if (streamData == null) continue;
+      for (int i = 0; i < streamData.length; ++i) {
+        if (lockBuffer(streamData[i], true)) continue;
+        LlapIoImpl.CACHE_LOGGER.info("TODO# couldn't lock data for stripe at "
+            + stripeIx + ", colIx " + colIx + ", stream type " + streamIx);
+        unlockColumnPrefix(colData, streamIx, i);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Unlocks the buffers of {@code colData} that precede buffer {@code endBufIx} of stream
+   * {@code lastStreamIx}, i.e. exactly those locked by lockColumnData before it failed.
+   */
+  private void unlockColumnPrefix(LlapDataBuffer[][] colData, int lastStreamIx, int endBufIx) {
+    for (int streamIx = 0; streamIx <= lastStreamIx; ++streamIx) {
+      LlapDataBuffer[] streamData = colData[streamIx];
+      if (streamData == null) continue;
+      int count = (streamIx == lastStreamIx) ? endBufIx : streamData.length;
+      for (int i = 0; i < count; ++i) {
+        unlockBuffer(streamData[i], true);
+      }
+    }
+  }
+
+  /** Records requested/hit/miss bytes for one returned slice. */
+  private void doMetricsStuffForOneSlice(
+      LowLevelCacheCounters qfCounters, StripeData stripe, boolean isMissed) {
+    // Slice boundaries may not match split boundaries due to torn rows in either direction,
+    // so this counter may not be consistent with splits. This is also why we increment
+    // requested bytes here, instead of based on the split - we don't want the metrics to be
+    // inconsistent with each other. No matter what we determine here, at least we'll account
+    // for both in the same manner.
+    long bytes = stripe.lastEnd - stripe.knownTornStart;
+    metrics.incrCacheRequestedBytes(bytes);
+    if (!isMissed) {
+      metrics.incrCacheHitBytes(bytes);
+    }
+    if (qfCounters != null) {
+      if (isMissed) {
+        qfCounters.recordCacheMiss(bytes);
+      } else {
+        qfCounters.recordCacheHit(bytes);
+      }
+    }
+  }
+
+  /** Records the whole range [from, to) as requested and missed; clears gotAllData. */
+  private void markAllAsMissed(long from, long to,
+      LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
+    // The daemon-wide metric is recorded regardless of per-fragment counters, as getFileData
+    // does for partial misses.
+    metrics.incrCacheRequestedBytes(to - from);
+    if (qfCounters != null) {
+      qfCounters.recordCacheMiss(to - from);
+    }
+    if (gotAllData != null) {
+      gotAllData.value = false;
+    }
+  }
+
+  /**
+   * Increments the buffer's reference count. Returns false if the buffer could not be locked
+   * (incRef did not return a positive count). When {@code doNotifyPolicy} is set, the cache
+   * policy is told about the lock if this is the first reference.
+   */
+  private boolean lockBuffer(LlapDataBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+      cachePolicy.notifyLock(buffer);
+    }
+    return rc > 0;
+  }
+
+  /**
+   * Adds the slices in {@code data} to the cache, merging them with slices already cached for
+   * the same file. Every buffer in {@code data} is locked and registered with the cache policy.
+   * Overlapping slices with identical boundaries are merged via mergeStripeInfos; for any other
+   * overlap, the previously cached slice is dropped.
+   */
+  public void putFileData(final FileData data, Priority priority,
+      LowLevelCacheCounters qfCounters) {
+    // TODO: buffers are accounted for at allocation time, but ideally we should report the memory
+    //       overhead from the java objects to memory manager and remove it when discarding file.
+    if (data.stripes == null || data.stripes.isEmpty()) {
+      LlapIoImpl.LOG.warn("Trying to cache FileData with no data for " + data.fileKey);
+      return;
+    }
+    FileCache<FileData> subCache = null;
+    FileData cached = null;
+    // Lock the new data before it can become visible in the map. If it becomes the cached
+    // object itself, the lock stays held and is released in the inner finally below.
+    data.rwLock.writeLock().lock();
+    try {
+      subCache = FileCache.getOrAddFileSubCache(
+          cache, data.fileKey, new Function<Void, FileData>() {
+            @Override
+            public FileData apply(Void input) {
+              return data; // If we don't have a file cache, we will add this one as is.
+            }
+          });
+      cached = subCache.getCache();
+    } finally {
+      if (data != cached) {
+        data.rwLock.writeLock().unlock();
+      }
+    }
+    try {
+      if (data != cached) {
+        cached.rwLock.writeLock().lock();
+      }
+      try {
+        for (StripeData si : data.stripes) {
+          lockAllBuffersForPut(si, priority);
+        }
+        if (data == cached) {
+          if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+            LlapIoImpl.CACHE_LOGGER.trace(("TODO# cached new data " + data).replace('\n', ' '));
+          }
+          return;
+        }
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace(
+              ("TODO# merging old " + cached + " and new " + data).replace('\n', ' '));
+        }
+        ArrayList<StripeData> combined = new ArrayList<>(
+            cached.stripes.size() + data.stripes.size());
+        combined.addAll(cached.stripes);
+        combined.addAll(data.stripes);
+        Collections.sort(combined, new StripeInfoComparator());
+        int lastIx = combined.size() - 1;
+        for (int ix = 0; ix < lastIx; ++ix) {
+          StripeData cur = combined.get(ix), next = combined.get(ix + 1);
+          if (cur.lastEnd <= next.firstStart) continue; // All good.
+          if (cur.firstStart == next.firstStart && cur.lastEnd == next.lastEnd) {
+            mergeStripeInfos(cur, next);
+            combined.remove(ix + 1);
+            --lastIx;
+            // Don't recheck with next, only 2 lists each w/o collisions.
+            continue;
+          }
+          // The original lists do not contain collisions, so only one is 'old'.
+          boolean isCurOriginal = cached.stripes.contains(cur);
+          handleRemovedStripeInfo(combined.remove(isCurOriginal ? ix : ix + 1));
+          --ix;
+          --lastIx;
+        }
+        cached.stripes = combined;
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace(
+              ("TODO# new cache data is " + combined).replace('\n', ' '));
+        }
+      } finally {
+        cached.rwLock.writeLock().unlock();
+      }
+    } finally {
+      subCache.decRef();
+    }
+  }
+
+  /**
+   * Locks every buffer of a new slice, registers it with the cache policy and records its
+   * cached length (which unlockBuffer later uses to tell cached buffers from uncached ones).
+   */
+  private void lockAllBuffersForPut(StripeData si, Priority priority) {
+    for (int i = 0; i < si.data.length; ++i) {
+      LlapDataBuffer[][] colData = si.data[i];
+      if (colData == null) continue;
+      for (int j = 0; j < colData.length; ++j) {
+        LlapDataBuffer[] streamData = colData[j];
+        if (streamData == null) continue;
+        for (int k = 0; k < streamData.length; ++k) {
+          boolean canLock = lockBuffer(streamData[k], false); // false - not in cache yet
+          assert canLock;
+          cachePolicy.cache(streamData[k], priority);
+          streamData[k].declaredCachedLength = streamData[k].getByteBufferRaw().remaining();
+        }
+      }
+    }
+  }
+
+  /** Handles every column of a slice that was dropped from the cache. */
+  private void handleRemovedStripeInfo(StripeData removed) {
+    if (removed.data == null) return; // A slice without encodings has no data arrays.
+    for (LlapDataBuffer[][] colData : removed.data) {
+      handleRemovedColumnData(colData);
+    }
+  }
+
+  private void handleRemovedColumnData(LlapDataBuffer[][] removed) {
+    // TODO: could we tell the policy that we don't care about these and have them evicted? or we
+    //       could just deallocate them when unlocked, and free memory + handle that in eviction.
+    //       For now, just abandon the blocks - eventually, they'll get evicted.
+  }
+
+  /**
+   * Merges slice {@code from} into {@code to}, which have the same boundaries: keeps the smaller
+   * known torn start, fills in missing encodings, and replaces column data with {@code from}'s
+   * wherever {@code from} has data.
+   *
+   * @throws RuntimeException if the two slices have incompatible encodings
+   */
+  private void mergeStripeInfos(StripeData to, StripeData from) {
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("TODO# merging slices data: old " + to + " and new " + from);
+    }
+    // Validate before mutating "to", so a mismatch does not leave it partially updated.
+    if (from.encodings.length != to.encodings.length) {
+      throw new RuntimeException("Different encodings " + from + "; " + to);
+    }
+    to.knownTornStart = Math.min(to.knownTornStart, from.knownTornStart);
+    for (int colIx = 0; colIx < from.encodings.length; ++colIx) {
+      if (to.encodings[colIx] == null) {
+        to.encodings[colIx] = from.encodings[colIx];
+      } else if (from.encodings[colIx] != null
+          && !to.encodings[colIx].equals(from.encodings[colIx])) {
+        throw new RuntimeException("Different encodings at " + colIx + ": " + from + "; " + to);
+      }
+      LlapDataBuffer[][] fromColData = from.data[colIx];
+      if (fromColData != null) {
+        if (to.data[colIx] != null) {
+          // Note: we assume here that the data that was returned to the caller from cache will
+          // not be passed back in via put. Right now it's safe since we don't do anything. But
+          // if we evict proactively, we will have to compare objects all the way down.
+          handleRemovedColumnData(to.data[colIx]);
+        }
+        to.data[colIx] = fromColData;
+      }
+    }
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBuffer buffer) {
+    unlockBuffer((LlapDataBuffer) buffer, true);
+  }
+
+  @Override
+  public void decRefBuffers(List<MemoryBuffer> cacheBuffers) {
+    for (MemoryBuffer b : cacheBuffers) {
+      unlockBuffer((LlapDataBuffer) b, true);
+    }
+  }
+
+  /**
+   * Decrements the buffer's reference count. On the last release (when
+   * {@code handleLastDecRef} is set), a cached buffer is handed back to the policy and an
+   * uncached one is deallocated.
+   */
+  private void unlockBuffer(LlapDataBuffer buffer, boolean handleLastDecRef) {
+    boolean isLastDecref = (buffer.decRef() == 0);
+    if (handleLastDecRef && isLastDecref) {
+      // This is kind of not pretty, but this is how we detect whether buffer was cached.
+      // We would always set this for lookups at put time.
+      if (buffer.declaredCachedLength != LlapDataBuffer.UNKNOWN_CACHED_LENGTH) {
+        cachePolicy.notifyUnlock(buffer);
+      } else {
+        if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+          LlapIoImpl.CACHE_LOGGER.trace("Deallocating {} that was not cached", buffer);
+        }
+        allocator.deallocate(buffer);
+      }
+    }
+    metrics.decrCacheNumLockedBuffers();
+  }
+
+  private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
+
+  /** Creates a placeholder buffer backed by a shared 1-byte array. */
+  public static LlapDataBuffer allocateFake() {
+    LlapDataBuffer fake = new LlapDataBuffer();
+    fake.initialize(-1, fakeBuf, 0, 1);
+    return fake;
+  }
+
+  public final void notifyEvicted(MemoryBuffer buffer) {
+    newEvictions.incrementAndGet();
+  }
+
+  /** Background thread that drops column data whose buffers can no longer be locked. */
+  private final class CleanupThread extends FileCacheCleanupThread<FileData> {
+
+    public CleanupThread(ConcurrentHashMap<Object, FileCache<FileData>> fileMap,
+        AtomicInteger newEvictions, long cleanupInterval) {
+      super("Llap serde low level cache cleanup thread", fileMap, newEvictions, cleanupInterval);
+    }
+
+    @Override
+    protected int getCacheSize(FileCache<FileData> fc) {
+      return 1; // Each iteration cleans the file cache as a single unit (unlike the ORC cache).
+    }
+
+    @Override
+    public int cleanUpOneFileCache(FileCache<FileData> fc, int leftToCheck, long endTime,
+        Ref<Boolean> isPastEndTime) throws InterruptedException {
+      FileData fd = fc.getCache();
+      fd.rwLock.writeLock().lock();
+      try {
+        for (StripeData sd : fd.stripes) {
+          for (int colIx = 0; colIx < sd.data.length; ++colIx) {
+            LlapDataBuffer[][] colData = sd.data[colIx];
+            if (colData == null) continue;
+            boolean hasAllData = true;
+            for (int j = 0; (j < colData.length) && hasAllData; ++j) {
+              LlapDataBuffer[] streamData = colData[j];
+              if (streamData == null) continue;
+              for (int k = 0; k < streamData.length; ++k) {
+                LlapDataBuffer buf = streamData[k];
+                // Probe the buffer by locking and immediately unlocking it.
+                // NOTE(review): the lock does not notify the policy, but unlockBuffer(.., true)
+                // calls cachePolicy.notifyUnlock on the last release; confirm the policy
+                // tolerates an unlock notification without a matching lock notification.
+                hasAllData = hasAllData && lockBuffer(buf, false);
+                if (!hasAllData) break;
+                unlockBuffer(buf, true);
+              }
+            }
+            if (!hasAllData) {
+              // Columns are always dropped as a whole; see lockColumnData.
+              handleRemovedColumnData(colData);
+              sd.data[colIx] = null;
+            }
+          }
+        }
+      } finally {
+        fd.rwLock.writeLock().unlock();
+      }
+      return leftToCheck - 1;
+    }
+  }
+
+  @Override
+  public boolean incRefBuffer(MemoryBuffer buffer) {
+    // notifyReused implies that buffer is already locked; it's also called once for new
+    // buffers that are not cached yet. Don't notify cache policy.
+    return lockBuffer(((LlapDataBuffer) buffer), false);
+  }
+
+  @Override
+  public Allocator getAllocator() {
+    return allocator;
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    StringBuilder sb = new StringBuilder("File cache state ");
+    for (Map.Entry<Object, FileCache<FileData>> e : cache.entrySet()) {
+      if (!e.getValue().incRef()) continue;
+      try {
+        sb.append("\n file " + e.getKey());
+        sb.append("\n [");
+        e.getValue().getCache().toString(sb);
+        sb.append("]");
+      } finally {
+        e.getValue().decRef();
+      }
+    }
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 290624d..ac9c1da 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -27,21 +27,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.counters.FragmentCountersMap;
-import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.daemon.impl.StatsRecordingThreadPool;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
@@ -53,15 +46,13 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.AvoidSplitCombination;
import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
@@ -89,65 +80,77 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
@SuppressWarnings("rawtypes")
private final InputFormat sourceInputFormat;
private final AvoidSplitCombination sourceASC;
- private final ColumnVectorProducer cvp;
- private final ExecutorService executor;
+ @SuppressWarnings("deprecation")
+ private final Deserializer sourceSerDe;
+ final ColumnVectorProducer cvp;
+ final ExecutorService executor;
private final String hostName;
@SuppressWarnings("rawtypes")
- LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
- ExecutorService executor) {
- // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
- // We'd need to plumb it thru and use it to get data to cache/etc.
- assert sourceInputFormat instanceof OrcInputFormat;
+ LlapInputFormat(InputFormat sourceInputFormat, Deserializer sourceSerDe,
+ ColumnVectorProducer cvp, ExecutorService executor) {
this.executor = executor;
this.cvp = cvp;
this.sourceInputFormat = sourceInputFormat;
this.sourceASC = (sourceInputFormat instanceof AvoidSplitCombination)
? (AvoidSplitCombination)sourceInputFormat : null;
+ this.sourceSerDe = sourceSerDe;
this.hostName = HiveStringUtils.getHostname();
}
+ /**
+ * Returns a reader for the split, using LLAP IO when possible.
+ *
+ * <p>Falls back to the source input format's reader when the split opts out of LLAP IO,
+ * when the LLAP reader fails to initialize, or when wrapLlapReader cannot serve the caller.
+ *
+ * @throws IOException if creating either reader fails; other exceptions are wrapped
+ */
@Override
public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
InputSplit split, JobConf job, Reporter reporter) throws IOException {
- boolean useLlapIo = true;
- if (split instanceof LlapAwareSplit) {
- useLlapIo = ((LlapAwareSplit) split).canUseLlapIo();
- }
+ // Non-null only when the split opted out of LLAP IO; it is then the source format's reader.
+ RecordReader<NullWritable, VectorizedRowBatch> noLlap = checkLlapSplit(split, job, reporter);
+ if (noLlap != null) {
+ return noLlap;
+ }
+
boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
- if (!useLlapIo) {
- LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
- return sourceInputFormat.getRecordReader(split, job, reporter);
- }
FileSplit fileSplit = (FileSplit) split;
reporter.setStatus(fileSplit.toString());
try {
List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
? null : ColumnProjectionUtils.getReadColumnIDs(job);
- LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName);
-
+ LlapRecordReader rr = new LlapRecordReader(job, fileSplit, includedCols, hostName, cvp,
+ executor, sourceInputFormat, sourceSerDe, reporter);
if (!rr.init()) {
return sourceInputFormat.getRecordReader(split, job, reporter);
}
- // vectorized row batch reader
- if (isVectorized) {
- return rr;
- }
-
- // row batch to row-by-row reader
- if (sourceInputFormat instanceof BatchToRowInputFormat) {
- return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
- rr, rr.getVectorizedRowBatchCtx(), includedCols));
- }
-
- return sourceInputFormat.getRecordReader(split, job, reporter);
+ return wrapLlapReader(isVectorized, includedCols, rr, split, job, reporter);
} catch (Exception ex) {
throw new IOException(ex);
}
}
+ /**
+ * Adapts an LLAP reader to what the caller consumes.
+ *
+ * <p>Vectorized callers get {@code rr} as-is. Row-mode callers get a batch-to-row wrapper
+ * when the source format is a BatchToRowInputFormat. Otherwise {@code rr} is closed and the
+ * source input format's own reader is returned.
+ *
+ * @param isVectorized whether the caller consumes VectorizedRowBatch directly
+ * @param includedCols projected column ids, or null when all columns are read
+ * @param rr the LLAP reader; getRecordReader only passes one whose init() succeeded
+ * @throws IOException if the fallback reader cannot be created
+ */
+ public RecordReader<NullWritable, VectorizedRowBatch> wrapLlapReader(
+ boolean isVectorized, List<Integer> includedCols, LlapRecordReader rr,
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ if (isVectorized) {
+ // Vectorized row batch reader; no conversion needed.
+ return rr;
+ }
+ if (sourceInputFormat instanceof BatchToRowInputFormat) {
+ // Row batch to row-by-row reader.
+ return bogusCast(((BatchToRowInputFormat) sourceInputFormat).getWrapper(
+ rr, rr.getVectorizedRowBatchCtx(), includedCols));
+ }
+ LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+ // rr is abandoned here. Close it so the read started by init() is not left running with
+ // nobody consuming its data (assumes close() stops the pipeline - TODO confirm).
+ try {
+ rr.close();
+ } catch (IOException e) {
+ // Best effort: a failure of the unused reader must not block the fallback path.
+ LlapIoImpl.LOG.warn("Failed to close unused LLAP reader for split: " + split, e);
+ }
+ return sourceInputFormat.getRecordReader(split, job, reporter);
+ }
+
+ /**
+ * Decides whether LLAP IO may be used for {@code split}.
+ *
+ * <p>Splits that are not LlapAwareSplit are always eligible. An LlapAwareSplit is eligible
+ * when its canUseLlapIo() says so.
+ *
+ * @return null when LLAP IO may be used; otherwise the source input format's reader
+ * @throws IOException if the fallback reader cannot be created
+ */
+ public RecordReader<NullWritable, VectorizedRowBatch> checkLlapSplit(
+ InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ boolean llapAllowed = !(split instanceof LlapAwareSplit)
+ || ((LlapAwareSplit) split).canUseLlapIo();
+ if (!llapAllowed) {
+ LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
+ return sourceInputFormat.getRecordReader(split, job, reporter);
+ }
+ return null;
+ }
+
// Returning either a vectorized or non-vectorized reader from the same call requires breaking
// generics... this is how vectorization currently works.
@SuppressWarnings("unchecked")
@@ -160,276 +163,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
return sourceInputFormat.getSplits(job, numSplits);
}
- private class LlapRecordReader
- implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
- private final Logger LOG = LoggerFactory.getLogger(LlapRecordReader.class);
- private final FileSplit split;
- private final List<Integer> columnIds;
- private final SearchArgument sarg;
- private final String[] columnNames;
- private final VectorizedRowBatchCtx rbCtx;
- private final Object[] partitionValues;
-
- private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
- private ColumnVectorBatch lastCvb = null;
- private boolean isFirst = true;
-
- private Throwable pendingError = null;
- /** Vector that is currently being processed by our user. */
- private boolean isDone = false;
- private final boolean isClosed = false;
- private final ConsumerFeedback<ColumnVectorBatch> feedback;
- private final QueryFragmentCounters counters;
- private long firstReturnTime;
-
- private final JobConf jobConf;
- private final TypeDescription fileSchema;
- private final boolean[] includedColumns;
- private final ReadPipeline rp;
-
- public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols,
- String hostName) throws IOException, HiveException {
- this.jobConf = job;
- this.split = split;
- this.columnIds = includedCols;
- this.sarg = ConvertAstToSearchArg.createFromConf(job);
- this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
- final String fragmentId = LlapTezUtils.getFragmentId(job);
- final String dagId = LlapTezUtils.getDagId(job);
- final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID);
- MDC.put("dagId", dagId);
- MDC.put("queryId", queryId);
- TezCounters taskCounters = null;
- if (fragmentId != null) {
- MDC.put("fragmentId", fragmentId);
- taskCounters = FragmentCountersMap.getCountersForFragment(fragmentId);
- LOG.info("Received fragment id: {}", fragmentId);
- } else {
- LOG.warn("Not using tez counters as fragment id string is null");
- }
- this.counters = new QueryFragmentCounters(job, taskCounters);
- this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
-
- MapWork mapWork = Utilities.getMapWork(job);
- VectorizedRowBatchCtx ctx = mapWork.getVectorizedRowBatchCtx();
- rbCtx = ctx != null ? ctx : createFakeVrbCtx(mapWork);
-
- int partitionColumnCount = rbCtx.getPartitionColumnCount();
- if (partitionColumnCount > 0) {
- partitionValues = new Object[partitionColumnCount];
- VectorizedRowBatchCtx.getPartitionValues(rbCtx, job, split, partitionValues);
- } else {
- partitionValues = null;
- }
- boolean isAcidScan = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN);
- TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(job, isAcidScan, Integer.MAX_VALUE);
-
- // Create the consumer of encoded data; it will coordinate decoding to CVBs.
- rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema);
- feedback = rp;
- fileSchema = rp.getFileSchema();
- includedColumns = rp.getIncludedColumns();
- }
-
- /**
- * Starts the data read pipeline
- */
- public boolean init() {
- SchemaEvolution schemaEvolution = new SchemaEvolution(fileSchema,
- rp.getReaderSchema(), includedColumns);
- for (Integer colId : columnIds) {
- if (!schemaEvolution.isPPDSafeConversion(colId)) {
- LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split);
- return false;
- }
- }
-
- // perform the data read asynchronously
- if (executor instanceof StatsRecordingThreadPool) {
- // Every thread created by this thread pool will use the same handler
- ((StatsRecordingThreadPool) executor)
- .setUncaughtExceptionHandler(new IOUncaughtExceptionHandler());
- }
- executor.submit(rp.getReadCallable());
- return true;
- }
-
- @Override
- public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
- assert value != null;
- if (isClosed) {
- throw new AssertionError("next called after close");
- }
- // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
- boolean wasFirst = isFirst;
- if (isFirst) {
- if (partitionValues != null) {
- rbCtx.addPartitionColsToBatch(value, partitionValues);
- }
- isFirst = false;
- }
- ColumnVectorBatch cvb = null;
- try {
- cvb = nextCvb();
- } catch (InterruptedException e) {
- // Query might have been canceled. Stop the background processing.
- feedback.stop();
- throw new IOException(e);
- }
- if (cvb == null) {
- if (wasFirst) {
- firstReturnTime = counters.startTimeCounter();
- }
- counters.incrTimeCounter(LlapIOCounters.CONSUMER_TIME_NS, firstReturnTime);
- return false;
- }
- if (columnIds.size() != cvb.cols.length) {
- throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
- + " included, but the reader returned " + cvb.cols.length);
- }
- // VRB was created from VrbCtx, so we already have pre-allocated column vectors
- for (int i = 0; i < cvb.cols.length; ++i) {
- // Return old CVs (if any) to caller. We assume these things all have the same schema.
- cvb.swapColumnVector(i, value.cols, columnIds.get(i));
- }
- value.selectedInUse = false;
- value.size = cvb.size;
- if (wasFirst) {
- firstReturnTime = counters.startTimeCounter();
- }
- return true;
- }
-
- public VectorizedRowBatchCtx getVectorizedRowBatchCtx() {
- return rbCtx;
- }
-
- private final class IOUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LlapIoImpl.LOG.error("Unhandled error from reader thread. threadName: {} threadId: {}" +
- " Message: {}", t.getName(), t.getId(), e.getMessage());
- setError(e);
- }
- }
-
- ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
- boolean isFirst = (lastCvb == null);
- if (!isFirst) {
- feedback.returnData(lastCvb);
- }
- synchronized (pendingData) {
- // We are waiting for next block. Either we will get it, or be told we are done.
- boolean doLogBlocking = LlapIoImpl.LOG.isTraceEnabled() && isNothingToReport();
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next will block");
- }
- while (isNothingToReport()) {
- pendingData.wait(100);
- }
- if (doLogBlocking) {
- LlapIoImpl.LOG.trace("next is unblocked");
- }
- rethrowErrorIfAny();
- lastCvb = pendingData.poll();
- }
- if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) {
- LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb);
- }
- return lastCvb;
- }
-
- private boolean isNothingToReport() {
- return !isDone && pendingData.isEmpty() && pendingError == null;
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public VectorizedRowBatch createValue() {
- return rbCtx.createVectorizedRowBatch();
- }
-
- @Override
- public long getPos() throws IOException {
- return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
- }
-
- @Override
- public void close() throws IOException {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged!
- feedback.stop();
- rethrowErrorIfAny();
- MDC.clear();
- }
-
- private void rethrowErrorIfAny() throws IOException {
- if (pendingError == null) return;
- if (pendingError instanceof IOException) {
- throw (IOException)pendingError;
- }
- throw new IOException(pendingError);
- }
-
- @Override
- public void setDone() {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("setDone called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- isDone = true;
- pendingData.notifyAll();
- }
- }
-
- @Override
- public void consumeData(ColumnVectorBatch data) {
- if (LlapIoImpl.LOG.isTraceEnabled()) {
- LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- }
- synchronized (pendingData) {
- if (isClosed) {
- return;
- }
- pendingData.add(data);
- pendingData.notifyAll();
- }
- }
-
- @Override
- public void setError(Throwable t) {
- counters.incrCounter(LlapIOCounters.NUM_ERRORS);
- LlapIoImpl.LOG.info("setError called; closed {}, done {}, err {}, pending {}",
- isClosed, isDone, pendingError, pendingData.size());
- assert t != null;
- synchronized (pendingData) {
- pendingError = t;
- pendingData.notifyAll();
- }
- }
-
- @Override
- public float getProgress() throws IOException {
- // TODO: plumb progress info thru the reader if we can get metadata from loader first.
- return 0.0f;
- }
- }
-
@Override
public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
return sourceASC == null ? false : sourceASC.shouldSkipCombine(path, conf);
}
- private static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
+ static VectorizedRowBatchCtx createFakeVrbCtx(MapWork mapWork) throws HiveException {
// This is based on Vectorizer code, minus the validation.
// Add all non-virtual columns from the TableScan operator.
@@ -477,5 +216,4 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
}
return tableScanOperator;
}
-
}
http://git-wip-us.apache.org/repos/asf/hive/blob/682a3c7b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 8048624..7cfd133 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -47,16 +47,20 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
+import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
+import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -74,7 +78,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private static final String MODE_CACHE = "cache";
- private final ColumnVectorProducer cvp;
+ // TODO: later, we may have a map
+ private final ColumnVectorProducer orcCvp, genericCvp;
private final ExecutorService executor;
private final LlapDaemonCacheMetrics cacheMetrics;
private final LlapDaemonIOMetrics ioMetrics;
@@ -110,6 +115,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
OrcMetadataCache metadataCache = null;
LowLevelCache cache = null;
+ SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
BufferUsageManager bufferManager = null;
if (useLowLevelCache) {
// Memory manager uses cache policy to trigger evictions, so create the policy first.
@@ -124,11 +130,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
this.allocator = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
+ SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
+ cacheMetrics, cachePolicy, allocator);
cache = cacheImpl;
+ serdeCache = serdeCacheImpl;
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- cachePolicy.setEvictionListener(new EvictionDispatcher(cache, metadataCache));
+ cachePolicy.setEvictionListener(new EvictionDispatcher(
+ cache, serdeCacheImpl, metadataCache, allocator));
cachePolicy.setParentDebugDumper(cacheImpl);
cacheImpl.startThreads(); // Start the cache threads.
bufferManager = cacheImpl; // Cache also serves as buffer manager.
@@ -145,8 +155,10 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
new LinkedBlockingQueue<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("IO-Elevator-Thread-%d").setDaemon(true).build());
// TODO: this should depends on input format and be in a map, or something.
- this.cvp = new OrcColumnVectorProducer(
+ this.orcCvp = new OrcColumnVectorProducer(
metadataCache, cache, bufferManager, conf, cacheMetrics, ioMetrics);
+ this.genericCvp = new GenericColumnVectorProducer(
+ serdeCache, bufferManager, conf, cacheMetrics, ioMetrics);
LOG.info("LLAP IO initialized");
registerMXBeans();
@@ -159,8 +171,12 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
+ /**
+ * Wraps {@code sourceInputFormat} in an LlapInputFormat. ORC sources are given the ORC
+ * column vector producer; all other sources share the generic producer.
+ */
@SuppressWarnings("rawtypes")
@Override
public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
- InputFormat sourceInputFormat) {
- return new LlapInputFormat(sourceInputFormat, cvp, executor);
+ InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+ ColumnVectorProducer producer =
+ (sourceInputFormat instanceof OrcInputFormat) ? orcCvp : genericCvp;
+ return new LlapInputFormat(sourceInputFormat, sourceSerDe, producer, executor);
}
@Override