You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/08/31 21:26:56 UTC

[2/2] hive git commit: HIVE-17006 : LLAP: Parquet caching v1 (Sergey Shelukhin, reviewed by Gunther Hagleitner)

HIVE-17006 : LLAP: Parquet caching v1 (Sergey Shelukhin, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e673a73
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e673a73
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e673a73

Branch: refs/heads/master
Commit: 9e673a73d28ef047c5d4299b415c306423f68c71
Parents: 642acdf
Author: sergey <se...@apache.org>
Authored: Thu Aug 31 14:21:04 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Thu Aug 31 14:21:04 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |  35 +
 .../test/resources/testconfiguration.properties |   1 +
 .../apache/hadoop/hive/llap/io/api/LlapIo.java  |   4 +-
 .../hive/llap/cache/EvictionDispatcher.java     |  13 +-
 .../hive/llap/cache/LlapAllocatorBuffer.java    |   8 +-
 .../hive/llap/io/api/impl/LlapIoImpl.java       |  99 ++-
 .../io/metadata/ParquetMetadataCacheImpl.java   | 353 ++++++++
 .../hadoop/hive/llap/LlapCacheAwareFs.java      | 422 +++++++++
 .../org/apache/hadoop/hive/ql/io/HdfsUtils.java |   3 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      | 103 ++-
 .../io/LlapCacheOnlyInputFormatInterface.java   |  28 +
 .../ql/io/parquet/MapredParquetInputFormat.java |  12 +-
 .../parquet/VectorizedParquetInputFormat.java   |  22 +-
 .../vector/ParquetFooterInputFromCache.java     | 196 +++++
 .../vector/VectorizedParquetRecordReader.java   | 169 +++-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java |  28 +-
 .../clientpositive/parquet_ppd_decimal.q        |   1 +
 .../clientpositive/parquet_predicate_pushdown.q |   1 +
 .../test/queries/clientpositive/parquet_types.q |   1 +
 .../parquet_types_vectorization.q               |   1 +
 .../queries/clientpositive/vectorized_parquet.q |   1 +
 .../clientpositive/vectorized_parquet_types.q   |   1 +
 .../llap/parquet_predicate_pushdown.q.out       |  18 +-
 .../llap/parquet_types_vectorization.q.out      | 849 +++++++++++++++++++
 .../llap/vector_partitioned_date_time.q.out     |  16 +-
 .../llap/vectorized_parquet.q.out               |   2 +-
 .../llap/vectorized_parquet_types.q.out         |   2 +-
 .../hive/common/io/FileMetadataCache.java       |  51 ++
 .../io/encoded/MemoryBufferOrBuffers.java       |  24 +
 29 files changed, 2360 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index e784797..0feff59 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.hive.common;
 
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -1033,4 +1036,36 @@ public final class FileUtils {
     return result;
   }
 
+  /**
+   * Reads length bytes of data from the stream into the byte buffer.
+   * @param stream Stream to read from.
+   * @param length The number of bytes to read.
+   * @param bb The buffer to read into; the data is written at current position and then the
+   *           position is incremented by length.
+   * @throws EOFException the length bytes cannot be read. The buffer position is not modified.
+   */
+  public static void readFully(InputStream stream, int length, ByteBuffer bb) throws IOException {
+    byte[] b = null;
+    int offset = 0;
+    if (bb.hasArray()) {
+      b = bb.array();
+      offset = bb.arrayOffset() + bb.position();
+    } else {
+      b = new byte[bb.remaining()];
+    }
+    int fullLen = length;
+    while (length > 0) {
+      int result = stream.read(b, offset, length);
+      if (result < 0) {
+        throw new EOFException("Reading " + fullLen + " bytes");
+      }
+      offset += result;
+      length -= result;
+    }
+    if (!bb.hasArray()) {
+      bb.put(b);
+    } else {
+      bb.position(bb.position() + fullLen);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 6f2efa7..f452341 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -75,6 +75,7 @@ minillap.shared.query.files=insert_into1.q,\
   orc_merge4.q,\
   orc_merge_diff_fs.q,\
   parallel_colstats.q,\
+  parquet_types_vectorization.q,\
   unionDistinct_1.q,\
   union_type_chk.q,\
   cte_2.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
index 42129b7..d2f9e6c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIo.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 
 public interface LlapIo<T> {
-  InputFormat<NullWritable, T> getInputFormat(InputFormat sourceInputFormat, Deserializer serde);
+  InputFormat<NullWritable, T> getInputFormat(
+      InputFormat<?, ?> sourceInputFormat, Deserializer serde);
   void close();
   String getMemoryInfo();
+  void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index 0cbc8f6..c5248ce 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl.LlapFileMetadataBuffer;
 
 /**
  * Eviction dispatcher - uses double dispatch to route eviction notifications to correct caches.
@@ -31,11 +33,15 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
   private final SerDeLowLevelCacheImpl serdeCache;
   private final OrcMetadataCache metadataCache;
   private final EvictionAwareAllocator allocator;
+  // TODO# temporary, will be merged with OrcMetadataCache after HIVE-15665.
+  private final ParquetMetadataCacheImpl parquetMetadataCache;
 
   public EvictionDispatcher(LowLevelCache dataCache, SerDeLowLevelCacheImpl serdeCache,
-      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator) {
+      OrcMetadataCache metadataCache, EvictionAwareAllocator allocator,
+      ParquetMetadataCacheImpl parquetMetadataCache) {
     this.dataCache = dataCache;
     this.metadataCache = metadataCache;
+    this.parquetMetadataCache = parquetMetadataCache;
     this.serdeCache = serdeCache;
     this.allocator = allocator;
   }
@@ -45,10 +51,13 @@ public final class EvictionDispatcher implements EvictionListener, LlapOomDebugD
     buffer.notifyEvicted(this); // This will call one of the specific notifyEvicted overloads.
   }
 
+  public void notifyEvicted(LlapFileMetadataBuffer buffer) {
+    this.parquetMetadataCache.notifyEvicted(buffer);
+  }
+
   public void notifyEvicted(LlapSerDeDataBuffer buffer) {
     serdeCache.notifyEvicted(buffer);
     allocator.deallocateEvicted(buffer);
-   
   }
 
   public void notifyEvicted(LlapDataBuffer buffer) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
index 52144c2..8f57a04 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapAllocatorBuffer.java
@@ -131,15 +131,17 @@ public abstract class LlapAllocatorBuffer extends LlapCacheableBuffer implements
     long newState, oldState;
     do {
       oldState = state.get();
+      // We have to check it here since invalid decref will overflow.
+      int oldRefCount = State.getRefCount(oldState);
+      if (oldRefCount == 0) {
+        throw new AssertionError("Invalid decRef when refCount is 0: " + this);
+      }
       newState = State.decRefCount(oldState);
     } while (!state.compareAndSet(oldState, newState));
     int newRefCount = State.getRefCount(newState);
     if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
       LlapIoImpl.LOCKING_LOGGER.trace("Unlocked {}; refcount {}", this, newRefCount);
     }
-    if (newRefCount < 0) {
-      throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
-    }
     return newRefCount;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 35b9d1f..f42622b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -34,11 +34,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.Allocator.BufferObjectFactory;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
 import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -49,24 +56,30 @@ import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
 import org.apache.hadoop.hive.llap.cache.SerDeLowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.cache.SimpleAllocator;
 import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.ParquetMetadataCacheImpl;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.orc.encoded.IoTrace;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 
 
+
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -86,8 +99,13 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
   private ObjectName buddyAllocatorMXBean;
   private final Allocator allocator;
   private final LlapOomDebugDump memoryDump;
+  private final FileMetadataCache fileMetadataCache;
+  private final LowLevelCache dataCache;
+  private final BufferUsageManager bufferManager;
+  private final Configuration daemonConf;
 
   private LlapIoImpl(Configuration conf) throws IOException {
+    this.daemonConf = conf;
     String ioMode = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MODE);
     boolean useLowLevelCache = LlapIoImpl.MODE_CACHE.equalsIgnoreCase(ioMode);
     LOG.info("Initializing LLAP IO in {} mode", useLowLevelCache ? LlapIoImpl.MODE_CACHE : "none");
@@ -115,7 +133,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
         sessionId);
 
     OrcMetadataCache metadataCache = null;
-    LowLevelCache cache = null;
     SerDeLowLevelCacheImpl serdeCache = null; // TODO: extract interface when needed
     BufferUsageManager bufferManagerOrc = null, bufferManagerGeneric = null;
     boolean isEncodeEnabled = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED);
@@ -154,16 +171,22 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       this.memoryDump = allocator;
       LowLevelCacheImpl cacheImpl = new LowLevelCacheImpl(
           cacheMetrics, cachePolicy, allocator, true);
-      cache = cacheImpl;
+      dataCache = cacheImpl;
       if (isEncodeEnabled) {
         SerDeLowLevelCacheImpl serdeCacheImpl = new SerDeLowLevelCacheImpl(
             cacheMetrics, cachePolicy, allocator);
         serdeCache = serdeCacheImpl;
       }
+
       boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
       metadataCache = new OrcMetadataCache(metaMemManager, metaCachePolicy, useGapCache);
+      // TODO# temporary, see comments there
+      ParquetMetadataCacheImpl parquetMc = new ParquetMetadataCacheImpl(
+          allocator, memManager, cachePolicy, cacheMetrics);
+      fileMetadataCache = parquetMc;
       // And finally cache policy uses cache to notify it of eviction. The cycle is complete!
-      EvictionDispatcher e = new EvictionDispatcher(cache, serdeCache, metadataCache, allocator);
+      EvictionDispatcher e = new EvictionDispatcher(
+          dataCache, serdeCache, metadataCache, allocator, parquetMc);
       if (isSplitCache) {
         metaCachePolicy.setEvictionListener(e);
         metaCachePolicy.setParentDebugDumper(e);
@@ -172,14 +195,15 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
       cachePolicy.setParentDebugDumper(e);
 
       cacheImpl.startThreads(); // Start the cache threads.
-      bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
+      bufferManager = bufferManagerOrc = cacheImpl; // Cache also serves as buffer manager.
       bufferManagerGeneric = serdeCache;
     } else {
       this.allocator = new SimpleAllocator(conf);
       memoryDump = null;
+      fileMetadataCache = null;
       SimpleBufferManager sbm = new SimpleBufferManager(allocator, cacheMetrics);
-      bufferManagerOrc = bufferManagerGeneric = sbm;
-      cache = sbm;
+      bufferManager = bufferManagerOrc = bufferManagerGeneric = sbm;
+      dataCache = sbm;
     }
     // IO thread pool. Listening is used for unhandled errors for now (TODO: remove?)
     int numThreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_IO_THREADPOOL_SIZE);
@@ -190,7 +214,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     FixedSizedObjectPool<IoTrace> tracePool = IoTrace.createTracePool(conf);
     // TODO: this should depends on input format and be in a map, or something.
     this.orcCvp = new OrcColumnVectorProducer(
-        metadataCache, cache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
+        metadataCache, dataCache, bufferManagerOrc, conf, cacheMetrics, ioMetrics, tracePool);
     this.genericCvp = isEncodeEnabled ? new GenericColumnVectorProducer(
         serdeCache, bufferManagerGeneric, conf, cacheMetrics, ioMetrics, tracePool) : null;
     LOG.info("LLAP IO initialized");
@@ -210,10 +234,9 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     return sb.toString();
   }
 
-  @SuppressWarnings("rawtypes")
   @Override
   public InputFormat<NullWritable, VectorizedRowBatch> getInputFormat(
-      InputFormat sourceInputFormat, Deserializer sourceSerDe) {
+      InputFormat<?, ?> sourceInputFormat, Deserializer sourceSerDe) {
     ColumnVectorProducer cvp = genericCvp;
     if (sourceInputFormat instanceof OrcInputFormat) {
       cvp = orcCvp; // Special-case for ORC.
@@ -233,4 +256,62 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     }
     executor.shutdownNow();
   }
+
+
+  @Override
+  public void initCacheOnlyInputFormat(InputFormat<?, ?> inputFormat) {
+    LlapCacheOnlyInputFormatInterface cacheIf = (LlapCacheOnlyInputFormatInterface)inputFormat;
+    cacheIf.injectCaches(fileMetadataCache,
+        new GenericDataCache(dataCache, bufferManager), daemonConf);
+  }
+
+  private class GenericDataCache implements DataCache, BufferObjectFactory {
+    private final LowLevelCache lowLevelCache;
+    private final BufferUsageManager bufferManager;
+
+    public GenericDataCache(LowLevelCache lowLevelCache, BufferUsageManager bufferManager) {
+      this.lowLevelCache = lowLevelCache;
+      this.bufferManager = bufferManager;
+    }
+
+    @Override
+    public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
+        long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
+      // TODO: we currently pass null counters because this doesn't use LlapRecordReader.
+      //       Create counters for non-elevator-using fragments also?
+      return lowLevelCache.getFileData(fileKey, range, baseOffset, factory, null, gotAllData);
+    }
+
+    @Override
+    public long[] putFileData(Object fileKey, DiskRange[] ranges,
+        MemoryBuffer[] data, long baseOffset) {
+      return lowLevelCache.putFileData(fileKey, ranges, data, baseOffset, Priority.NORMAL, null);
+    }
+
+    @Override
+    public void releaseBuffer(MemoryBuffer buffer) {
+      bufferManager.decRefBuffer(buffer);
+    }
+
+    @Override
+    public void reuseBuffer(MemoryBuffer buffer) {
+      boolean isReused = bufferManager.incRefBuffer(buffer);
+      assert isReused;
+    }
+
+    @Override
+    public Allocator getAllocator() {
+      return bufferManager.getAllocator();
+    }
+
+    @Override
+    public BufferObjectFactory getDataBufferFactory() {
+      return this;
+    }
+
+    @Override
+    public MemoryBuffer create() {
+      return new LlapDataBuffer();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
new file mode 100644
index 0000000..b61a8ca
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/ParquetMetadataCacheImpl.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapAllocatorBuffer;
+import org.apache.hadoop.hive.llap.cache.LlapOomDebugDump;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
+import org.apache.hadoop.hive.llap.cache.MemoryManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.parquet.io.SeekableInputStream;
+
+// TODO# merge with OrcMetadataCache (and rename) after HIVE-15665. Shares a lot of the code.
+public class ParquetMetadataCacheImpl implements LlapOomDebugDump, FileMetadataCache {
+  private final ConcurrentHashMap<Object, LlapBufferOrBuffers> metadata =
+      new ConcurrentHashMap<>();
+
+  private final MemoryManager memoryManager;
+  private final LowLevelCachePolicy policy;
+  private final EvictionAwareAllocator allocator;
+  private final LlapDaemonCacheMetrics metrics;
+
+  public ParquetMetadataCacheImpl(EvictionAwareAllocator allocator, MemoryManager memoryManager,
+      LowLevelCachePolicy policy, LlapDaemonCacheMetrics metrics) {
+    this.memoryManager = memoryManager;
+    this.allocator = allocator;
+    this.policy = policy;
+    this.metrics = metrics;
+  }
+
+  public void notifyEvicted(LlapFileMetadataBuffer buffer) {
+    LlapBufferOrBuffers removed = metadata.remove(buffer.getFileKey());
+    if (removed == null) return;
+    if (removed.getSingleBuffer() != null) {
+      assert removed.getSingleBuffer() == buffer;
+      return;
+    }
+    discardMultiBuffer(removed);
+  }
+
+  @Override
+  public LlapBufferOrBuffers getFileMetadata(Object fileKey) {
+    LlapBufferOrBuffers result = metadata.get(fileKey);
+    if (result == null) return null;
+    if (!lockBuffer(result, true)) {
+      // No need to discard the buffer we cannot lock - eviction takes care of that.
+      metadata.remove(fileKey, result);
+      return null;
+    }
+    return result;
+  }
+
+  @Override
+  public LlapBufferOrBuffers putFileMetadata(
+      Object fileKey, int length, InputStream is) throws IOException {
+    LlapBufferOrBuffers result = null;
+    while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+      LlapBufferOrBuffers oldVal = metadata.get(fileKey);
+      if (oldVal == null) {
+        result = wrapBbForFile(result, fileKey, length, is);
+        if (!lockBuffer(result, false)) {
+          throw new AssertionError("Cannot lock a newly created value " + result);
+        }
+        oldVal = metadata.putIfAbsent(fileKey, result);
+        if (oldVal == null) {
+          cacheInPolicy(result); // Cached successfully, add to policy.
+          return result;
+        }
+      }
+      if (lockOldVal(fileKey, result, oldVal)) {
+        return oldVal;
+      }
+      // We found some old value but couldn't incRef it; remove it.
+      metadata.remove(fileKey, oldVal);
+    }
+  }
+
+  private void cacheInPolicy(LlapBufferOrBuffers buffers) {
+    LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+    if (singleBuffer != null) {
+      policy.cache(singleBuffer, Priority.HIGH);
+      return;
+    }
+    for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+      policy.cache(buffer, Priority.HIGH);
+    }
+  }
+
+  private <T extends LlapBufferOrBuffers> boolean lockOldVal(Object key, T newVal, T oldVal) {
+    if (LlapIoImpl.CACHE_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.CACHE_LOGGER.trace("Trying to cache when metadata is already cached for" +
+          " {}; old {}, new {}", key, oldVal, newVal);
+    }
+    if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+      LlapIoImpl.LOCKING_LOGGER.trace("Locking {} due to cache collision", oldVal);
+    }
+    if (lockBuffer(oldVal, true)) {
+      // We found an old, valid block for this key in the cache.
+      if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
+        LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} due to cache collision with {}",
+            newVal, oldVal);
+      }
+
+      if (newVal != null) {
+        unlockBuffer(newVal, false);
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public void decRefBuffer(MemoryBufferOrBuffers buffer) {
+    if (!(buffer instanceof LlapBufferOrBuffers)) {
+      throw new AssertionError(buffer.getClass());
+    }
+    unlockBuffer((LlapBufferOrBuffers)buffer, true);
+  }
+
+  private LlapBufferOrBuffers wrapBbForFile(LlapBufferOrBuffers result,
+      Object fileKey, int length, InputStream stream) throws IOException {
+    if (result != null) return result;
+    int maxAlloc = allocator.getMaxAllocation();
+    LlapFileMetadataBuffer[] largeBuffers = null;
+    if (maxAlloc < length) {
+      largeBuffers = new LlapFileMetadataBuffer[length / maxAlloc];
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        largeBuffers[i] = new LlapFileMetadataBuffer(fileKey);
+      }
+      allocator.allocateMultiple(largeBuffers, maxAlloc, null);
+      for (int i = 0; i < largeBuffers.length; ++i) {
+        readIntoCacheBuffer(stream, maxAlloc, largeBuffers[i]);
+      }
+    }
+    int smallSize = length % maxAlloc;
+    if (smallSize == 0) {
+      return new LlapFileMetadataBuffers(largeBuffers);
+    } else {
+      LlapFileMetadataBuffer[] smallBuffer = new LlapFileMetadataBuffer[1];
+      smallBuffer[0] = new LlapFileMetadataBuffer(fileKey);
+      allocator.allocateMultiple(smallBuffer, length, null);
+      readIntoCacheBuffer(stream, smallSize, smallBuffer[0]);
+      if (largeBuffers == null) {
+        return smallBuffer[0];
+      } else {
+        LlapFileMetadataBuffer[] cacheData = new LlapFileMetadataBuffer[largeBuffers.length + 1];
+        System.arraycopy(largeBuffers, 0, cacheData, 0, largeBuffers.length);
+        cacheData[largeBuffers.length] = smallBuffer[0];
+        return new LlapFileMetadataBuffers(largeBuffers);
+      }
+    }
+  }
+
+  private static void readIntoCacheBuffer(
+      InputStream stream, int length, MemoryBuffer dest) throws IOException {
+    ByteBuffer bb = dest.getByteBufferRaw();
+    int pos = bb.position();
+    bb.limit(pos + length);
+    // TODO: SeekableInputStream.readFully eventually calls a Hadoop method that used to be
+    //       buggy in 2.7 and also anyway just does a copy for a direct buffer. Do a copy here.
+    // ((SeekableInputStream)stream).readFully(bb);
+    FileUtils.readFully(stream, length, bb);
+    bb.position(pos);
+  }
+
+
+  private boolean lockBuffer(LlapBufferOrBuffers buffers, boolean doNotifyPolicy) {
+    LlapAllocatorBuffer buffer = buffers.getSingleLlapBuffer();
+    if (buffer != null) {
+      return lockOneBuffer(buffer, doNotifyPolicy);
+    }
+    LlapAllocatorBuffer[] bufferArray = buffers.getMultipleLlapBuffers();
+    for (int i = 0; i < bufferArray.length; ++i) {
+      if (lockOneBuffer(bufferArray[i], doNotifyPolicy)) continue;
+      for (int j = 0; j < i; ++j) {
+        unlockSingleBuffer(buffer, true);
+      }
+      discardMultiBuffer(buffers);
+      return false;
+    }
+    return true;
+  }
+
+  private void discardMultiBuffer(LlapBufferOrBuffers removed) {
+    long memoryFreed = 0;
+    for (LlapAllocatorBuffer buf : removed.getMultipleLlapBuffers()) {
+      long memUsage = buf.getMemoryUsage();
+      // We cannot just deallocate the buffer, as it can hypothetically have users.
+      int result = buf.invalidate();
+      switch (result) {
+      case LlapAllocatorBuffer.INVALIDATE_ALREADY_INVALID: continue; // Nothing to do.
+      case LlapAllocatorBuffer.INVALIDATE_FAILED: {
+        // Someone is using this buffer; eventually, it will be evicted.
+        continue;
+      }
+      case LlapAllocatorBuffer.INVALIDATE_OK: {
+        memoryFreed += memUsage;
+        allocator.deallocateEvicted(buf);
+        break;
+      }
+      default: throw new AssertionError(result);
+      }
+    }
+    memoryManager.releaseMemory(memoryFreed);
+  }
+
+  private boolean lockOneBuffer(LlapAllocatorBuffer buffer, boolean doNotifyPolicy) {
+    int rc = buffer.incRef();
+    if (rc > 0) {
+      metrics.incrCacheNumLockedBuffers();
+    }
+    if (doNotifyPolicy && rc == 1) {
+      // We have just locked a buffer that wasn't previously locked.
+      policy.notifyLock(buffer);
+    }
+    return rc > 0;
+  }
+
+  private void unlockBuffer(LlapBufferOrBuffers buffers, boolean isCached) {
+    LlapAllocatorBuffer singleBuffer = buffers.getSingleLlapBuffer();
+    if (singleBuffer != null) {
+      unlockSingleBuffer(singleBuffer, isCached);
+      return;
+    }
+    for (LlapAllocatorBuffer buffer : buffers.getMultipleLlapBuffers()) {
+      unlockSingleBuffer(buffer, isCached);
+    }
+  }
+
+  private void unlockSingleBuffer(LlapAllocatorBuffer buffer, boolean isCached) {
+    boolean isLastDecref = (buffer.decRef() == 0);
+    if (isLastDecref) {
+      if (isCached) {
+        policy.notifyUnlock(buffer);
+      } else {
+        allocator.deallocate(buffer);
+      }
+    }
+    metrics.decrCacheNumLockedBuffers();
+  }
+
+
+  public static interface LlapBufferOrBuffers extends MemoryBufferOrBuffers {
+    LlapAllocatorBuffer getSingleLlapBuffer();
+    LlapAllocatorBuffer[] getMultipleLlapBuffers();
+  }
+
+  public final static class LlapFileMetadataBuffer
+      extends LlapAllocatorBuffer implements LlapBufferOrBuffers {
+    private final Object fileKey;
+
+    public LlapFileMetadataBuffer(Object fileKey) {
+      this.fileKey = fileKey;
+    }
+
+    @Override
+    public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+      evictionDispatcher.notifyEvicted(this);
+    }
+
+    public Object getFileKey() {
+      return fileKey;
+    }
+
+    @Override
+    public LlapAllocatorBuffer getSingleLlapBuffer() {
+      return this;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+      return null;
+    }
+
+    @Override
+    public MemoryBuffer getSingleBuffer() {
+      return this;
+    }
+
+    @Override
+    public MemoryBuffer[] getMultipleBuffers() {
+      return null;
+    }
+  }
+
+  public final static class LlapFileMetadataBuffers implements LlapBufferOrBuffers {
+    private final LlapFileMetadataBuffer[] buffers;
+
+    public LlapFileMetadataBuffers(LlapFileMetadataBuffer[] buffers) {
+      this.buffers = buffers;
+    }
+
+    @Override
+    public LlapAllocatorBuffer getSingleLlapBuffer() {
+      return null;
+    }
+
+    @Override
+    public LlapAllocatorBuffer[] getMultipleLlapBuffers() {
+      return buffers;
+    }
+
+    @Override
+    public MemoryBuffer getSingleBuffer() {
+      return null;
+    }
+
+    @Override
+    public MemoryBuffer[] getMultipleBuffers() {
+      return buffers;
+    }
+  }
+
+  @Override
+  public String debugDumpForOom() {
+    // TODO: nothing, will be merged with ORC cache
+    return null;
+  }
+
+  @Override
+  public void debugDumpShort(StringBuilder sb) {
+    // TODO: nothing, will be merged with ORC cache
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
new file mode 100644
index 0000000..5c1eed3
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapCacheAwareFs.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.io.Allocator;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk;
+import org.apache.hadoop.util.Progressable;
+import org.apache.orc.impl.RecordReaderUtils;
+
+/**
+ * This is currently only used by Parquet; however, a generally applicable approach is used -
+ * you pass in a set of offset pairs for a file, and the file is cached with these boundaries.
+ * Don't add anything format specific here.
+ */
+public class LlapCacheAwareFs extends FileSystem {
+  public static final String SCHEME = "llapcache";
+  private static AtomicLong currentSplitId = new AtomicLong(-1);
+  private URI uri;
+
+  // We store the chunk indices by split file; that way if several callers are reading
+  // the same file they can separately store and remove the relevant parts of the index.
+  private static final ConcurrentHashMap<Long, CacheAwareInputStream> files =
+      new ConcurrentHashMap<>();
+
+  public static Path registerFile(DataCache cache, Path path, Object fileKey,
+      TreeMap<Long, Long> index, Configuration conf) throws IOException {
+    long splitId = currentSplitId.incrementAndGet();
+    CacheAwareInputStream stream = new CacheAwareInputStream(
+        cache, conf, index, path, fileKey, -1);
+    if (files.putIfAbsent(splitId, stream) != null) {
+      throw new IOException("Record already exists for " + splitId);
+    }
+    conf.set("fs." + LlapCacheAwareFs.SCHEME + ".impl", LlapCacheAwareFs.class.getCanonicalName());
+    return new Path(SCHEME + "://" + SCHEME + "/" + splitId);
+  }
+
+  public static void unregisterFile(Path cachePath) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unregistering " + cachePath);
+    }
+    files.remove(extractSplitId(cachePath));
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+    this.uri = URI.create(SCHEME + "://" + SCHEME);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    return new FSDataInputStream(getCtx(path).cloneWithBufferSize(bufferSize));
+  }
+
+  private LlapCacheAwareFs.CacheAwareInputStream getCtx(Path path) {
+    return files.get(extractSplitId(path));
+  }
+
+  private static long extractSplitId(Path path) {
+    String pathOnly = path.toUri().getPath();
+    if (pathOnly.startsWith("/")) {
+      pathOnly = pathOnly.substring(1);
+    }
+    return Long.parseLong(pathOnly);
+  }
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setWorkingDirectory(Path arg0) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FSDataOutputStream append(Path arg0, int arg1, Progressable arg2) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().append(ctx.path, arg1, arg2);
+  }
+
+  @Override
+  public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3,
+      short arg4, long arg5, Progressable arg6) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().create(ctx.path, arg1, arg2, arg3, arg4, arg5, arg6);
+  }
+
+  @Override
+  public boolean delete(Path arg0, boolean arg1) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().delete(ctx.path, arg1);
+
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path arg0) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().getFileStatus(ctx.path);
+  }
+
+  @Override
+  public FileStatus[] listStatus(Path arg0) throws FileNotFoundException, IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().listStatus(ctx.path);
+  }
+
+  @Override
+  public boolean mkdirs(Path arg0, FsPermission arg1) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx = getCtx(arg0);
+    return ctx.getFs().mkdirs(ctx.path, arg1);
+  }
+
+  @Override
+  public boolean rename(Path arg0, Path arg1) throws IOException {
+    LlapCacheAwareFs.CacheAwareInputStream ctx1 = getCtx(arg0), ctx2 = getCtx(arg1);
+    return ctx1.getFs().rename(ctx1.path, ctx2.path);
+  }
+
+  private static class CacheAwareInputStream extends InputStream
+      implements Seekable, PositionedReadable {
+    private final TreeMap<Long, Long> chunkIndex;
+    private final Path path;
+    private final Object fileKey;
+    private final Configuration conf;
+    private final DataCache cache;
+    private final int bufferSize;
+    private long position = 0;
+
+    public CacheAwareInputStream(DataCache cache, Configuration conf,
+        TreeMap<Long, Long> chunkIndex, Path path, Object fileKey, int bufferSize) {
+      this.cache = cache;
+      this.fileKey = fileKey;
+      this.chunkIndex = chunkIndex;
+      this.path = path;
+      this.conf = conf;
+      this.bufferSize = bufferSize;
+    }
+
+    public LlapCacheAwareFs.CacheAwareInputStream cloneWithBufferSize(int bufferSize) {
+      return new CacheAwareInputStream(cache, conf, chunkIndex, path, fileKey, bufferSize);
+    }
+
+    @Override
+    public int read() throws IOException {
+      // This is not called by ConsecutiveChunk stuff in Parquet.
+      // If this were used, it might make sense to make it faster.
+      byte[] theByte = new byte[1];
+      int result = read(theByte, 0, 1);
+      if (result != 1) throw new EOFException();
+      return theByte[0] & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] array, final int arrayOffset, final int len) throws IOException {
+      long readStartPos = position;
+      DiskRangeList drl = new DiskRangeList(readStartPos, readStartPos + len);
+      DataCache.BooleanRef gotAllData = new DataCache.BooleanRef();
+      drl = cache.getFileData(fileKey, drl, 0, new DataCache.DiskRangeListFactory() {
+        @Override
+        public DiskRangeList createCacheChunk(
+            MemoryBuffer buffer, long startOffset, long endOffset) {
+          CacheChunk result = new CacheChunk(); // TODO: pool?
+          result.init(buffer, startOffset, endOffset);
+          return result;
+        }
+      }, gotAllData);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Buffers after cache " + RecordReaderUtils.stringifyDiskRanges(drl));
+      }
+      if (gotAllData.value) {
+        long sizeRead = 0;
+        while (drl != null) {
+          assert drl.hasData();
+          long from = drl.getOffset(), to = drl.getEnd();
+          int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
+          ByteBuffer data = drl.getData().duplicate();
+          data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+          sizeRead += candidateSize;
+          drl = drl.next;
+        }
+        validateAndUpdatePosition(len, sizeRead);
+        return len;
+      }
+      int maxAlloc = cache.getAllocator().getMaxAllocation();
+      // We have some disk data. Separate it by column chunk and put into cache.
+
+      // We started with a single DRL, so we assume there will be no consecutive missing blocks
+      // after the cache has inserted cache data. We also assume all the missing parts will
+      // represent one or several column chunks, since we always cache on column chunk boundaries.
+      DiskRangeList current = drl;
+      FileSystem fs = path.getFileSystem(conf);
+      FSDataInputStream is = fs.open(path, bufferSize);
+      Allocator allocator = cache.getAllocator();
+      long sizeRead = 0;
+      while (current != null) {
+        DiskRangeList candidate = current;
+        current = current.next;
+        long from = candidate.getOffset(), to = candidate.getEnd();
+        // The offset in the destination array for the beginning of this missing range.
+        int offsetFromReadStart = (int)(from - readStartPos), candidateSize = (int)(to - from);
+        if (candidate.hasData()) {
+          ByteBuffer data = candidate.getData().duplicate();
+          data.get(array, arrayOffset + offsetFromReadStart, candidateSize);
+          sizeRead += candidateSize;
+          continue;
+        }
+        // The data is not in cache.
+
+        // Account for potential partial chunks.
+        SortedMap<Long, Long> chunksInThisRead = getAndValidateMissingChunks(maxAlloc, from, to);
+
+        is.seek(from);
+        is.readFully(array, arrayOffset + offsetFromReadStart, candidateSize);
+        sizeRead += candidateSize;
+        // Now copy missing chunks (and parts of chunks) into cache buffers.
+        if (fileKey == null || cache == null) continue;
+        int extraDiskDataOffset = 0;
+        // TODO: should we try to make a giant array for one cache call to avoid overhead?
+        for (Map.Entry<Long, Long> missingChunk : chunksInThisRead.entrySet()) {
+          long chunkFrom = Math.max(from, missingChunk.getKey()),
+              chunkTo = Math.min(to, missingChunk.getValue()),
+              chunkLength = chunkTo - chunkFrom;
+          MemoryBuffer[] largeBuffers = null, smallBuffer = null, newCacheData = null;
+          try {
+            int largeBufCount = (int) (chunkLength / maxAlloc);
+            int smallSize = (int) (chunkLength % maxAlloc);
+            int chunkPartCount = largeBufCount + ((smallSize > 0) ? 1 : 0);
+            DiskRange[] cacheRanges = new DiskRange[chunkPartCount];
+            int extraOffsetInChunk = 0;
+            if (maxAlloc < chunkLength) {
+              largeBuffers = new MemoryBuffer[largeBufCount];
+              allocator.allocateMultiple(largeBuffers, maxAlloc, cache.getDataBufferFactory());
+              for (int i = 0; i < largeBuffers.length; ++i) {
+                // By definition here we copy up to the limit of the buffer.
+                ByteBuffer bb = largeBuffers[i].getByteBufferRaw();
+                int remaining = bb.remaining();
+                assert remaining == maxAlloc;
+                copyDiskDataToCacheBuffer(array,
+                    arrayOffset + offsetFromReadStart + extraDiskDataOffset,
+                    remaining, bb, cacheRanges, i, chunkFrom + extraOffsetInChunk);
+                extraDiskDataOffset += remaining;
+                extraOffsetInChunk += remaining;
+              }
+            }
+            newCacheData = largeBuffers;
+            largeBuffers = null;
+            if (smallSize > 0) {
+              smallBuffer = new MemoryBuffer[1];
+              allocator.allocateMultiple(smallBuffer, smallSize, cache.getDataBufferFactory());
+              ByteBuffer bb = smallBuffer[0].getByteBufferRaw();
+              copyDiskDataToCacheBuffer(array,
+                  arrayOffset + offsetFromReadStart + extraDiskDataOffset,
+                  smallSize, bb, cacheRanges, largeBufCount, chunkFrom + extraOffsetInChunk);
+              extraDiskDataOffset += smallSize;
+              extraOffsetInChunk += smallSize; // Not strictly necessary, noone will look at it.
+              if (newCacheData == null) {
+                newCacheData = smallBuffer;
+              } else {
+                // TODO: add allocate overload with an offset and length
+                MemoryBuffer[] combinedCacheData = new MemoryBuffer[largeBufCount + 1];
+                System.arraycopy(newCacheData, 0, combinedCacheData, 0, largeBufCount);
+                newCacheData = combinedCacheData;
+                newCacheData[largeBufCount] = smallBuffer[0];
+              }
+              smallBuffer = null;
+            }
+            cache.putFileData(fileKey, cacheRanges, newCacheData, 0);
+          } finally {
+            // We do not use the new cache buffers for the actual read, given the way read() API is.
+            // Therefore, we don't need to handle cache collisions - just decref all the buffers.
+            if (newCacheData != null) {
+              for (MemoryBuffer buffer : newCacheData) {
+                if (buffer == null) continue;
+                cache.releaseBuffer(buffer);
+              }
+            }
+            // If we have failed before building newCacheData, deallocate other the allocated.
+            if (largeBuffers != null) {
+              for (MemoryBuffer buffer : largeBuffers) {
+                if (buffer == null) continue;
+                allocator.deallocate(buffer);
+              }
+            }
+            if (smallBuffer != null && smallBuffer[0] != null) {
+              allocator.deallocate(smallBuffer[0]);
+            }
+          }
+        }
+      }
+      validateAndUpdatePosition(len, sizeRead);
+      return len;
+    }
+
+    private void validateAndUpdatePosition(int len, long sizeRead) {
+      if (sizeRead != len) {
+        throw new AssertionError("Reading at " + position + " for " + len + ": "
+            + sizeRead + " bytes copied");
+      }
+      position += len;
+    }
+
+    private void copyDiskDataToCacheBuffer(byte[] diskData, int offsetInDiskData, int sizeToCopy,
+        ByteBuffer cacheBuffer, DiskRange[] cacheRanges, int cacheRangeIx, long cacheRangeStart) {
+      int bbPos = cacheBuffer.position();
+      long cacheRangeEnd = cacheRangeStart + sizeToCopy;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Caching [" + cacheRangeStart + ", " + cacheRangeEnd + ")");
+      }
+      cacheRanges[cacheRangeIx] = new DiskRange(cacheRangeStart, cacheRangeEnd);
+      cacheBuffer.put(diskData, offsetInDiskData, sizeToCopy);
+      cacheBuffer.position(bbPos);
+    }
+
+    private SortedMap<Long, Long> getAndValidateMissingChunks(
+        int maxAlloc, long from, long to) {
+      Map.Entry<Long, Long> firstMissing = chunkIndex.floorEntry(from);
+      if (firstMissing == null) {
+        throw new AssertionError("No lower bound for offset " + from);
+      }
+      if (firstMissing.getValue() <= from
+          || ((from - firstMissing.getKey()) % maxAlloc) != 0) {
+        // The data does not belong to a recognized chunk, or is split wrong.
+        throw new AssertionError("Lower bound for offset " + from + " is ["
+            + firstMissing.getKey() + ", " + firstMissing.getValue() + ")");
+      }
+      SortedMap<Long, Long> missingChunks = chunkIndex.subMap(firstMissing.getKey(), to);
+      if (missingChunks.isEmpty()) {
+        throw new AssertionError("No chunks for [" + from + ", " + to + ")");
+      }
+      long lastMissingOffset = missingChunks.lastKey(),
+          lastMissingEnd = missingChunks.get(lastMissingOffset);
+      if (lastMissingEnd < to
+          || (to != lastMissingEnd && ((to - lastMissingOffset) % maxAlloc) != 0)) {
+        // The data does not belong to a recognized chunk, or is split wrong.
+        throw new AssertionError("Lower bound for offset " + to + " is ["
+            + lastMissingOffset + ", " + lastMissingEnd + ")");
+      }
+      return missingChunks;
+    }
+
+    public FileSystem getFs() throws IOException {
+      return path.getFileSystem(conf);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return position;
+    }
+
+    @Override
+    public void seek(long position) throws IOException {
+      this.position = position;
+    }
+
+    @Override
+    @Private
+    public boolean seekToNewSource(long arg0) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int read(long arg0, byte[] arg1, int arg2, int arg3) throws IOException {
+      seek(arg0);
+      return read(arg1, arg2, arg3);
+    }
+
+    @Override
+    public void readFully(long arg0, byte[] arg1) throws IOException {
+      read(arg0, arg1, 0, arg1.length);
+    }
+
+    @Override
+    public void readFully(long arg0, byte[] arg1, int arg2, int arg3) throws IOException {
+      read(arg0, arg1, 0, arg1.length);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index fa7f59d..102150a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -49,7 +49,8 @@ public class HdfsUtils {
     if (fileSystem instanceof DistributedFileSystem) {
       DistributedFileSystem dfs = (DistributedFileSystem) fileSystem;
       if ((!checkDefaultFs) || isDefaultFs(dfs)) {
-        return SHIMS.getFileId(dfs, path.toUri().getPath());
+        Object result = SHIMS.getFileId(dfs, path.toUri().getPath());
+        if (result != null) return result;
       }
     }
     if (!allowSynthetic) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 442c921..5c9d289 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -214,6 +214,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
     String ifName = inputFormat.getClass().getCanonicalName();
     boolean isSupported = inputFormat instanceof LlapWrappableInputFormatInterface;
+    boolean isCacheOnly = inputFormat instanceof LlapCacheOnlyInputFormatInterface;
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(conf);
     if (!isVectorized) {
       // Pretend it's vectorized if the non-vector wrapped is enabled.
@@ -224,33 +225,17 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     if (isVectorized && !isSupported
         && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_ENABLED)) {
       // See if we can use re-encoding to read the format thru IO elevator.
-      String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Checking " + ifName + " against " + formatList);
-      }
-      String[] formats = StringUtils.getStrings(formatList);
-      if (formats != null) {
-        for (String format : formats) {
-          // TODO: should we check isAssignableFrom?
-          if (ifName.equals(format)) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("Using SerDe-based LLAP reader for " + ifName);
-            }
-            isSupported = isSerdeBased = true;
-            break;
-          }
-        }
-      }
+      isSupported = isSerdeBased = checkInputFormatForLlapEncode(conf, ifName);
     }
