Posted to commits@hive.apache.org by se...@apache.org on 2017/08/31 21:26:56 UTC
[2/2] hive git commit: HIVE-17006 : LLAP: Parquet caching v1 (Sergey Shelukhin, reviewed by Gunther Hagleitner)
HIVE-17006 : LLAP: Parquet caching v1 (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e673a73
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e673a73
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e673a73
Branch: refs/heads/master
Commit: 9e673a73d28ef047c5d4299b415c306423f68c71
Parents: 642acdf
Author: sergey <se...@apache.org>
Authored: Thu Aug 31 14:21:04 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Aug 31 14:21:04 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/common/FileUtils.java | 35 +
.../test/resources/testconfiguration.properties | 1 +
.../apache/hadoop/hive/llap/io/api/LlapIo.java | 4 +-
.../hive/llap/cache/EvictionDispatcher.java | 13 +-
.../hive/llap/cache/LlapAllocatorBuffer.java | 8 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 99 ++-
.../io/metadata/ParquetMetadataCacheImpl.java | 353 ++++++++
.../hadoop/hive/llap/LlapCacheAwareFs.java | 422 +++++++++
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 3 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 103 ++-
.../io/LlapCacheOnlyInputFormatInterface.java | 28 +
.../ql/io/parquet/MapredParquetInputFormat.java | 12 +-
.../parquet/VectorizedParquetInputFormat.java | 22 +-
.../vector/ParquetFooterInputFromCache.java | 196 +++++
.../vector/VectorizedParquetRecordReader.java | 169 +++-
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 28 +-
.../clientpositive/parquet_ppd_decimal.q | 1 +
.../clientpositive/parquet_predicate_pushdown.q | 1 +
.../test/queries/clientpositive/parquet_types.q | 1 +
.../parquet_types_vectorization.q | 1 +
.../queries/clientpositive/vectorized_parquet.q | 1 +
.../clientpositive/vectorized_parquet_types.q | 1 +
.../llap/parquet_predicate_pushdown.q.out | 18 +-
.../llap/parquet_types_vectorization.q.out | 849 +++++++++++++++++++
.../llap/vector_partitioned_date_time.q.out | 16 +-
.../llap/vectorized_parquet.q.out | 2 +-
.../llap/vectorized_parquet_types.q.out | 2 +-
.../hive/common/io/FileMetadataCache.java | 51 ++
.../io/encoded/MemoryBufferOrBuffers.java | 24 +
29 files changed, 2360 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index e784797..0feff59 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -18,11 +18,14 @@
package org.apache.hadoop.hive.common;
+import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -1033,4 +1036,36 @@ public final class FileUtils {
return result;
}
+ /**
+ * Reads length bytes of data from the stream into the byte buffer.
+ * @param stream Stream to read from.
+ * @param length The number of bytes to read.
+ * @param bb The buffer to read into; the data is written at the current position and then the
+ * position is incremented by length.
+ * @throws EOFException if the length bytes cannot be read. The buffer position is not modified.
+ */
+ public static void readFully(InputStream stream, int length, ByteBuffer bb) throws IOException {
+ byte[] b = null;
+ int offset = 0;
+ if (bb.hasArray()) {
+ b = bb.array();
+ offset = bb.arrayOffset() + bb.position();
+ } else {
+ b = new byte[bb.remaining()];
+ }
+ int fullLen = length;
+ while (length > 0) {
+ int result = stream.read(b, offset, length);
+ if (result < 0) {
+ throw new EOFException("Reading " + fullLen + " bytes");
+ }
+ offset += result;
+ length -= result;
+ }
+ if (!bb.hasArray()) {
+ bb.put(b);
+ } else {
+ bb.position(bb.position() + fullLen);
+ }
+ }
}
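For illustration, a minimal sketch of how this new helper might be exercised with a direct (non-array-backed) buffer; the stream contents and byte counts below are placeholders:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.hadoop.hive.common.FileUtils;

public class ReadFullySketch {
  public static void main(String[] args) throws Exception {
    byte[] onDisk = new byte[64];           // stand-in for file bytes (e.g. a Parquet footer)
    InputStream stream = new ByteArrayInputStream(onDisk);

    // For a direct buffer the helper reads into a temporary array and copies;
    // for an array-backed buffer it reads straight into the backing array.
    ByteBuffer bb = ByteBuffer.allocateDirect(64);
    FileUtils.readFully(stream, 64, bb);    // position advances by 64 on success
    bb.flip();
    System.out.println("Read " + bb.remaining() + " bytes");
  }
}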
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 6f2efa7..f452341 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -75,6 +75,7 @@ minillap.shared.query.files=insert_into1.q,\
orc_merge4.q,\
orc_merge_diff_fs.q,\
parallel_colstats.q,\
+ parquet_types_vectorization.q,\
unionDistinct_1.q,\
union_type_chk.q,\
cte_2.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index 42129b7..d2f9e6c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
public interface LlapIo<T> {
- InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde);
+ InputFormat<NullWritable, T> getInputFormat(
+ InputFormat<?, ?> sourceInputFormat, Deserializer serde);
void close();
String getMemoryInfo();
+ void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index 0cbc8f6..c5248ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFileMetadataBuffer;
/**
* Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
@@ -31,11 +33,15 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
private final SerDeLowLevelCacheImpl serdeCache;
private final OrcMetadataCache metadataCache;
private final EvictionAwareAllocator allocator;
+ // TODO# temporary, will be merged with OrcMetadataCache after HIVE-15665.
+ private final ParquetMetadataCacheImpl parquetMetadataCache;
public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
- OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
+ OrcMetadataCache metadataCache, EvictionAwareAllocator allocator,
+ ParquetMetadataCacheImpl parquetMetadataCache) {
this.dataCache = dataCache;
this.metadataCache = metadataCache;
+ this.parquetMetadataCache = parquetMetadataCache;
this.serdeCache = serdeCache;
this.allocator = allocator;
}
@@ -45,10 +51,13 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
buffer.notifyEvicted(this); // This will call one of the specific notifyEvicted overloads.
}
+ public void notifyEvicted(LlapFileMetadataBuffer buffer) {
+ this.parquetMetadataCache.notifyEvicted(buffer);
+ }
+
public void notifyEvicted(LlapSerDeDataBuffer buffer) {
serdeCache.notifyEvicted(buffer);
allocator.deallocateEvicted(buffer);
-
}
public void notifyEvicted(LlapDataBuffer buffer) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
index 52144c2..8f57a04 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
@@ -131,15 +131,17 @@ public abstract class LlapAllocatorBuffer extends LlapCacheableBuffer implements
long newState, oldState;
do {
oldState = state.get();
+ // We have to check it here since invalid decref will overflow.
+ int oldRefCount = State.getRefCount(oldState);
+ if (oldRefCount == 0) {
+ throw new AssertionError("Invalid decRef when refCount is 0: " + this);
+ }
newState = State.decRefCount(oldState);
} while (!state.compareAndSet(oldState, newState));
int newRefCount = State.getRefCount(newState);
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
}
- if (newRefCount < 0) {
- throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
- }
return newRefCount;
}
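The check has to live inside the compare-and-set loop because the ref count is packed into the state word: a decrement at zero would wrap and corrupt adjacent bits before the old post-loop check could catch it. A simplified sketch of the same pattern, using a plain AtomicLong instead of the real packed state:

import java.util.concurrent.atomic.AtomicLong;

final class RefCountSketch {
  private final AtomicLong refCount = new AtomicLong(1);

  int decRef() {
    long oldCount, newCount;
    do {
      oldCount = refCount.get();
      // Validate against the value we are about to CAS on; checking the result
      // afterwards would be too late once an underflow has been published.
      if (oldCount == 0) {
        throw new AssertionError("Invalid decRef when refCount is 0: " + this);
      }
      newCount = oldCount - 1;
    } while (!refCount.compareAndSet(oldCount, newCount));
    return (int) newCount;
  }
}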
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 35b9d1f..f42622b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -34,11 +34,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -49,24 +56,30 @@ import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hive.common.util.FixedSizedObjectPool;
+
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -86,8 +99,13 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
private ObjectName buddyAllocatorMXBean;
private final Allocator allocator;
private final LlapOomDebugDump memoryDump;
+ private final FileMetadataCache fileMetadataCache;
+ private final LowLevelCache dataCache;
+ private final BufferUsageManager bufferManager;
+ private final Configuration daemonConf;
private LlapIoImpl(Configuration conf) throws IOException {
+ this.daemonConf = conf;
String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode);
LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none");
@@ -115,7 +133,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
sessionId);
OrcMetadataCache metadataCache = null;
- LowLevelCache cache = null;
SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
@@ -154,16 +171,22 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
this.memoryDump = allocator;
LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator, true);
- cache = cacheImpl;
+ dataCache = cacheImpl;
if (isEncodeEnabled) {
SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
cacheMetrics, cachePolicy, allocator);
serdeCache = serdeCacheImpl;
}
+
boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache);
+ // TODO# temporary, see comments there
+ ParquetMetadataCacheImpl parquetMc = new ParquetMetadataCacheImpl(
+ allocator, memManager, cachePolicy, cacheMetrics);
+ fileMetadataCache = parquetMc;
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
- EvictionDispatcher e = new EvictionDispatcher(cache, serdeCache, metadataCache, allocator);
+ EvictionDispatcher e = new EvictionDispatcher(
+ dataCache, serdeCache, metadataCache, allocator, parquetMc);
if (isSplitCache) {
metaCachePolicy.setEvictionListener(e);
metaCachePolicy.setParentDebugDumper(e);
@@ -172,14 +195,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
cachePolicy.setParentDebugDumper(e);
cacheImpl.startThreads(); // Start the cache threads.
- bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
+ bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
bufferManagerGeneric = serdeCache;
} else {
this.allocator = new SimpleAllocator(conf);
memoryDump = null;
+ fileMetadataCache = null;
SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
- bufferManagerOrc = bufferManagerGeneric = sbm;
- cache = sbm;
+ bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
+ dataCache = sbm;
}
// IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
@@ -190,7 +214,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);
// TODO: this should depends on input format and be in a map, or something.
this.orcCvp = new OrcColumnVectorProducer(
- metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
+ metadataCache, dataCache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer(
serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null;
LOG.info("LLAP IO initialized");
@@ -210,10 +234,9 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
return sb.toString();
}
- @SuppressWarnings("rawtypes")
@Override
public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
- InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+ InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe) {
ColumnVectorProducer cvp = genericCvp;
if (sourceInputFormat instanceof OrcInputFormat) {
cvp = orcCvp; // Special-case for ORC.
@@ -233,4 +256,62 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
}
executor.shutdownNow();
}
+
+
+ @Override
+ public void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat) {
+ LlapCacheOnlyInputFormatInterface cacheIf = (LlapCacheOnlyInputFormatInterface)inputFormat;
+ cacheIf.injectCaches(fileMetadataCache,
+ new GenericDataCache(dataCache, bufferManager), daemonConf);
+ }
+
+ private class GenericDataCache implements DataCache, BufferObjectFactory {
+ private final LowLevelCache lowLevelCache;
+ private final BufferUsageManager bufferManager;
+
+ public GenericDataCache(LowLevelCache lowLevelCache, BufferUsageManager bufferManager) {
+ this.lowLevelCache = lowLevelCache;
+ this.bufferManager = bufferManager;
+ }
+
+ @Override
+ public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
+ long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
+ // TODO: we currently pass null counters because this doesn't use LlapRecordReader.
+ // Create counters for non-elevator-using fragments also?
+ return lowLevelCache.getFileData(fileKey, range, baseOffset, factory, null, gotAllData);
+ }
+
+ @Override
+ public long[] putFileData(Object fileKey, DiskRange[] ranges,
+ MemoryBuffer[] data, long baseOffset) {
+ return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null);
+ }
+
+ @Override
+ public void releaseBuffer(MemoryBuffer buffer) {
+ bufferManager.decRefBuffer(buffer);
+ }
+
+ @Override
+ public void reuseBuffer(MemoryBuffer buffer) {
+ boolean isReused = bufferManager.incRefBuffer(buffer);
+ assert isReused;
+ }
+
+ @Override
+ public Allocator getAllocator() {
+ return bufferManager.getAllocator();
+ }
+
+ @Override
+ public BufferObjectFactory getDataBufferFactory() {
+ return this;
+ }
+
+ @Override
+ public MemoryBuffer create() {
+ return new LlapDataBuffer();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
new file mode 100644
index 0000000..b61a8ca
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
+import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
+import org.apache.hadoop.hive.llap.cache.MemoryManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.parquet.io.SeekableInputStream;
+
+// TODO# merge with OrcMetadataCache (and rename) after HIVE-15665. Shares a lot of the code.
+public class ParquetMetadataCacheImpl implements LlapOomDebugDump, FileMetadataCache {
+ private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
+ new ConcurrentHashMap<>();
+
+ private final MemoryManager memoryManager;
+ private final LowLevelCachePolicy policy;
+ private final EvictionAwareAllocator allocator;
+ private final LlapDaemonCacheMetrics metrics;
+
+ public ParquetMetadataCacheImpl(EvictionAwareAllocator allocator, MemoryManager memoryManager,
+ LowLevelCachePolicy policy, LlapDaemonCacheMetrics metrics) {
+ this.memoryManager = memoryManager;
+ this.allocator = allocator;
+ this.policy = policy;
+ this.metrics = metrics;
+ }
+
+ public void notifyEvicted(LlapFileMetadataBuffer buffer) {
+ LlapBufferOrBuffers removed = metadata.remove(buffer.getFileKey());
+ if (removed == null) return;
+ if (removed.getSingleBuffer() != null) {
+ assert removed.getSingleBuffer() == buffer;
+ return;
+ }
+ discardMultiBuffer(removed);
+ }
+
+ @Override
+ public LlapBufferOrBuffers getFileMetadata(Object fileKey) {
+ LlapBufferOrBuffers result = metadata.get(fileKey);
+ if (result == null) return null;
+ if (!lockBuffer(result, true)) {
+ // No need to discard the buffer we cannot lock - eviction takes care of that.
+ metadata.remove(fileKey, result);
+ return null;
+ }
+ return result;
+ }
+
+ @Override
+ public LlapBufferOrBuffers putFileMetadata(
+ Object fileKey, int length, InputStream is) throws IOException {
+ LlapBufferOrBuffers result = null;
+ while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+ LlapBufferOrBuffers oldVal = metadata.get(fileKey);
+ if (oldVal == null) {
+ result = wrapBbForFile(result, fileKey, length, is);
+ if (!lockBuffer(result, false)) {
+ throw new AssertionError("Cannot lock a newly created value " + result);
+ }
+ oldVal = metadata.putIfAbsent(fileKey, result);
+ if (oldVal == null) {
+ cacheInPolicy(result); // Cached successfully, add to policy.
+ return result;
+ }
+ }
+ if (lockOldVal(fileKey, result, oldVal)) {
+ return oldVal;
+ }
+ // We found some old value but couldn't incRef it; remove it.
+ metadata.remove(fileKey, oldVal);
+ }
+ }
+
+ private void cacheInPolicy(LlapBufferOrBuffers buffers) {
+ LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+ if (singleBuffer != null) {
+ policy.cache(singleBuffer, Priority.HIGH);
+ return;
+ }
+ for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+ policy.cache(buffer, Priority.HIGH);
+ }
+ }
+
+ private <T extends LlapBufferOrBuffers> boolean lockOldVal(Object key, T newVal, T oldVal) {
+ if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when metadata is already cached for" +
+ " {}; old {}, new {}", key, oldVal, newVal);
+ }
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal);
+ }
+ if (lockBuffer(oldVal, true)) {
+ // We found an old, valid block for this key in the cache.
+ if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+ LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}",
+ newVal, oldVal);
+ }
+
+ if (newVal != null) {
+ unlockBuffer(newVal, false);
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void decRefBuffer(MemoryBufferOrBuffers buffer) {
+ if (!(buffer instanceof LlapBufferOrBuffers)) {
+ throw new AssertionError(buffer.getClass());
+ }
+ unlockBuffer((LlapBufferOrBuffers)buffer, true);
+ }
+
+ private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
+ Object fileKey, int length, InputStream stream) throws IOException {
+ if (result != null) return result;
+ int maxAlloc = allocator.getMaxAllocation();
+ LlapFileMetadataBuffer[] largeBuffers = null;
+ if (maxAlloc < length) {
+ largeBuffers = new LlapFileMetadataBuffer[length / maxAlloc];
+ for (int i = 0; i < largeBuffers.length; ++i) {
+ largeBuffers[i] = new LlapFileMetadataBuffer(fileKey);
+ }
+ allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+ for (int i = 0; i < largeBuffers.length; ++i) {
+ readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
+ }
+ }
+ int smallSize = length % maxAlloc;
+ if (smallSize == 0) {
+ return new LlapFileMetadataBuffers(largeBuffers);
+ } else {
+ LlapFileMetadataBuffer[] smallBuffer = new LlapFileMetadataBuffer[1];
+ smallBuffer[0] = new LlapFileMetadataBuffer(fileKey);
+ allocator.allocateMultiple(smallBuffer, smallSize, null);
+ readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
+ if (largeBuffers == null) {
+ return smallBuffer[0];
+ } else {
+ LlapFileMetadataBuffer[] cacheData = new LlapFileMetadataBuffer[largeBuffers.length + 1];
+ System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length);
+ cacheData[largeBuffers.length] = smallBuffer[0];
+ return new LlapFileMetadataBuffers(cacheData);
+ }
+ }
+ }
+
+ private static void readIntoCacheBuffer(
+ InputStream stream, int length, MemoryBuffer dest) throws IOException {
+ ByteBuffer bb = dest.getByteBufferRaw();
+ int pos = bb.position();
+ bb.limit(pos + length);
+ // TODO: SeekableInputStream.readFully eventually calls a Hadoop method that used to be
+ // buggy in 2.7, and in any case just does a copy for a direct buffer. Do the copy here.
+ // ((SeekableInputStream)stream).readFully(bb);
+ FileUtils.readFully(stream, length, bb);
+ bb.position(pos);
+ }
+
+
+ private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) {
+ LlapAllocatorBuffer buffer = buffers.getSingleLlapBuffer();
+ if (buffer != null) {
+ return lockOneBuffer(buffer, doNotifyPolicy);
+ }
+ LlapAllocatorBuffer[] bufferArray = buffers.getMultipleLlapBuffers();
+ for (int i = 0; i < bufferArray.length; ++i) {
+ if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue;
+ for (int j = 0; j < i; ++j) {
+ unlockSingleBuffer(bufferArray[j], true);
+ }
+ discardMultiBuffer(buffers);
+ return false;
+ }
+ return true;
+ }
+
+ private void discardMultiBuffer(LlapBufferOrBuffers removed) {
+ long memoryFreed = 0;
+ for (LlapAllocatorBuffer buf : removed.getMultipleLlapBuffers()) {
+ long memUsage = buf.getMemoryUsage();
+ // We cannot just deallocate the buffer, as it can hypothetically have users.
+ int result = buf.invalidate();
+ switch (result) {
+ case LlapAllocatorBuffer.INVALIDATE_ALREADY_INVALID: continue; // Nothing to do.
+ case LlapAllocatorBuffer.INVALIDATE_FAILED: {
+ // Someone is using this buffer; eventually, it will be evicted.
+ continue;
+ }
+ case LlapAllocatorBuffer.INVALIDATE_OK: {
+ memoryFreed += memUsage;
+ allocator.deallocateEvicted(buf);
+ break;
+ }
+ default: throw new AssertionError(result);
+ }
+ }
+ memoryManager.releaseMemory(memoryFreed);
+ }
+
+ private boolean lockOneBuffer(LlapAllocatorBuffer buffer, boolean doNotifyPolicy) {
+ int rc = buffer.incRef();
+ if (rc > 0) {
+ metrics.incrCacheNumLockedBuffers();
+ }
+ if (doNotifyPolicy && rc == 1) {
+ // We have just locked a buffer that wasn't previously locked.
+ policy.notifyLock(buffer);
+ }
+ return rc > 0;
+ }
+
+ private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) {
+ LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+ if (singleBuffer != null) {
+ unlockSingleBuffer(singleBuffer, isCached);
+ return;
+ }
+ for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+ unlockSingleBuffer(buffer, isCached);
+ }
+ }
+
+ private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) {
+ boolean isLastDecref = (buffer.decRef() == 0);
+ if (isLastDecref) {
+ if (isCached) {
+ policy.notifyUnlock(buffer);
+ } else {
+ allocator.deallocate(buffer);
+ }
+ }
+ metrics.decrCacheNumLockedBuffers();
+ }
+
+
+ public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers {
+ LlapAllocatorBuffer getSingleLlapBuffer();
+ LlapAllocatorBuffer[] getMultipleLlapBuffers();
+ }
+
+ public final static class LlapFileMetadataBuffer
+ extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
+ private final Object fileKey;
+
+ public LlapFileMetadataBuffer(Object fileKey) {
+ this.fileKey = fileKey;
+ }
+
+ @Override
+ public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+ evictionDispatcher.notifyEvicted(this);
+ }
+
+ public Object getFileKey() {
+ return fileKey;
+ }
+
+ @Override
+ public LlapAllocatorBuffer getSingleLlapBuffer() {
+ return this;
+ }
+
+ @Override
+ public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+ return null;
+ }
+
+ @Override
+ public MemoryBuffer getSingleBuffer() {
+ return this;
+ }
+
+ @Override
+ public MemoryBuffer[] getMultipleBuffers() {
+ return null;
+ }
+ }
+
+ public final static class LlapFileMetadataBuffers implements LlapBufferOrBuffers {
+ private final LlapFileMetadataBuffer[] buffers;
+
+ public LlapFileMetadataBuffers(LlapFileMetadataBuffer[] buffers) {
+ this.buffers = buffers;
+ }
+
+ @Override
+ public LlapAllocatorBuffer getSingleLlapBuffer() {
+ return null;
+ }
+
+ @Override
+ public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+ return buffers;
+ }
+
+ @Override
+ public MemoryBuffer getSingleBuffer() {
+ return null;
+ }
+
+ @Override
+ public MemoryBuffer[] getMultipleBuffers() {
+ return buffers;
+ }
+ }
+
+ @Override
+ public String debugDumpForOom() {
+ // TODO: nothing, will be merged with ORC cache
+ return null;
+ }
+
+ @Override
+ public void debugDumpShort(StringBuilder sb) {
+ // TODO: nothing, will be merged with ORC cache
+ }
+}
\ No newline at end of file
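putFileMetadata above follows the usual LLAP insert-or-reuse idiom: create and lock a new entry, race it into the map with putIfAbsent, and if a valid old entry wins, lock and return that one while discarding the loser. A simplified sketch of that control flow, with the buffer ref-counting reduced to a hypothetical tryLock/unlock pair:

import java.util.concurrent.ConcurrentHashMap;

final class InsertOrReuseSketch {
  interface Entry {
    boolean tryLock();   // stand-in for incRef on LlapBufferOrBuffers
    void unlock();       // stand-in for decRef / discard
  }

  private final ConcurrentHashMap<Object, Entry> map = new ConcurrentHashMap<>();

  Entry putOrGet(Object key, Entry fresh) {
    fresh.tryLock();                    // the new entry is handed back locked to the caller
    while (true) {                      // overwhelmingly one iteration, occasionally two
      Entry old = map.putIfAbsent(key, fresh);
      if (old == null) {
        return fresh;                   // we won the race; our entry is now cached
      }
      if (old.tryLock()) {
        fresh.unlock();                 // someone else cached it first; drop our copy
        return old;
      }
      map.remove(key, old);             // the old entry is being evicted; retry the insert
    }
  }
}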
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
new file mode 100644
index 0000000..5c1eed3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.impl.RecordReaderUtils;
+
+/**
+ * This is currently only used by Parquet; however, the approach is generally applicable -
+ * you pass in a set of offset pairs for a file, and the file is cached with these boundaries.
+ * Don't add anything format-specific here.
+ */
+public class LlapCacheAwareFs extends FileSystem {
+ public static final String SCHEME = "llapcache";
+ private static AtomicLong currentSplitId = new AtomicLong(-1);
+ private URI uri;
+
+ // We store the chunk indices by split id; that way, if several callers are reading
+ // the same file, they can separately store and remove the relevant parts of the index.
+ private static final ConcurrentHashMap<Long, CacheAwareInputStream> files =
+ new ConcurrentHashMap<>();
+
+ public static Path registerFile(DataCache cache, Path path, Object fileKey,
+ TreeMap<Long, Long> index, Configuration conf) throws IOException {
+ long splitId = currentSplitId.incrementAndGet();
+ CacheAwareInputStream stream = new CacheAwareInputStream(
+ cache, conf, index, path, fileKey, -1);
+ if (files.putIfAbsent(splitId, stream) != null) {
+ throw new IOException("Record already exists for " + splitId);
+ }
+ conf.set("fs." + LlapCacheAwareFs.SCHEME + ".impl", LlapCacheAwareFs.class.getCanonicalName());
+ return new Path(SCHEME + "://" + SCHEME + "/" + splitId);
+ }
+
+ public static void unregisterFile(Path cachePath) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unregistering " + cachePath);
+ }
+ files.remove(extractSplitId(cachePath));
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+ this.uri = URI.create(SCHEME + "://" + SCHEME);
+ }
+
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ return new FSDataInputStream(getCtx(path).cloneWithBufferSize(bufferSize));
+ }
+
+ private LlapCacheAwareFs.CacheAwareInputStream getCtx(Path path) {
+ return files.get(extractSplitId(path));
+ }
+
+ private static long extractSplitId(Path path) {
+ String pathOnly = path.toUri().getPath();
+ if (pathOnly.startsWith("/")) {
+ pathOnly = pathOnly.substring(1);
+ }
+ return Long.parseLong(pathOnly);
+ }
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWorkingDirectory(Path arg0) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().append(ctx.path, arg1, arg2);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
+ short arg4, long arg5, Progressable arg6) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().create(ctx.path, arg1, arg2, arg3, arg4, arg5, arg6);
+ }
+
+ @Override
+ public boolean delete(Path arg0, boolean arg1) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().delete(ctx.path, arg1);
+
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path arg0) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().getFileStatus(ctx.path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path arg0) throws FileNotFoundException, IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().listStatus(ctx.path);
+ }
+
+ @Override
+ public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+ return ctx.getFs().mkdirs(ctx.path, arg1);
+ }
+
+ @Override
+ public boolean rename(Path arg0, Path arg1) throws IOException {
+ LlapCacheAwareFs.CacheAwareInputStream ctx1 = getCtx(arg0), ctx2 = getCtx(arg1);
+ return ctx1.getFs().rename(ctx1.path, ctx2.path);
+ }
+
+ private static class CacheAwareInputStream extends InputStream
+ implements Seekable, PositionedReadable {
+ private final TreeMap<Long, Long> chunkIndex;
+ private final Path path;
+ private final Object fileKey;
+ private final Configuration conf;
+ private final DataCache cache;
+ private final int bufferSize;
+ private long position = 0;
+
+ public CacheAwareInputStream(DataCache cache, Configuration conf,
+ TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize) {
+ this.cache = cache;
+ this.fileKey = fileKey;
+ this.chunkIndex = chunkIndex;
+ this.path = path;
+ this.conf = conf;
+ this.bufferSize = bufferSize;
+ }
+
+ public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int bufferSize) {
+ return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize);
+ }
+
+ @Override
+ public int read() throws IOException {
+ // This is not called by ConsecutiveChunk stuff in Parquet.
+ // If this were used, it might make sense to make it faster.
+ byte[] theByte = new byte[1];
+ int result = read(theByte, 0, 1);
+ if (result != 1) throw new EOFException();
+ return theByte[0] & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] array, final int arrayOffset, final int len) throws IOException {
+ long readStartPos = position;
+ DiskRangeList drl = new DiskRangeList(readStartPos, readStartPos + len);
+ DataCache.BooleanRef gotAllData = new DataCache.BooleanRef();
+ drl = cache.getFileData(fileKey, drl, 0, new DataCache.DiskRangeListFactory() {
+ @Override
+ public DiskRangeList createCacheChunk(
+ MemoryBuffer buffer, long startOffset, long endOffset) {
+ CacheChunk result = new CacheChunk(); // TODO: pool?
+ result.init(buffer, startOffset, endOffset);
+ return result;
+ }
+ }, gotAllData);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Buffers after cache " + RecordReaderUtils.stringifyDiskRanges(drl));
+ }
+ if (gotAllData.value) {
+ long sizeRead = 0;
+ while (drl != null) {
+ assert drl.hasData();
+ long from = drl.getOffset(), to = drl.getEnd();
+ int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
+ ByteBuffer data = drl.getData().duplicate();
+ data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+ sizeRead += candidateSize;
+ drl = drl.next;
+ }
+ validateAndUpdatePosition(len, sizeRead);
+ return len;
+ }
+ int maxAlloc = cache.getAllocator().getMaxAllocation();
+ // We have some disk data. Separate it by column chunk and put into cache.
+
+ // We started with a single DRL, so we assume there will be no consecutive missing blocks
+ // after the cache has inserted cache data. We also assume all the missing parts will
+ // represent one or several column chunks, since we always cache on column chunk boundaries.
+ DiskRangeList current = drl;
+ FileSystem fs = path.getFileSystem(conf);
+ FSDataInputStream is = fs.open(path, bufferSize);
+ Allocator allocator = cache.getAllocator();
+ long sizeRead = 0;
+ while (current != null) {
+ DiskRangeList candidate = current;
+ current = current.next;
+ long from = candidate.getOffset(), to = candidate.getEnd();
+ // The offset in the destination array for the beginning of this missing range.
+ int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
+ if (candidate.hasData()) {
+ ByteBuffer data = candidate.getData().duplicate();
+ data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+ sizeRead += candidateSize;
+ continue;
+ }
+ // The data is not in cache.
+
+ // Account for potential partial chunks.
+ SortedMap<Long, Long> chunksInThisRead = getAndValidateMissingChunks(maxAlloc, from, to);
+
+ is.seek(from);
+ is.readFully(array, arrayOffset + offsetFromReadStart, candidateSize);
+ sizeRead += candidateSize;
+ // Now copy missing chunks (and parts of chunks) into cache buffers.
+ if (fileKey == null || cache == null) continue;
+ int extraDiskDataOffset = 0;
+ // TODO: should we try to make a giant array for one cache call to avoid overhead?
+ for (Map.Entry<Long, Long> missingChunk : chunksInThisRead.entrySet()) {
+ long chunkFrom = Math.max(from, missingChunk.getKey()),
+ chunkTo = Math.min(to, missingChunk.getValue()),
+ chunkLength = chunkTo - chunkFrom;
+ MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null;
+ try {
+ int largeBufCount = (int) (chunkLength / maxAlloc);
+ int smallSize = (int) (chunkLength % maxAlloc);
+ int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0);
+ DiskRange[] cacheRanges = new DiskRange[chunkPartCount];
+ int extraOffsetInChunk = 0;
+ if (maxAlloc < chunkLength) {
+ largeBuffers = new MemoryBuffer[largeBufCount];
+ allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
+ for (int i = 0; i < largeBuffers.length; ++i) {
+ // By definition here we copy up to the limit of the buffer.
+ ByteBuffer bb = largeBuffers[i].getByteBufferRaw();
+ int remaining = bb.remaining();
+ assert remaining == maxAlloc;
+ copyDiskDataToCacheBuffer(array,
+ arrayOffset + offsetFromReadStart + extraDiskDataOffset,
+ remaining, bb, cacheRanges, i, chunkFrom + extraOffsetInChunk);
+ extraDiskDataOffset += remaining;
+ extraOffsetInChunk += remaining;
+ }
+ }
+ newCacheData = largeBuffers;
+ largeBuffers = null;
+ if (smallSize > 0) {
+ smallBuffer = new MemoryBuffer[1];
+ allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory());
+ ByteBuffer bb = smallBuffer[0].getByteBufferRaw();
+ copyDiskDataToCacheBuffer(array,
+ arrayOffset + offsetFromReadStart + extraDiskDataOffset,
+ smallSize, bb, cacheRanges, largeBufCount, chunkFrom + extraOffsetInChunk);
+ extraDiskDataOffset += smallSize;
+ extraOffsetInChunk += smallSize; // Not strictly necessary; no one will look at it.
+ if (newCacheData == null) {
+ newCacheData = smallBuffer;
+ } else {
+ // TODO: add allocate overload with an offset and length
+ MemoryBuffer[] combinedCacheData = new MemoryBuffer[largeBufCount + 1];
+ System.arraycopy(newCacheData, 0, combinedCacheData, 0, largeBufCount);
+ newCacheData = combinedCacheData;
+ newCacheData[largeBufCount] = smallBuffer[0];
+ }
+ smallBuffer = null;
+ }
+ cache.putFileData(fileKey, cacheRanges, newCacheData, 0);
+ } finally {
+ // We do not use the new cache buffers for the actual read, given the way the read() API works.
+ // Therefore, we don't need to handle cache collisions - just decref all the buffers.
+ if (newCacheData != null) {
+ for (MemoryBuffer buffer : newCacheData) {
+ if (buffer == null) continue;
+ cache.releaseBuffer(buffer);
+ }
+ }
+ // If we failed before building newCacheData, deallocate the buffers allocated so far.
+ if (largeBuffers != null) {
+ for (MemoryBuffer buffer : largeBuffers) {
+ if (buffer == null) continue;
+ allocator.deallocate(buffer);
+ }
+ }
+ if (smallBuffer != null && smallBuffer[0] != null) {
+ allocator.deallocate(smallBuffer[0]);
+ }
+ }
+ }
+ }
+ validateAndUpdatePosition(len, sizeRead);
+ return len;
+ }
+
+ private void validateAndUpdatePosition(int len, long sizeRead) {
+ if (sizeRead != len) {
+ throw new AssertionError("Reading at " + position + " for " + len + ": "
+ + sizeRead + " bytes copied");
+ }
+ position += len;
+ }
+
+ private void copyDiskDataToCacheBuffer(byte[] diskData, int offsetInDiskData, int sizeToCopy,
+ ByteBuffer cacheBuffer, DiskRange[] cacheRanges, int cacheRangeIx, long cacheRangeStart) {
+ int bbPos = cacheBuffer.position();
+ long cacheRangeEnd = cacheRangeStart + sizeToCopy;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Caching [" + cacheRangeStart + ", " + cacheRangeEnd + ")");
+ }
+ cacheRanges[cacheRangeIx] = new DiskRange(cacheRangeStart, cacheRangeEnd);
+ cacheBuffer.put(diskData, offsetInDiskData, sizeToCopy);
+ cacheBuffer.position(bbPos);
+ }
+
+ private SortedMap<Long, Long> getAndValidateMissingChunks(
+ int maxAlloc, long from, long to) {
+ Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from);
+ if (firstMissing == null) {
+ throw new AssertionError("No lower bound for offset " + from);
+ }
+ if (firstMissing.getValue() <= from
+ || ((from - firstMissing.getKey()) % maxAlloc) != 0) {
+ // The data does not belong to a recognized chunk, or is split wrong.
+ throw new AssertionError("Lower bound for offset " + from + " is ["
+ + firstMissing.getKey() + ", " + firstMissing.getValue() + ")");
+ }
+ SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to);
+ if (missingChunks.isEmpty()) {
+ throw new AssertionError("No chunks for [" + from + ", " + to + ")");
+ }
+ long lastMissingOffset = missingChunks.lastKey(),
+ lastMissingEnd = missingChunks.get(lastMissingOffset);
+ if (lastMissingEnd < to
+ || (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) {
+ // The data does not belong to a recognized chunk, or is split wrong.
+ throw new AssertionError("Lower bound for offset " + to + " is ["
+ + lastMissingOffset + ", " + lastMissingEnd + ")");
+ }
+ return missingChunks;
+ }
+
+ public FileSystem getFs() throws IOException {
+ return path.getFileSystem(conf);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void seek(long position) throws IOException {
+ this.position = position;
+ }
+
+ @Override
+ @Private
+ public boolean seekToNewSource(long arg0) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int read(long arg0, byte[] arg1, int arg2, int arg3) throws IOException {
+ seek(arg0);
+ return read(arg1, arg2, arg3);
+ }
+
+ @Override
+ public void readFully(long arg0, byte[] arg1) throws IOException {
+ read(arg0, arg1, 0, arg1.length);
+ }
+
+ @Override
+ public void readFully(long arg0, byte[] arg1, int arg2, int arg3) throws IOException {
+ read(arg0, arg1, arg2, arg3);
+ }
+ }
+}
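To make the intended call pattern concrete, here is a hedged sketch of how a reader could register a split with the cache-aware FS and read through it. The chunk boundaries are placeholders, and the real wiring for Parquet happens in VectorizedParquetRecordReader elsewhere in this patch:

import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.llap.LlapCacheAwareFs;

public class CacheAwareReadSketch {
  // The offsets below are placeholders for real Parquet column-chunk boundaries.
  public static void readThroughCache(DataCache cache, Path realPath, Object fileKey,
      Configuration conf) throws Exception {
    TreeMap<Long, Long> chunkIndex = new TreeMap<>();
    chunkIndex.put(4L, 1048580L);          // first column chunk: [4, 1048580)
    chunkIndex.put(1048580L, 2097156L);    // second column chunk: [1048580, 2097156)

    Path cachePath = LlapCacheAwareFs.registerFile(cache, realPath, fileKey, chunkIndex, conf);
    try {
      FileSystem fs = cachePath.getFileSystem(conf);
      try (FSDataInputStream in = fs.open(cachePath)) {
        byte[] chunk = new byte[1048576];
        // Served from the LLAP data cache where possible; misses are read from
        // the real file and inserted into the cache on chunk boundaries.
        in.readFully(4L, chunk);
      }
    } finally {
      LlapCacheAwareFs.unregisterFile(cachePath);
    }
  }
}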
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index fa7f59d..102150a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -49,7 +49,8 @@ public class HdfsUtils {
if (fileSystem instanceof DistributedFileSystem) {
DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
if ((!checkDefaultFs) || isDefaultFs(dfs)) {
- return SHIMS.getFileId(dfs, path.toUri().getPath());
+ Object result = SHIMS.getFileId(dfs, path.toUri().getPath());
+ if (result != null) return result;
}
}
if (!allowSynthetic) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 442c921..5c9d289 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -214,6 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
String ifName = inputFormat.getClass().getCanonicalName();
boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface;
+ boolean isCacheOnly = inputFormat instanceof LlapCacheOnlyInputFormatInterface;
boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf);
if (!isVectorized) {
// Pretend it's vectorized if the non-vector wrapped is enabled.
@@ -224,33 +225,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
if (isVectorized && !isSupported
&& HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED)) {
// See if we can use re-encoding to read the format thru IO elevator.
- String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Checking " + ifName + " against " + formatList);
- }
- String[] formats = StringUtils.getStrings(formatList);
- if (formats != null) {
- for (String format : formats) {
- // TODO: should we check isAssignableFrom?
- if (ifName.equals(format)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Using SerDe-based LLAP reader for " + ifName);
- }
- isSupported = isSerdeBased = true;
- break;
- }
- }
- }
+ isSupported = isSerdeBased = checkInputFormatForLlapEncode(conf, ifName);
}
- if (!isSupported || !isVectorized) {
+ if ((!isSupported || !isVectorized) && !isCacheOnly) {
if (LOG.isInfoEnabled()) {
LOG.info("Not using llap for " + ifName + ": supported = "
- + isSupported + ", vectorized = " + isVectorized);
+ + isSupported + ", vectorized = " + isVectorized + ", cache only = " + isCacheOnly);
}
return inputFormat;
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Wrapping " + ifName);
+ LOG.debug("Processing " + ifName);
}
@SuppressWarnings("unchecked")
@@ -264,43 +249,85 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
Deserializer serde = null;
if (isSerdeBased) {
if (part == null) {
- if (LOG.isInfoEnabled()) {
+ if (isCacheOnly) {
+ LOG.info("Using cache only because there's no partition spec for SerDe-based IF");
+ injectLlapCaches(inputFormat, llapIo);
+ } else {
LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF");
}
return inputFormat;
}
- VectorPartitionDesc vpart = part.getVectorPartitionDesc();
- if (vpart != null) {
- VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType();
- if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+ serde = findSerDeForLlapSerDeIf(conf, part);
+ }
+ if (isSupported && isVectorized) {
+ InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde);
+ // null means we cannot wrap; the cause is logged inside.
+ if (wrappedIf != null) {
+ return castInputFormat(wrappedIf);
+ }
+ }
+ if (isCacheOnly) {
+ injectLlapCaches(inputFormat, llapIo);
+ }
+ return inputFormat;
+ }
+
+ private static boolean checkInputFormatForLlapEncode(Configuration conf, String ifName) {
+ String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checking " + ifName + " against " + formatList);
+ }
+ String[] formats = StringUtils.getStrings(formatList);
+ if (formats != null) {
+ for (String format : formats) {
+ // TODO: should we check isAssignableFrom?
+ if (ifName.equals(format)) {
if (LOG.isInfoEnabled()) {
- LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition "
- + part.getTableName() + " " + part.getPartSpec());
+ LOG.info("Using SerDe-based LLAP reader for " + ifName);
}
- vpart.setVectorMapOperatorReadType(
- VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT);
+ return true;
}
}
- try {
- serde = part.getDeserializer(conf);
- } catch (Exception e) {
- throw new HiveException("Error creating SerDe for LLAP IO", e);
+ }
+ return false;
+ }
+
+ private static Deserializer findSerDeForLlapSerDeIf(
+ Configuration conf, PartitionDesc part) throws HiveException {
+ VectorPartitionDesc vpart = part.getVectorPartitionDesc();
+ if (vpart != null) {
+ VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType();
+ if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition "
+ + part.getTableName() + " " + part.getPartSpec());
+ }
+ vpart.setVectorMapOperatorReadType(
+ VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT);
}
}
- InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde);
- if (wrappedIf == null) {
- return inputFormat; // We cannot wrap; the cause is logged inside.
+ try {
+ return part.getDeserializer(conf);
+ } catch (Exception e) {
+ throw new HiveException("Error creating SerDe for LLAP IO", e);
}
- return castInputFormat(wrappedIf);
}
-
+ public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat,
+ LlapIo<VectorizedRowBatch> llapIo) {
+ LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName());
+ llapIo.initCacheOnlyInputFormat(inputFormat);
+ }
public static boolean canWrapForLlap(Class<? extends InputFormat> clazz, boolean checkVector) {
return LlapWrappableInputFormatInterface.class.isAssignableFrom(clazz) &&
(!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz));
}
+ public static boolean canInjectCaches(Class<? extends InputFormat> clazz) {
+ return LlapCacheOnlyInputFormatInterface.class.isAssignableFrom(clazz);
+ }
+
@SuppressWarnings("unchecked")
private static <T, U, V, W> InputFormat<T, U> castInputFormat(InputFormat<V, W> from) {
// This is ugly in two ways...
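A small sketch of what the new static helpers report for Parquet after this change: the Parquet input format still cannot be wrapped by the LLAP IO elevator, but it now accepts cache injection.

import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;

public class LlapCapabilitySketch {
  public static void main(String[] args) {
    // Parquet does not implement LlapWrappableInputFormatInterface, so it cannot
    // be wrapped by the LLAP IO elevator...
    System.out.println("canWrapForLlap = "
        + HiveInputFormat.canWrapForLlap(MapredParquetInputFormat.class, true));
    // ...but with this patch it implements LlapCacheOnlyInputFormatInterface,
    // so the metadata and data caches can be injected into it.
    System.out.println("canInjectCaches = "
        + HiveInputFormat.canInjectCaches(MapredParquetInputFormat.class));
  }
}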
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
new file mode 100644
index 0000000..13d594c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+
+/** Interface for input formats that use LLAP caches without being wrapped by LLAP IO. */
+public interface LlapCacheOnlyInputFormatInterface {
+ void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf);
+}
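For reference, a minimal sketch of what an input format needs to do to opt in; it mirrors what MapredParquetInputFormat and VectorizedParquetInputFormat do further down in this patch, and the class name here is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;

// Hypothetical input format that wants LLAP caches without the LLAP IO elevator.
public abstract class CachingInputFormatSketch implements LlapCacheOnlyInputFormatInterface {
  protected FileMetadataCache metadataCache;
  protected DataCache dataCache;
  protected Configuration cacheConf;

  @Override
  public void injectCaches(
      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
    // Called by the LLAP IO layer (LlapIo.initCacheOnlyInputFormat) on the daemon side;
    // stash the caches so record readers created later can consult them.
    this.metadataCache = metadataCache;
    this.dataCache = dataCache;
    this.cacheConf = cacheConf;
  }
}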
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index f4fadbb..38aaeed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -15,7 +15,11 @@ package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -37,7 +41,7 @@ import org.apache.parquet.hadoop.ParquetInputFormat;
* are not currently supported. Removing the interface turns off vectorization.
*/
public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
- implements VectorizedInputFormatInterface {
+ implements VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface {
private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class);
@@ -78,4 +82,10 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra
throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
}
}
+
+ @Override
+ public void injectCaches(
+ FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+ vectorizedSelf.injectCaches(metadataCache, dataCache, cacheConf);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 322178a..0cc580a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -15,6 +15,10 @@ package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -28,7 +32,12 @@ import org.apache.hadoop.mapred.Reporter;
* Vectorized input format for Parquet files
*/
public class VectorizedParquetInputFormat
- extends FileInputFormat<NullWritable, VectorizedRowBatch> {
+ extends FileInputFormat<NullWritable, VectorizedRowBatch>
+ implements LlapCacheOnlyInputFormatInterface {
+
+ private FileMetadataCache metadataCache = null;
+ private DataCache dataCache = null;
+ private Configuration cacheConf = null;
public VectorizedParquetInputFormat() {
}
@@ -38,6 +47,15 @@ public class VectorizedParquetInputFormat
InputSplit inputSplit,
JobConf jobConf,
Reporter reporter) throws IOException {
- return new VectorizedParquetRecordReader(inputSplit, jobConf);
+ return new VectorizedParquetRecordReader(
+ inputSplit, jobConf, metadataCache, dataCache, cacheConf);
+ }
+
+ @Override
+ public void injectCaches(
+ FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+ this.metadataCache = metadataCache;
+ this.dataCache = dataCache;
+ this.cacheConf = cacheConf;
}
}
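
A hedged usage sketch of the format above: once the caches are injected, every record reader it creates can serve footers from the metadata cache and column chunks from the data cache. The helper class and method names are assumptions:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.common.io.FileMetadataCache;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    final class VectorizedParquetCacheUsage {
      static RecordReader<NullWritable, VectorizedRowBatch> openCachedReader(
          InputSplit split, JobConf jobConf, FileMetadataCache metadataCache,
          DataCache dataCache, Configuration daemonConf) throws IOException {
        VectorizedParquetInputFormat format = new VectorizedParquetInputFormat();
        // Inject once; subsequent readers created from this format are cache-aware.
        format.injectCaches(metadataCache, dataCache, daemonConf);
        return format.getRecordReader(split, jobConf, Reporter.NULL);
      }
    }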
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
new file mode 100644
index 0000000..2a6e052
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * A Parquet InputFile implementation that allows the reader to read the footer
+ * from the LLAP cache without being aware of the cache.
+ * This implements both InputFile and the InputStream that the reader gets from InputFile.
+ */
+final class ParquetFooterInputFromCache
+ extends SeekableInputStream implements InputFile {
+ final static int FOOTER_LENGTH_SIZE = 4; // Size of the trailing footer-length field.
+ private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
+ private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
+ private final int length, footerLength;
+ private int position = 0, bufferIx = 0, bufferPos = 0;
+ private final MemoryBuffer[] cacheData;
+ private final int[] positions;
+
+ public ParquetFooterInputFromCache(MemoryBufferOrBuffers footerData) {
+ MemoryBuffer oneBuffer = footerData.getSingleBuffer();
+ if (oneBuffer != null) {
+ cacheData = new MemoryBuffer[2];
+ cacheData[0] = oneBuffer;
+ } else {
+ MemoryBuffer[] bufs = footerData.getMultipleBuffers();
+ cacheData = new MemoryBuffer[bufs.length + 1];
+ System.arraycopy(bufs, 0, cacheData, 0, bufs.length);
+ }
+ int footerLength = 0;
+ positions = new int[cacheData.length];
+ for (int i = 0; i < cacheData.length - 1; ++i) {
+ positions[i] = footerLength;
+ int dataLen = cacheData[i].getByteBufferRaw().remaining();
+ assert dataLen > 0;
+ footerLength += dataLen;
+ }
+ positions[cacheData.length - 1] = footerLength;
+ cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength);
+ this.footerLength = footerLength;
+ this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return length;
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+ // Note: this doesn't maintain proper newStream semantics (if any).
+ // We could either clone this instead or enforce that this is only called once.
+ return this;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void seek(long targetPos) throws IOException {
+ this.position = (int)targetPos;
+ targetPos -= FAKE_PREFIX_LENGTH;
+ // Not efficient, but we don't expect this to be called frequently.
+ for (int i = 1; i <= positions.length; ++i) {
+ int endPos = (i == positions.length) ? (length - FAKE_PREFIX_LENGTH) : positions[i];
+ if (endPos > targetPos) {
+ bufferIx = i - 1;
+ bufferPos = (int) (targetPos - positions[i - 1]);
+ return;
+ }
+ }
+ throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength
+ + Arrays.toString(positions));
+ }
+
+ @Override
+ public void readFully(byte[] b, int offset, int len) throws IOException {
+ if (readInternal(b, offset, len) == len) return;
+ throw new EOFException();
+ }
+
+ public int readInternal(byte[] b, int offset, int len) {
+ if (position >= length) return -1;
+ int argPos = offset, argEnd = offset + len;
+ while (argPos < argEnd) {
+ if (bufferIx == cacheData.length) return (argPos - offset);
+ ByteBuffer data = cacheData[bufferIx].getByteBufferDup();
+ int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos);
+ data.position(data.position() + bufferPos);
+ data.get(b, argPos, toConsume);
+ if (data.remaining() == 0) {
+ ++bufferIx;
+ bufferPos = 0;
+ } else {
+ bufferPos += toConsume;
+ }
+ argPos += toConsume;
+ }
+ return len;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (position >= length) return -1;
+ ++position;
+ ByteBuffer data = cacheData[bufferIx].getByteBufferRaw();
+ int bp = bufferPos;
+ ++bufferPos;
+ if (bufferPos == data.remaining()) {
+ ++bufferIx; // The position check at the top of this method prevents reading out of bounds.
+ bufferPos = 0;
+ }
+ return data.get(data.position() + bp) & 0xFF;
+ }
+
+ @Override
+ public int read(ByteBuffer bb) throws IOException {
+ // Simple implementation for now - currently Parquet uses heap buffers.
+ int result = -1;
+ if (bb.hasArray()) {
+ result = readInternal(bb.array(), bb.arrayOffset(), result);
+ if (result > 0) {
+ bb.position(bb.position() + result);
+ }
+ } else {
+ byte[] b = new byte[bb.remaining()];
+ result = readInternal(b, 0, result);
+ bb.put(b, 0, result);
+ }
+ return result;
+ }
+
+ @Override
+ public void readFully(byte[] arg0) throws IOException {
+ readFully(arg0, 0, arg0.length);
+ }
+
+ @Override
+ public void readFully(ByteBuffer arg0) throws IOException {
+ read(arg0);
+ }
+
+ /**
+ * A fake buffer that emulates the end of the file: the footer length and the magic. Since
+ * these can be regenerated from the footer buffer itself, they are not cached.
+ */
+ private final static class FooterEndBuffer implements MemoryBuffer {
+ private final ByteBuffer bb;
+ public FooterEndBuffer(int footerLength) {
+ byte[] b = new byte[8];
+ b[0] = (byte) ((footerLength >>> 0) & 0xFF);
+ b[1] = (byte) ((footerLength >>> 8) & 0xFF);
+ b[2] = (byte) ((footerLength >>> 16) & 0xFF);
+ b[3] = (byte) ((footerLength >>> 24) & 0xFF);
+ for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) {
+ b[4 + i] = ParquetFileWriter.MAGIC[i];
+ }
+ bb = ByteBuffer.wrap(b);
+ }
+
+ @Override
+ public ByteBuffer getByteBufferRaw() {
+ return bb;
+ }
+
+ @Override
+ public ByteBuffer getByteBufferDup() {
+ return bb.duplicate();
+ }
+ }
+}
\ No newline at end of file
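
A minimal usage sketch of the class above, mirroring what the vectorized record reader below does. It must live in the same package, since the class is package-private, and the helper name is an assumption:

    import java.io.IOException;

    import org.apache.hadoop.hive.common.io.FileMetadataCache;
    import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    final class CachedFooterReadSketch {
      static ParquetMetadata readCachedFooter(FileMetadataCache metadataCache, Object fileKey)
          throws IOException {
        MemoryBufferOrBuffers footerData = metadataCache.getFileMetadata(fileKey);
        if (footerData == null) return null; // Not cached; the caller falls back to the file.
        try {
          return ParquetFileReader.readFooter(
              new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER);
        } finally {
          metadataCache.decRefBuffer(footerData); // Always release the cache reference.
        }
      }
    }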
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 6a7a219..0977759 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -14,9 +14,22 @@
package org.apache.hadoop.hive.ql.io.parquet.vector;
import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
@@ -31,13 +44,21 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -50,11 +71,11 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.TreeMap;
import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
@@ -74,6 +95,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private VectorizedRowBatchCtx rbCtx;
private List<Integer> indexColumnsWanted;
private Object[] partitionValues;
+ private Path cacheFsPath;
/**
* For each request column, the reader to read this column. This is NULL if this column
@@ -114,9 +136,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
public VectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit,
- JobConf conf) {
+ org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+ this(oldInputSplit, conf, null, null, null);
+ }
+
+ public VectorizedParquetRecordReader(
+ org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
+ FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
try {
+ this.metadataCache = metadataCache;
+ this.cache = dataCache;
+ this.cacheConf = cacheConf;
serDeStats = new SerDeStats();
projectionPusher = new ProjectionPusher();
ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
@@ -132,16 +162,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
}
- private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
- int partitionColumnCount = rbCtx.getPartitionColumnCount();
- if (partitionColumnCount > 0) {
- partitionValues = new Object[partitionColumnCount];
- rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
- } else {
- partitionValues = null;
- }
- }
+ private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+ } else {
+ partitionValues = null;
+ }
+ }
+ @SuppressWarnings("deprecation")
public void initialize(
InputSplit oldSplit,
JobConf configuration) throws IOException, InterruptedException {
@@ -164,16 +195,33 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
// if task.side.metadata is set, rowGroupOffsets is null
+ Object cacheKey = null;
+ // TODO: also support fileKey in splits, like OrcSplit does
+ if (metadataCache != null) {
+ cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
+ HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+ HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
+ }
+ if (cacheKey != null) {
+ // If we are going to use cache, change the path to depend on file ID for extra consistency.
+ FileSystem fs = file.getFileSystem(configuration);
+ if (cacheKey instanceof Long && HiveConf.getBoolVar(
+ cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ file = HdfsUtils.getFileIdPath(fs, file, (long)cacheKey);
+ }
+ }
+
if (rowGroupOffsets == null) {
//TODO check whether rowGroupOffSets can be null
// then we need to apply the predicate push down filter
- footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+ footer = readSplitFooter(
+ configuration, file, cacheKey, range(split.getStart(), split.getEnd()));
MessageType fileSchema = footer.getFileMetaData().getSchema();
FilterCompat.Filter filter = getFilter(configuration);
blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
} else {
// otherwise we find the row groups that were selected on the client
- footer = readFooter(configuration, file, NO_FILTER);
+ footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER);
Set<Long> offsets = new HashSet<>();
for (long offset : rowGroupOffsets) {
offsets.add(offset);
@@ -230,10 +278,98 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
requestedSchema = fileSchema;
}
+ Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
this.reader = new ParquetFileReader(
- configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+ configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+ }
+
+ private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
+ List<BlockMetaData> blocks) throws IOException {
+ if (fileKey == null || cache == null) {
+ return path;
+ }
+ HashSet<ColumnPath> includedCols = new HashSet<>();
+ for (ColumnDescriptor col : requestedSchema.getColumns()) {
+ includedCols.add(ColumnPath.get(col.getPath()));
+ }
+ // We could make some assumptions given how the reader currently does the work (consecutive
+ // chunks, etc.; blocks and columns stored in offset order in the lists), but we won't -
+ // just save all the chunk boundaries and lengths for now.
+ TreeMap<Long, Long> chunkIndex = new TreeMap<>();
+ for (BlockMetaData block : blocks) {
+ for (ColumnChunkMetaData mc : block.getColumns()) {
+ if (!includedCols.contains(mc.getPath())) continue;
+ chunkIndex.put(mc.getStartingPos(), mc.getStartingPos() + mc.getTotalSize());
+ }
+ }
+ // Register the cache-aware path so that the Parquet reader goes through it.
+ configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl",
+ LlapCacheAwareFs.class.getCanonicalName());
+ path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration);
+ this.cacheFsPath = path;
+ return path;
+ }
+
+ private ParquetMetadata readSplitFooter(
+ JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException {
+ MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
+ : metadataCache.getFileMetadata(cacheKey);
+ if (footerData != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Found the footer in cache for " + cacheKey);
+ }
+ try {
+ return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
+ } finally {
+ metadataCache.decRefBuffer(footerData);
+ }
+ }
+ final FileSystem fs = file.getFileSystem(configuration);
+ final FileStatus stat = fs.getFileStatus(file);
+ if (cacheKey == null || metadataCache == null) {
+ return readFooterFromFile(file, fs, stat, filter);
+ }
+
+ // To avoid reading the footer twice, we will cache it first and then read from cache.
+ // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
+ try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) {
+ long footerLengthIndex = stat.getLen()
+ - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
+ stream.seek(footerLengthIndex);
+ int footerLength = BytesUtils.readIntLittleEndian(stream);
+ stream.seek(footerLengthIndex - footerLength);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
+ }
+ footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream);
+ try {
+ return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
+ } finally {
+ metadataCache.decRefBuffer(footerData);
+ }
+ }
}
+ private ParquetMetadata readFooterFromFile(final Path file, final FileSystem fs,
+ final FileStatus stat, MetadataFilter filter) throws IOException {
+ InputFile inputFile = new InputFile() {
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+ return HadoopStreams.wrap(fs.open(file));
+ }
+ @Override
+ public long getLength() throws IOException {
+ return stat.getLen();
+ }
+ };
+ return ParquetFileReader.readFooter(inputFile, filter);
+ }
+
+
+ private FileMetadataCache metadataCache;
+ private DataCache cache;
+ private Configuration cacheConf;
+
@Override
public boolean next(
NullWritable nullWritable,
@@ -259,6 +395,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
@Override
public void close() throws IOException {
+ if (cacheFsPath != null) {
+ LlapCacheAwareFs.unregisterFile(cacheFsPath);
+ }
if (reader != null) {
reader.close();
}
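
Pulled together from the calls above, a hedged sketch of the cache-aware path life cycle around LlapCacheAwareFs; the class and parameter names are assumptions and error handling is omitted:

    import java.io.IOException;
    import java.util.TreeMap;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
    import org.apache.hadoop.mapred.JobConf;

    final class CacheAwarePathSketch {
      // chunkIndex maps each requested column chunk's starting offset to its end offset,
      // as built in wrapPathForCache() above.
      static Path wrapForCache(Path file, Object fileKey, DataCache dataCache,
          TreeMap<Long, Long> chunkIndex, JobConf conf) throws IOException {
        conf.set("fs." + LlapCacheAwareFs.SCHEME + ".impl",
            LlapCacheAwareFs.class.getCanonicalName());
        return LlapCacheAwareFs.registerFile(dataCache, file, fileKey, chunkIndex, conf);
      }

      // The reader's close() must release the registration again.
      static void release(Path cacheFsPath) throws IOException {
        if (cacheFsPath != null) {
          LlapCacheAwareFs.unregisterFile(cacheFsPath);
        }
      }
    }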
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 8b99ae0..2e63260 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -258,7 +258,7 @@ public class MapWork extends BaseWork {
}
public void deriveLlap(Configuration conf, boolean isExecDriver) {
- boolean hasLlap = false, hasNonLlap = false, hasAcid = false;
+ boolean hasLlap = false, hasNonLlap = false, hasAcid = false, hasCacheOnly = false;
// Assume the IO is enabled on the daemon by default. We cannot reasonably check it here.
boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode);
boolean canWrapAny = false, doCheckIfs = false;
@@ -272,10 +272,9 @@ public class MapWork extends BaseWork {
}
}
boolean hasPathToPartInfo = (pathToPartitionInfo != null && !pathToPartitionInfo.isEmpty());
- if (canWrapAny && hasPathToPartInfo) {
- assert isLlapOn;
+ if (hasPathToPartInfo) {
for (PartitionDesc part : pathToPartitionInfo.values()) {
- boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap(
+ boolean isUsingLlapIo = canWrapAny && HiveInputFormat.canWrapForLlap(
part.getInputFileFormatClass(), doCheckIfs);
if (isUsingLlapIo) {
if (part.getTableDesc() != null &&
@@ -284,28 +283,31 @@ public class MapWork extends BaseWork {
} else {
hasLlap = true;
}
+ } else if (isLlapOn && HiveInputFormat.canInjectCaches(part.getInputFileFormatClass())) {
+ hasCacheOnly = true;
} else {
hasNonLlap = true;
}
}
}
- // check if the column types that are read are supported by LLAP IO
- if (hasLlap) {
- // TODO: no need for now hasLlap = checkVectorizerSupportedTypes();
- }
-
llapIoDesc = deriveLlapIoDescString(
- isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
+ isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly);
}
private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
- boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) {
+ boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid,
+ boolean hasCacheOnly) {
if (!isLlapOn) return null; // LLAP IO is off, don't output.
- if (!canWrapAny) return "no inputs"; // Cannot use with input formats.
+ if (!canWrapAny && !hasCacheOnly) return "no inputs"; // Cannot use with input formats.
if (!hasPathToPartInfo) return "unknown"; // No information to judge.
+ int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0)
+ + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0);
+ if (varieties > 1) return "some inputs"; // Will probably never actually happen.
if (hasAcid) return "may be used (ACID table)";
- return (hasLlap ? (hasNonLlap ? "some inputs" : "all inputs") : "no inputs");
+ if (hasLlap) return "all inputs";
+ if (hasCacheOnly) return "all inputs (cache only)";
+ return "no inputs";
}
public void internTable(Interner<TableDesc> interner) {
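
To make the refactored mapping easier to read, a few illustrative evaluations of deriveLlapIoDescString (not part of the patch; the argument order is isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly):

    // (true,  true,  true, true,  false, false, false) -> "all inputs"
    // (true,  false, true, false, false, false, true ) -> "all inputs (cache only)"
    // (true,  true,  true, true,  true,  false, false) -> "some inputs"  (two input varieties)
    // (true,  true,  true, false, true,  false, false) -> "no inputs"
    // (false, any other arguments)                      -> null          (LLAP IO is off)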
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
index dfca486..ff883db 100644
--- a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
+++ b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
@@ -1,6 +1,7 @@
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
SET mapred.min.split.size=1000;
SET mapred.max.split.size=5000;
+set hive.llap.cache.allow.synthetic.fileid=true;
create table newtypestbl(c char(10), v varchar(10), d decimal(5,3), da date) stored as parquet;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
index a38cdbe..0e8dd04 100644
--- a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
+++ b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
@@ -1,5 +1,6 @@
set hive.mapred.mode=nonstrict;
SET hive.optimize.ppd=true;
+set hive.llap.cache.allow.synthetic.fileid=true;
-- SORT_QUERY_RESULTS
CREATE TABLE tbl_pred(t tinyint,