-    if (!isSupported || !isVectorized) {
+    if ((!isSupported || !isVectorized) && !isCacheOnly) {
       if (LOG.isInfoEnabled()) {
         LOG.info("Not using llap for " + ifName + ": supported = "
-          + isSupported + ", vectorized = " + isVectorized);
+          + isSupported + ", vectorized = " + isVectorized + ", cache only = " + isCacheOnly);
       }
       return inputFormat;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Wrapping " + ifName);
+      LOG.debug("Processing " + ifName);
     }
 
     @SuppressWarnings("unchecked")
@@ -264,43 +249,85 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     Deserializer serde = null;
     if (isSerdeBased) {
       if (part == null) {
-        if (LOG.isInfoEnabled()) {
+        if (isCacheOnly) {
+          LOG.info("Using cache only because there's no partition spec for SerDe-based IF");
+          injectLlapCaches(inputFormat, llapIo);
+        } else {
           LOG.info("Not using LLAP IO because there's no partition spec for SerDe-based IF");
         }
         return inputFormat;
       }
-      VectorPartitionDesc vpart =  part.getVectorPartitionDesc();
-      if (vpart != null) {
-        VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType();
-        if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+      serde = findSerDeForLlapSerDeIf(conf, part);
+    }
+    if (isSupported && isVectorized) {
+      InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde);
+      // null means we cannot wrap; the cause is logged inside.
+      if (wrappedIf != null) {
+        return castInputFormat(wrappedIf);
+      }
+    }
+    if (isCacheOnly) {
+      injectLlapCaches(inputFormat, llapIo);
+    }
+    return inputFormat;
+  }
+
+  private static boolean checkInputFormatForLlapEncode(Configuration conf, String ifName) {
+    String formatList = HiveConf.getVar(conf, ConfVars.LLAP_IO_ENCODE_FORMATS);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Checking " + ifName + " against " + formatList);
+    }
+    String[] formats = StringUtils.getStrings(formatList);
+    if (formats != null) {
+      for (String format : formats) {
+        // TODO: should we check isAssignableFrom?
+        if (ifName.equals(format)) {
           if (LOG.isInfoEnabled()) {
-            LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition "
-              + part.getTableName() + " " + part.getPartSpec());
+            LOG.info("Using SerDe-based LLAP reader for " + ifName);
           }
-          vpart.setVectorMapOperatorReadType(
-              VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT);
+          return true;
         }
       }
-      try {
-        serde = part.getDeserializer(conf);
-      } catch (Exception e) {
-        throw new HiveException("Error creating SerDe for LLAP IO", e);
+    }
+    return false;
+  }
+
+  private static Deserializer findSerDeForLlapSerDeIf(
+      Configuration conf, PartitionDesc part) throws HiveException {
+    VectorPartitionDesc vpart =  part.getVectorPartitionDesc();
+    if (vpart != null) {
+      VectorMapOperatorReadType old = vpart.getVectorMapOperatorReadType();
+      if (old != VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Resetting VectorMapOperatorReadType from " + old + " for partition "
+            + part.getTableName() + " " + part.getPartSpec());
+        }
+        vpart.setVectorMapOperatorReadType(
+            VectorMapOperatorReadType.VECTORIZED_INPUT_FILE_FORMAT);
       }
     }
-    InputFormat<?, ?> wrappedIf = llapIo.getInputFormat(inputFormat, serde);
-    if (wrappedIf == null) {
-      return inputFormat; // We cannot wrap; the cause is logged inside.
+    try {
+      return part.getDeserializer(conf);
+    } catch (Exception e) {
+      throw new HiveException("Error creating SerDe for LLAP IO", e);
     }
-    return castInputFormat(wrappedIf);
   }
 
-
+  public static void injectLlapCaches(InputFormat<WritableComparable, Writable> inputFormat,
+      LlapIo<VectorizedRowBatch> llapIo) {
+    LOG.info("Injecting LLAP caches into " + inputFormat.getClass().getCanonicalName());
+    llapIo.initCacheOnlyInputFormat(inputFormat);
+  }
 
   public static boolean canWrapForLlap(Class<? extends InputFormat> clazz, boolean checkVector) {
     return LlapWrappableInputFormatInterface.class.isAssignableFrom(clazz) &&
         (!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz));
   }
 
+  public static boolean canInjectCaches(Class<? extends InputFormat> clazz) {
+    return LlapCacheOnlyInputFormatInterface.class.isAssignableFrom(clazz);
+  }
+
   @SuppressWarnings("unchecked")
   private static <T, U, V, W> InputFormat<T, U> castInputFormat(InputFormat<V, W> from) {
     // This is ugly in two ways...

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
new file mode 100644
index 0000000..13d594c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+
+/** Marker interface for LLAP IO. */
+public interface LlapCacheOnlyInputFormatInterface {
+  void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
index f4fadbb..38aaeed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java
@@ -15,7 +15,11 @@ package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -37,7 +41,7 @@ import org.apache.parquet.hadoop.ParquetInputFormat;
  *       are not currently supported.  Removing the interface turns off vectorization.
  */
 public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable>
-  implements VectorizedInputFormatInterface {
+  implements VectorizedInputFormatInterface, LlapCacheOnlyInputFormatInterface {
 
   private static final Logger LOG = LoggerFactory.getLogger(MapredParquetInputFormat.class);
 
@@ -78,4 +82,10 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra
       throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
     }
   }
+
+  @Override
+  public void injectCaches(
+      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+    vectorizedSelf.injectCaches(metadataCache, dataCache, cacheConf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index 322178a..0cc580a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -15,6 +15,10 @@ package org.apache.hadoop.hive.ql.io.parquet;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -28,7 +32,12 @@ import org.apache.hadoop.mapred.Reporter;
  * Vectorized input format for Parquet files
  */
 public class VectorizedParquetInputFormat
-  extends FileInputFormat<NullWritable, VectorizedRowBatch> {
+  extends FileInputFormat<NullWritable, VectorizedRowBatch> 
+  implements LlapCacheOnlyInputFormatInterface {
+
+  private FileMetadataCache metadataCache = null;
+  private DataCache dataCache = null;
+  private Configuration cacheConf = null;
 
   public VectorizedParquetInputFormat() {
   }
@@ -38,6 +47,15 @@ public class VectorizedParquetInputFormat
     InputSplit inputSplit,
     JobConf jobConf,
     Reporter reporter) throws IOException {
-    return new VectorizedParquetRecordReader(inputSplit, jobConf);
+    return new VectorizedParquetRecordReader(
+        inputSplit, jobConf, metadataCache, dataCache, cacheConf);
+  }
+
+  @Override
+  public void injectCaches(
+      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+    this.metadataCache = metadataCache;
+    this.dataCache = dataCache;
+    this.cacheConf = cacheConf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
new file mode 100644
index 0000000..2a6e052
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/ParquetFooterInputFromCache.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * The Parquet InputFile implementation that allows the reader to
+ * read the footer from cache without being aware of the latter.
+ * This implements both InputFile and the InputStream that the reader gets from InputFile.
+ */
+final class ParquetFooterInputFromCache
+    extends SeekableInputStream implements InputFile {
+  final static int FOOTER_LENGTH_SIZE = 4; // For the file size check.
+  private static final int TAIL_LENGTH = ParquetFileWriter.MAGIC.length + FOOTER_LENGTH_SIZE;
+  private static final int FAKE_PREFIX_LENGTH = ParquetFileWriter.MAGIC.length;
+  private final int length, footerLength;
+  private int position = 0, bufferIx = 0, bufferPos = 0;
+  private final MemoryBuffer[] cacheData;
+  private final int[] positions;
+
+  public ParquetFooterInputFromCache(MemoryBufferOrBuffers footerData) {
+    MemoryBuffer oneBuffer = footerData.getSingleBuffer();
+    if (oneBuffer != null) {
+      cacheData = new MemoryBuffer[2];
+      cacheData[0] = oneBuffer;
+    } else {
+      MemoryBuffer[] bufs = footerData.getMultipleBuffers();
+      cacheData = new MemoryBuffer[bufs.length + 1];
+      System.arraycopy(bufs, 0, cacheData, 0, bufs.length);
+    }
+    int footerLength = 0;
+    positions = new int[cacheData.length];
+    for (int i = 0; i < cacheData.length - 1; ++i) {
+      positions[i] = footerLength;
+      int dataLen = cacheData[i].getByteBufferRaw().remaining();
+      assert dataLen > 0;
+      footerLength += dataLen;
+    }
+    positions[cacheData.length - 1] = footerLength;
+    cacheData[cacheData.length - 1] = new FooterEndBuffer(footerLength);
+    this.footerLength = footerLength;
+    this.length = footerLength + FAKE_PREFIX_LENGTH + TAIL_LENGTH;
+  }
+
+  @Override
+  public long getLength() throws IOException {
+    return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() throws IOException {
+    // Note: this doesn't maintain proper newStream semantics (if any). 
+    //       We could either clone this instead or enforce that this is only called once.
+    return this;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return position;
+  }
+
+  @Override
+  public void seek(long targetPos) throws IOException {
+    this.position = (int)targetPos;
+    targetPos -= FAKE_PREFIX_LENGTH;
+    // Not efficient, but we don't expect this to be called frequently.
+    for (int i = 1; i <= positions.length; ++i) {
+      int endPos = (i == positions.length) ? (length - FAKE_PREFIX_LENGTH) : positions[i];
+      if (endPos > targetPos) {
+        bufferIx = i - 1;
+        bufferPos = (int) (targetPos - positions[i - 1]);
+        return;
+      }
+    }
+    throw new IOException("Incorrect seek " + targetPos + "; footer length " + footerLength
+        + Arrays.toString(positions));
+  }
+
+  @Override
+  public void readFully(byte[] b, int offset, int len) throws IOException {
+    if (readInternal(b, offset, len) == len) return;
+    throw new EOFException();
+  }
+
+  public int readInternal(byte[] b, int offset, int len) {
+    if (position >= length) return -1;
+    int argPos = offset, argEnd = offset + len;
+    while (argPos < argEnd) {
+      if (bufferIx == cacheData.length) return (argPos - offset);
+      ByteBuffer data = cacheData[bufferIx].getByteBufferDup();
+      int toConsume = Math.min(argEnd - argPos, data.remaining() - bufferPos);
+      data.position(data.position() + bufferPos);
+      data.get(b, argPos, toConsume);
+      if (data.remaining() == 0) {
+        ++bufferIx;
+        bufferPos = 0;
+      } else {
+        bufferPos += toConsume;
+      }
+      argPos += toConsume;
+    }
+    return len;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (position >= length) return -1;
+    ++position;
+    ByteBuffer data = cacheData[bufferIx].getByteBufferRaw();
+    int bp = bufferPos;
+    ++bufferPos;
+    if (bufferPos == data.remaining()) {
+      ++bufferIx; // The first line check should handle the OOB.
+      bufferPos = 0;
+    }
+    return data.get(data.position() + bp) & 0xFF;
+  }
+
+  @Override
+  public int read(ByteBuffer bb) throws IOException {
+    // Simple implementation for now - currently Parquet uses heap buffers.
+    int result = -1;
+    if (bb.hasArray()) {
+      result = readInternal(bb.array(), bb.arrayOffset(), result);
+      if (result > 0) {
+        bb.position(bb.position() + result);
+      }
+    } else {
+      byte[] b = new byte[bb.remaining()];
+      result = readInternal(b, 0, result);
+      bb.put(b, 0, result);
+    }
+    return result;
+  }
+
+  @Override
+  public void readFully(byte[] arg0) throws IOException {
+    readFully(arg0, 0, arg0.length);
+  }
+
+  @Override
+  public void readFully(ByteBuffer arg0) throws IOException {
+    read(arg0);
+  }
+
+  /**
+   * The fake buffer that emulates end of file, with footer length and magic. Given that these
+   * can be generated based on the footer buffer itself, we don't cache them.
+   */
+  private final static class FooterEndBuffer implements MemoryBuffer {
+    private final ByteBuffer bb;
+    public FooterEndBuffer(int footerLength) {
+      byte[] b = new byte[8];
+      b[0] = (byte) ((footerLength >>>  0) & 0xFF);
+      b[1] = (byte) ((footerLength >>>  8) & 0xFF);
+      b[2] = (byte) ((footerLength >>> 16) & 0xFF);
+      b[3] = (byte) ((footerLength >>> 24) & 0xFF);
+      for (int i = 0; i < ParquetFileWriter.MAGIC.length; ++i) {
+        b[4 + i] = ParquetFileWriter.MAGIC[i];
+      }
+      bb = ByteBuffer.wrap(b);
+    }
+
+    @Override
+    public ByteBuffer getByteBufferRaw() {
+      return bb;
+    }
+
+    @Override
+    public ByteBuffer getByteBufferDup() {
+      return bb.duplicate();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index 6a7a219..0977759 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -14,9 +14,22 @@
 package org.apache.hadoop.hive.ql.io.parquet.vector;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
+import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapCacheAwareFs;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
 import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
@@ -31,13 +44,21 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetInputSplit;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -50,11 +71,11 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
 /**
@@ -74,6 +95,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   private VectorizedRowBatchCtx rbCtx;
   private List<Integer> indexColumnsWanted;
   private Object[] partitionValues;
+  private Path cacheFsPath;
 
   /**
    * For each request column, the reader to read this column. This is NULL if this column
@@ -114,9 +136,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
   }
 
   public VectorizedParquetRecordReader(
-    org.apache.hadoop.mapred.InputSplit oldInputSplit,
-    JobConf conf) {
+      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+    this(oldInputSplit, conf, null, null, null);
+  }
+
+  public VectorizedParquetRecordReader(
+      org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
+      FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
     try {
+      this.metadataCache = metadataCache;
+      this.cache = dataCache;
+      this.cacheConf = cacheConf;
       serDeStats = new SerDeStats();
       projectionPusher = new ProjectionPusher();
       ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
@@ -132,16 +162,17 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     }
   }
 
-   private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
-      int partitionColumnCount = rbCtx.getPartitionColumnCount();
-      if (partitionColumnCount > 0) {
-        partitionValues = new Object[partitionColumnCount];
-        rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
-      } else {
-        partitionValues = null;
-      }
-   }
+  private void initPartitionValues(FileSplit fileSplit, JobConf conf) throws IOException {
+     int partitionColumnCount = rbCtx.getPartitionColumnCount();
+     if (partitionColumnCount > 0) {
+       partitionValues = new Object[partitionColumnCount];
+       VectorizedRowBatchCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+     } else {
+       partitionValues = null;
+     }
+  }
 
+  @SuppressWarnings("deprecation")
   public void initialize(
     InputSplit oldSplit,
     JobConf configuration) throws IOException, InterruptedException {
@@ -164,16 +195,33 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
     columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
 
     // if task.side.metadata is set, rowGroupOffsets is null
+    Object cacheKey = null;
+    // TODO: also support fileKey in splits, like OrcSplit does
+    if (metadataCache != null) {
+      cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
+        HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+        HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID));
+    }
+    if (cacheKey != null) {
+      // If we are going to use cache, change the path to depend on file ID for extra consistency.
+      FileSystem fs = file.getFileSystem(configuration);
+      if (cacheKey instanceof Long && HiveConf.getBoolVar(
+          cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+        file = HdfsUtils.getFileIdPath(fs, file, (long)cacheKey);
+      }
+    }
+
     if (rowGroupOffsets == null) {
       //TODO check whether rowGroupOffSets can be null
       // then we need to apply the predicate push down filter
-      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+      footer = readSplitFooter(
+          configuration, file, cacheKey, range(split.getStart(), split.getEnd()));
       MessageType fileSchema = footer.getFileMetaData().getSchema();
       FilterCompat.Filter filter = getFilter(configuration);
       blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
     } else {
       // otherwise we find the row groups that were selected on the client
-      footer = readFooter(configuration, file, NO_FILTER);
+      footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER);
       Set<Long> offsets = new HashSet<>();
       for (long offset : rowGroupOffsets) {
         offsets.add(offset);
@@ -230,10 +278,98 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
       requestedSchema = fileSchema;
     }
 
+    Path path = wrapPathForCache(file, cacheKey, configuration, blocks);
     this.reader = new ParquetFileReader(
-      configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+      configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+  }
+
+  private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
+      List<BlockMetaData> blocks) throws IOException {
+    if (fileKey == null || cache == null) {
+      return path;
+    }
+    HashSet<ColumnPath> includedCols = new HashSet<>();
+    for (ColumnDescriptor col : requestedSchema.getColumns()) {
+      includedCols.add(ColumnPath.get(col.getPath()));
+    }
+    // We could make some assumptions given how the reader currently does the work (consecutive
+    // chunks, etc.; blocks and columns stored in offset order in the lists), but we won't -
+    // just save all the chunk boundaries and lengths for now.
+    TreeMap<Long, Long> chunkIndex = new TreeMap<>();
+    for (BlockMetaData block : blocks) {
+      for (ColumnChunkMetaData mc : block.getColumns()) {
+        if (!includedCols.contains(mc.getPath())) continue;
+        chunkIndex.put(mc.getStartingPos(), mc.getStartingPos() + mc.getTotalSize());
+      }
+    }
+    // Register the cache-aware path so that Parquet reader would go thru it.
+    configuration.set("fs." + LlapCacheAwareFs.SCHEME + ".impl",
+        LlapCacheAwareFs.class.getCanonicalName());
+    path = LlapCacheAwareFs.registerFile(cache, path, fileKey, chunkIndex, configuration);
+    this.cacheFsPath = path;
+    return path;
+  }
+
+  private ParquetMetadata readSplitFooter(
+      JobConf configuration, final Path file, Object cacheKey, MetadataFilter filter) throws IOException {
+    MemoryBufferOrBuffers footerData = (cacheKey == null || metadataCache == null) ? null
+        : metadataCache.getFileMetadata(cacheKey);
+    if (footerData != null) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Found the footer in cache for " + cacheKey);
+      }
+      try {
+        return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
+      } finally {
+        metadataCache.decRefBuffer(footerData);
+      }
+    }
+    final FileSystem fs = file.getFileSystem(configuration);
+    final FileStatus stat = fs.getFileStatus(file);
+    if (cacheKey == null || metadataCache == null) {
+      return readFooterFromFile(file, fs, stat, filter);
+    }
+
+    // To avoid reading the footer twice, we will cache it first and then read from cache.
+    // Parquet calls protobuf methods directly on the stream and we can't get bytes after the fact.
+    try (SeekableInputStream stream = HadoopStreams.wrap(fs.open(file))) {
+      long footerLengthIndex = stat.getLen()
+          - ParquetFooterInputFromCache.FOOTER_LENGTH_SIZE - ParquetFileWriter.MAGIC.length;
+      stream.seek(footerLengthIndex);
+      int footerLength = BytesUtils.readIntLittleEndian(stream);
+      stream.seek(footerLengthIndex - footerLength);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Caching the footer of length " + footerLength + " for " + cacheKey);
+      }
+      footerData = metadataCache.putFileMetadata(cacheKey, footerLength, stream);
+      try {
+        return ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), filter);
+      } finally {
+        metadataCache.decRefBuffer(footerData);
+      }
+    }
   }
 
+  private ParquetMetadata readFooterFromFile(final Path file, final FileSystem fs,
+      final FileStatus stat, MetadataFilter filter) throws IOException {
+    InputFile inputFile = new InputFile() {
+      @Override
+      public SeekableInputStream newStream() throws IOException {
+        return HadoopStreams.wrap(fs.open(file));
+      }
+      @Override
+      public long getLength() throws IOException {
+        return stat.getLen();
+      }
+    };
+    return ParquetFileReader.readFooter(inputFile, filter);
+  }
+
+
+  private FileMetadataCache metadataCache;
+  private DataCache cache;
+  private Configuration cacheConf;
+
   @Override
   public boolean next(
     NullWritable nullWritable,
@@ -259,6 +395,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
 
   @Override
   public void close() throws IOException {
+    if (cacheFsPath != null) {
+      LlapCacheAwareFs.unregisterFile(cacheFsPath);
+    }
     if (reader != null) {
       reader.close();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 8b99ae0..2e63260 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -258,7 +258,7 @@ public class MapWork extends BaseWork {
   }
 
   public void deriveLlap(Configuration conf, boolean isExecDriver) {
-    boolean hasLlap = false, hasNonLlap = false, hasAcid = false;
+    boolean hasLlap = false, hasNonLlap = false, hasAcid = false, hasCacheOnly = false;
     // Assume the IO is enabled on the daemon by default. We cannot reasonably check it here.
     boolean isLlapOn = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENABLED, llapMode);
     boolean canWrapAny = false, doCheckIfs = false;
@@ -272,10 +272,9 @@ public class MapWork extends BaseWork {
       }
     }
     boolean hasPathToPartInfo = (pathToPartitionInfo != null && !pathToPartitionInfo.isEmpty());
-    if (canWrapAny && hasPathToPartInfo) {
-      assert isLlapOn;
+    if (hasPathToPartInfo) {
       for (PartitionDesc part : pathToPartitionInfo.values()) {
-        boolean isUsingLlapIo = HiveInputFormat.canWrapForLlap(
+        boolean isUsingLlapIo = canWrapAny && HiveInputFormat.canWrapForLlap(
             part.getInputFileFormatClass(), doCheckIfs);
         if (isUsingLlapIo) {
           if (part.getTableDesc() != null &&
@@ -284,28 +283,31 @@ public class MapWork extends BaseWork {
           } else {
             hasLlap = true;
           }
+        } else if (isLlapOn && HiveInputFormat.canInjectCaches(part.getInputFileFormatClass())) {
+          hasCacheOnly = true;
         } else {
           hasNonLlap = true;
         }
       }
     }
 
-    // check if the column types that are read are supported by LLAP IO
-    if (hasLlap) {
-      // TODO: no need for now hasLlap = checkVectorizerSupportedTypes();
-    }
-
     llapIoDesc = deriveLlapIoDescString(
-        isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
+        isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid, hasCacheOnly);
   }
 
   private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny,
-      boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) {
+      boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid,
+      boolean hasCacheOnly) {
     if (!isLlapOn) return null; // LLAP IO is off, don't output.
-    if (!canWrapAny) return "no inputs"; // Cannot use with input formats.
+    if (!canWrapAny && !hasCacheOnly) return "no inputs"; // Cannot use with input formats.
     if (!hasPathToPartInfo) return "unknown"; // No information to judge.
+    int varieties = (hasAcid ? 1 : 0) + (hasLlap ? 1 : 0)
+        + (hasCacheOnly ? 1 : 0) + (hasNonLlap ? 1 : 0);
+    if (varieties > 1) return "some inputs"; // Will probably never actually happen.
     if (hasAcid) return "may be used (ACID table)";
-    return (hasLlap ? (hasNonLlap ? "some inputs" : "all inputs") : "no inputs");
+    if (hasLlap) return "all inputs";
+    if (hasCacheOnly) return "all inputs (cache only)";
+    return "no inputs";
   }
 
   public void internTable(Interner<TableDesc> interner) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
index dfca486..ff883db 100644
--- a/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
+++ b/ql/src/test/queries/clientpositive/parquet_ppd_decimal.q
@@ -1,6 +1,7 @@
 SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 SET mapred.min.split.size=1000;
 SET mapred.max.split.size=5000;
+set hive.llap.cache.allow.synthetic.fileid=true;
 
 create table newtypestbl(c char(10), v varchar(10), d decimal(5,3), da date) stored as parquet;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/9e673a73/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
index a38cdbe..0e8dd04 100644
--- a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
+++ b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
@@ -1,5 +1,6 @@
 set hive.mapred.mode=nonstrict;
 SET hive.optimize.ppd=true;
+set hive.llap.cache.allow.synthetic.fileid=true;
 
 -- SORT_QUERY_RESULTS
 CREATE TABLE tbl_pred(t tinyint,