You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/03/16 02:47:03 UTC

[2/2] hive git commit: HIVE-11675 : make use of file footer PPD API in ETL strategy or separate strategy (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-11675 : make use of file footer PPD API in ETL strategy or separate strategy (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/868db42a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/868db42a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/868db42a

Branch: refs/heads/master
Commit: 868db42a695e3137c65b53386eb4d2b2ec76b265
Parents: 26b5c7b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Mar 15 18:37:29 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Mar 15 18:37:29 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hadoop/hive/metastore/FileFormatProxy.java  |   6 +-
 .../hive/metastore/FileMetadataHandler.java     |   2 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |  19 +-
 .../hive/metastore/HiveMetaStoreClient.java     |  43 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   4 +
 .../filemeta/OrcFileMetadataHandler.java        |  15 +-
 orc/src/java/org/apache/orc/impl/InStream.java  |   2 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   4 +-
 .../org/apache/hadoop/hive/ql/io/HdfsUtils.java |  17 +
 .../hadoop/hive/ql/io/orc/ExternalCache.java    | 338 +++++++++++
 .../hadoop/hive/ql/io/orc/LocalCache.java       | 112 ++++
 .../io/orc/MetastoreExternalCachesByConf.java   |  82 +++
 .../hive/ql/io/orc/OrcFileFormatProxy.java      |  14 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 593 ++++++++-----------
 .../hive/ql/io/orc/OrcNewInputFormat.java       |  16 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  12 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  12 +-
 .../hive/ql/io/orc/TestOrcSplitElimination.java | 405 +++++++++++--
 .../hadoop/hive/ql/io/sarg/PredicateLeaf.java   |   1 -
 .../hive/ql/io/sarg/SearchArgumentImpl.java     |   1 -
 21 files changed, 1286 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9fd6648..98c6372 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1213,6 +1213,9 @@ public class HiveConf extends Configuration {
 
     HIVE_ORC_MS_FOOTER_CACHE_ENABLED("hive.orc.splits.ms.footer.cache.enabled", false,
         "Whether to enable using file metadata cache in metastore for ORC file footers."),
+    HIVE_ORC_MS_FOOTER_CACHE_PPD("hive.orc.splits.ms.footer.cache.ppd.enabled", true,
+        "Whether to enable file footer cache PPD (hive.orc.splits.ms.footer.cache.enabled\n" +
+        "must also be set to true for this to work)."),
 
     HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS("hive.orc.splits.include.file.footer", false,
         "If turned on splits generated by orc will include metadata about the stripes in the file. This\n" +
@@ -1222,7 +1225,7 @@ public class HiveConf extends Configuration {
         "generation. 0 means process directories individually. This can increase the number of\n" +
         "metastore calls if metastore metadata cache is used."),
     HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
-        "Include file ID in splits on file systems thaty support it."),
+        "Include file ID in splits on file systems that support it."),
     HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid", true,
         "Allow synthetic file ID in splits on file systems that don't have a native one."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java b/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
index ec0be2b..14ff187 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/FileFormatProxy.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
 /**
@@ -33,11 +34,10 @@ public interface FileFormatProxy {
   /**
    * Applies SARG to file metadata, and produces some result for this file.
    * @param sarg SARG
-   * @param byteBuffer File metadata from metastore cache.
+   * @param fileMetadata File metadata from metastore cache.
    * @return The result to return to client for this file, or null if file is eliminated.
-   * @throws IOException
    */
-  ByteBuffer applySargToMetadata(SearchArgument sarg, ByteBuffer byteBuffer) throws IOException;
+  SplitInfos applySargToMetadata(SearchArgument sarg, ByteBuffer fileMetadata) throws IOException;
 
   /**
    * @param fs The filesystem of the file.

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
index bd4e188..832daec 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/FileMetadataHandler.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.metastore.hbase.MetadataStore;
  * contains the actual implementation that depends on some stuff in QL (for ORC).
  */
 public abstract class FileMetadataHandler {
-  static final Log LOG = LogFactory.getLog(FileMetadataHandler.class);
+  protected static final Log LOG = LogFactory.getLog(FileMetadataHandler.class);
 
   private Configuration conf;
   private PartitionExpressionProxy expressionProxy;

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2fa0e9a..c9fadad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -5947,17 +5947,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean[] eliminated = new boolean[fileIds.size()];
 
       getMS().getFileMetadataByExpr(fileIds, type, req.getExpr(), metadatas, ppdResults, eliminated);
-      for (int i = 0; i < metadatas.length; ++i) {
-        long fileId = fileIds.get(i);
-        ByteBuffer metadata = metadatas[i];
-        if (metadata == null) continue;
-        metadata = (eliminated[i] || !needMetadata) ? null
-            : handleReadOnlyBufferForThrift(metadata);
+      for (int i = 0; i < fileIds.size(); ++i) {
+        if (!eliminated[i] && ppdResults[i] == null) continue; // No metadata => no ppd.
         MetadataPpdResult mpr = new MetadataPpdResult();
-        ByteBuffer bitset = eliminated[i] ? null : handleReadOnlyBufferForThrift(ppdResults[i]);
-        mpr.setMetadata(metadata);
-        mpr.setIncludeBitset(bitset);
-        result.putToMetadata(fileId, mpr);
+        ByteBuffer ppdResult = eliminated[i] ? null : handleReadOnlyBufferForThrift(ppdResults[i]);
+        mpr.setIncludeBitset(ppdResult);
+        if (needMetadata) {
+          ByteBuffer metadata = eliminated[i] ? null : handleReadOnlyBufferForThrift(metadatas[i]);
+          mpr.setMetadata(metadata);
+        }
+        result.putToMetadata(fileIds.get(i), mpr);
       }
       if (!result.isSetMetadata()) {
         result.setMetadata(EMPTY_MAP_FM2); // Set the required field.

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 9048d45..cdd12ab 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.metastore;
 
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience.Public;
@@ -54,6 +53,8 @@ import org.apache.hadoop.hive.metastore.api.FireEventResponse;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
 import org.apache.hadoop.hive.metastore.api.GetChangeVersionRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
 import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
 import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
@@ -79,6 +80,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
 import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -2247,15 +2249,48 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
         if (listIndex == fileIds.size()) return null;
         int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size());
         List<Long> subList = fileIds.subList(listIndex, endIndex);
-        GetFileMetadataRequest req = new GetFileMetadataRequest();
-        req.setFileIds(subList);
-        GetFileMetadataResult resp = client.get_file_metadata(req);
+        GetFileMetadataResult resp = sendGetFileMetadataReq(subList);
+        // TODO: we could remember if it's unsupported and stop sending calls; although, it might
+        //       be a bad idea for HS2+standalone metastore that could be updated with support.
+        //       Maybe we should just remember this for some time.
+        if (!resp.isIsSupported()) return null;
         listIndex = endIndex;
         return resp.getMetadata();
       }
     };
   }
 
+  private GetFileMetadataResult sendGetFileMetadataReq(List<Long> fileIds) throws TException {
+    return client.get_file_metadata(new GetFileMetadataRequest(fileIds));
+  }
+
+  @Override
+  public Iterable<Entry<Long, MetadataPpdResult>> getFileMetadataBySarg(
+      final List<Long> fileIds, final ByteBuffer sarg, final boolean doGetFooters)
+          throws TException {
+    return new MetastoreMapIterable<Long, MetadataPpdResult>() {
+      private int listIndex = 0;
+      @Override
+      protected Map<Long, MetadataPpdResult> fetchNextBatch() throws TException {
+        if (listIndex == fileIds.size()) return null;
+        int endIndex = Math.min(listIndex + fileMetadataBatchSize, fileIds.size());
+        List<Long> subList = fileIds.subList(listIndex, endIndex);
+        GetFileMetadataByExprResult resp = sendGetFileMetadataBySargReq(
+            sarg, subList, doGetFooters);
+        if (!resp.isIsSupported()) return null;
+        listIndex = endIndex;
+        return resp.getMetadata();
+      }
+    };
+  }
+
+  private GetFileMetadataByExprResult sendGetFileMetadataBySargReq(
+      ByteBuffer sarg, List<Long> fileIds, boolean doGetFooters) throws TException {
+    GetFileMetadataByExprRequest req = new GetFileMetadataByExprRequest(fileIds, sarg);
+    req.setDoGetFooters(doGetFooters); // No need to get footers
+    return client.get_file_metadata_by_expr(req);
+  }
+
   public static abstract class MetastoreMapIterable<K, V>
     implements Iterable<Entry<K, V>>, Iterator<Entry<K, V>> {
     private Iterator<Entry<K, V>> currentIter;

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 62677d1..39cf927 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
 import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
@@ -1534,6 +1535,9 @@ public interface IMetaStoreClient {
    */
   Iterable<Entry<Long, ByteBuffer>> getFileMetadata(List<Long> fileIds) throws TException;
 
+  Iterable<Entry<Long, MetadataPpdResult>> getFileMetadataBySarg(
+      List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws TException;
+
   /**
    * Cleares the file metadata cache for respective file IDs.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
index 1b388aa..3bca85d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/filemeta/OrcFileMetadataHandler.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.FileMetadataHandler;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
@@ -44,11 +45,21 @@ public class OrcFileMetadataHandler extends FileMetadataHandler {
     }
     getStore().getFileMetadata(fileIds, metadatas);
     for (int i = 0; i < metadatas.length;  ++i) {
+      eliminated[i] = false;
+      results[i] = null;
       if (metadatas[i] == null) continue;
-      ByteBuffer result = getFileFormatProxy().applySargToMetadata(sarg, metadatas[i]);
+      ByteBuffer metadata = metadatas[i].duplicate(); // Duplicate to avoid modification.
+      SplitInfos result = null;
+      try {
+        result = getFileFormatProxy().applySargToMetadata(sarg, metadata);
+      } catch (IOException ex) {
+        LOG.error("Failed to apply SARG to metadata", ex);
+        metadatas[i] = null;
+        continue;
+      }
       eliminated[i] = (result == null);
       if (!eliminated[i]) {
-        results[i] = result;
+        results[i] = ByteBuffer.wrap(result.toByteArray());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/orc/src/java/org/apache/orc/impl/InStream.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java
index b1c6de5..1893afe 100644
--- a/orc/src/java/org/apache/orc/impl/InStream.java
+++ b/orc/src/java/org/apache/orc/impl/InStream.java
@@ -35,7 +35,7 @@ import com.google.protobuf.CodedInputStream;
 public abstract class InStream extends InputStream {
 
   private static final Logger LOG = LoggerFactory.getLogger(InStream.class);
-  private static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
+  public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB
 
   protected final String name;
   protected long length;

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 2b50a2a..9446876 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -481,7 +481,7 @@ public class AcidUtils {
       try {
         childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
       } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API", t);
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
         useFileIds = false;
       }
     }
@@ -648,7 +648,7 @@ public class AcidUtils {
       try {
         childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
       } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API", t);
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
         useFileIds = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 1a40847..b71ca09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -47,6 +47,23 @@ public class HdfsUtils {
     return new SyntheticFileId(path, fs.getLen(), fs.getModificationTime());
   }
 
+  public static long createFileId(String pathStr, FileStatus fs, boolean doLog, String fsName) {
+    int nameHash = pathStr.hashCode();
+    long fileSize = fs.getLen(), modTime = fs.getModificationTime();
+    int fileSizeHash = (int)(fileSize ^ (fileSize >>> 32)),
+        modTimeHash = (int)(modTime ^ (modTime >>> 32)),
+        combinedHash = modTimeHash ^ fileSizeHash;
+    long id = (((long)nameHash & 0xffffffffL) << 32) | ((long)combinedHash & 0xffffffffL);
+    if (doLog) {
+      LOG.warn("Cannot get unique file ID from " + fsName + "; using " + id
+          + " (" + pathStr + "," + nameHash + "," + fileSize + ")");
+    }
+    return id;
+  }
+
+
+
+
   // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this
   //       is still going to work. Otherwise, file IDs can be turned off. Later, we should use
   //       as public utility method in HDFS to obtain the inode-based path.

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
new file mode 100644
index 0000000..6556fbf
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ExternalCache.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.HdfsUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileInfo;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FooterCache;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.orc.FileMetaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+/** Metastore-based footer cache storing serialized footers. Also has a local cache. */
+public class ExternalCache implements FooterCache {
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalCache.class);
+  private static boolean isDebugEnabled = LOG.isDebugEnabled();
+
+  private final LocalCache localCache;
+  private final ExternalFooterCachesByConf externalCacheSrc;
+  private boolean isWarnLogged = false;
+
+  // Configuration and things set from it.
+  private HiveConf conf;
+  private boolean isInTest;
+  private SearchArgument sarg;
+  private ByteBuffer sargIsOriginal, sargNotIsOriginal;
+  private boolean isPpdEnabled;
+
+  public ExternalCache(LocalCache lc, ExternalFooterCachesByConf efcf) {
+    localCache = lc;
+    externalCacheSrc = efcf;
+  }
+
+  @Override
+  public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+      throws IOException {
+    localCache.put(fileId, file, fileMetaInfo, orcReader);
+    if (fileId != null) {
+      try {
+        externalCacheSrc.getCache(conf).putFileMetadata(Lists.newArrayList(fileId),
+            Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter()));
+      } catch (HiveException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean isBlocking() {
+    return true;
+  }
+
+  @Override
+  public boolean hasPpd() {
+    return isPpdEnabled;
+  }
+
+  public void configure(HiveConf queryConfig) {
+    this.conf = queryConfig;
+    this.sarg = ConvertAstToSearchArg.createFromConf(conf);
+    this.isPpdEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVEOPTINDEXFILTER)
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_MS_FOOTER_CACHE_PPD);
+    this.isInTest = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
+    this.sargIsOriginal = this.sargNotIsOriginal = null;
+  }
+
+  @Override
+  public void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+      FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException {
+    assert result.length == files.size();
+    assert ppdResult == null || ppdResult.length == files.size();
+    // First, check the local cache.
+    localCache.getAndValidate(files, isOriginal, result, ppdResult);
+
+    // posMap is an unfortunate consequence of batching/iterating thru MS results.
+    HashMap<Long, Integer> posMap = new HashMap<Long, Integer>();
+    // We won't do metastore-side PPD for the things we have locally.
+    List<Long> fileIds = determineFileIdsToQuery(files, result, posMap);
+     // Need to get a new one, see the comment wrt threadlocals.
+    ExternalFooterCachesByConf.Cache cache = externalCacheSrc.getCache(conf);
+    ByteBuffer serializedSarg = null;
+    if (isPpdEnabled) {
+      serializedSarg = getSerializedSargForMetastore(isOriginal);
+    }
+    if (serializedSarg != null) {
+      Iterator<Entry<Long, MetadataPpdResult>> iter = cache.getFileMetadataByExpr(
+          fileIds, serializedSarg, false); // don't fetch the footer, PPD happens in MS.
+      while (iter.hasNext()) {
+        Entry<Long, MetadataPpdResult> e = iter.next();
+        int ix = getAndVerifyIndex(posMap, files, result, e.getKey());
+        processPpdResult(e.getValue(), files.get(ix), ix, result, ppdResult);
+      }
+    } else {
+      // Only populate corrupt IDs for the things we couldn't deserialize if we are not using
+      // ppd. We assume that PPD makes sure the cached values are correct (or fails otherwise);
+      // also, we don't use the footers in PPD case.
+      List<Long> corruptIds = null;
+      Iterator<Entry<Long, ByteBuffer>> iter = cache.getFileMetadata(fileIds);
+      while (iter.hasNext()) {
+        Entry<Long, ByteBuffer> e = iter.next();
+        int ix = getAndVerifyIndex(posMap, files, result, e.getKey());
+        if (!processBbResult(e.getValue(), ix, files.get(ix), result))  {
+          if (corruptIds == null) {
+            corruptIds = new ArrayList<>();
+          }
+          corruptIds.add(e.getKey());
+        }
+      }
+      if (corruptIds != null) {
+        cache.clearFileMetadata(corruptIds);
+      }
+    }
+  }
+
+  private int getAndVerifyIndex(HashMap<Long, Integer> posMap,
+      List<HdfsFileStatusWithId> files, FileInfo[] result, Long fileId) {
+    int ix = posMap.get(fileId);
+    assert result[ix] == null;
+    assert fileId != null && fileId.equals(files.get(ix).getFileId());
+    return ix;
+  }
+
+  private boolean processBbResult(
+      ByteBuffer bb, int ix, HdfsFileStatusWithId file, FileInfo[] result) throws IOException {
+    if (bb == null) return true;
+    result[ix] = createFileInfoFromMs(file, bb);
+    if (result[ix] == null) {
+      return false;
+    }
+
+    localCache.put(file.getFileStatus().getPath(), result[ix]);
+    return true;
+  }
+
+  private void processPpdResult(MetadataPpdResult mpr, HdfsFileStatusWithId file,
+      int ix, FileInfo[] result, ByteBuffer[] ppdResult) throws IOException {
+    if (mpr == null) return; // This file is unknown to metastore.
+
+    ppdResult[ix] = mpr.isSetIncludeBitset() ? mpr.bufferForIncludeBitset() : NO_SPLIT_AFTER_PPD;
+    if (mpr.isSetMetadata()) {
+      result[ix] = createFileInfoFromMs(file, mpr.bufferForMetadata());
+      if (result[ix] != null) {
+        localCache.put(file.getFileStatus().getPath(), result[ix]);
+      }
+    }
+  }
+
+  private List<Long> determineFileIdsToQuery(
+      List<HdfsFileStatusWithId> files, FileInfo[] result, HashMap<Long, Integer> posMap) {
+    for (int i = 0; i < result.length; ++i) {
+      if (result[i] != null) continue;
+      HdfsFileStatusWithId file = files.get(i);
+      final FileStatus fs = file.getFileStatus();
+      Long fileId = file.getFileId();
+      if (fileId == null) {
+        if (!isInTest) {
+          if (!isWarnLogged || isDebugEnabled) {
+            LOG.warn("Not using metastore cache because fileId is missing: " + fs.getPath());
+            isWarnLogged = true;
+          }
+          continue;
+        }
+        fileId = generateTestFileId(fs, files, i);
+        LOG.info("Generated file ID " + fileId + " at " + i);
+      }
+      posMap.put(fileId, i);
+    }
+    return Lists.newArrayList(posMap.keySet());
+  }
+
+  private Long generateTestFileId(final FileStatus fs, List<HdfsFileStatusWithId> files, int i) {
+    final Long fileId = HdfsUtils.createFileId(fs.getPath().toUri().getPath(), fs, false, null);
+    files.set(i, new HdfsFileStatusWithId() {
+      @Override
+      public FileStatus getFileStatus() {
+        return fs;
+      }
+
+      @Override
+      public Long getFileId() {
+        return fileId;
+      }
+    });
+    return fileId;
+  }
+
+  private ByteBuffer getSerializedSargForMetastore(boolean isOriginal) {
+    if (sarg == null) return null;
+    ByteBuffer serializedSarg = isOriginal ? sargIsOriginal : sargNotIsOriginal;
+    if (serializedSarg != null) return serializedSarg;
+    SearchArgument sarg2 = sarg;
+    Kryo kryo = SerializationUtilities.borrowKryo();
+    try {
+      if ((isOriginal ? sargNotIsOriginal : sargIsOriginal) == null) {
+        sarg2 = kryo.copy(sarg2); // In case we need it for the other case.
+      }
+      translateSargToTableColIndexes(sarg2, conf, OrcInputFormat.getRootColumn(isOriginal));
+      ExternalCache.Baos baos = new Baos();
+      Output output = new Output(baos);
+      kryo.writeObject(output, sarg2);
+      output.flush();
+      serializedSarg = baos.get();
+      if (isOriginal) {
+        sargIsOriginal = serializedSarg;
+      } else {
+        sargNotIsOriginal = serializedSarg;
+      }
+    } finally {
+      SerializationUtilities.releaseKryo(kryo);
+    }
+    return serializedSarg;
+  }
+
+  /**
+   * Modifies the SARG, replacing column names with column indexes in target table schema. This
+   * basically does the same thing as all the shennannigans with included columns, except for the
+   * last step where ORC gets direct subtypes of root column and uses the ordered match to map
+   * table columns to file columns. The numbers put into predicate leaf should allow to go into
+   * said subtypes directly by index to get the proper index in the file.
+   * This won't work with schema evolution, although it's probably much easier to reason about
+   * if schema evolution was to be supported, because this is a clear boundary between table
+   * schema columns and all things ORC. None of the ORC stuff is used here and none of the
+   * table schema stuff is used after that - ORC doesn't need a bunch of extra crap to apply
+   * the SARG thus modified.
+   */
+  public static void translateSargToTableColIndexes(
+      SearchArgument sarg, Configuration conf, int rootColumn) {
+    String nameStr = OrcInputFormat.getNeededColumnNamesString(conf),
+        idStr = OrcInputFormat.getSargColumnIDsString(conf);
+    String[] knownNames = nameStr.split(",");
+    String[] idStrs = (idStr == null) ? null : idStr.split(",");
+    assert idStrs == null || knownNames.length == idStrs.length;
+    HashMap<String, Integer> nameIdMap = new HashMap<>();
+    for (int i = 0; i < knownNames.length; ++i) {
+      Integer newId = (idStrs != null) ? Integer.parseInt(idStrs[i]) : i;
+      Integer oldId = nameIdMap.put(knownNames[i], newId);
+      if (oldId != null && oldId.intValue() != newId.intValue()) {
+        throw new RuntimeException("Multiple IDs for " + knownNames[i] + " in column strings: ["
+            + idStr + "], [" + nameStr + "]");
+      }
+    }
+    List<PredicateLeaf> leaves = sarg.getLeaves();
+    for (int i = 0; i < leaves.size(); ++i) {
+      PredicateLeaf pl = leaves.get(i);
+      Integer colId = nameIdMap.get(pl.getColumnName());
+      String newColName = RecordReaderImpl.encodeTranslatedSargColumn(rootColumn, colId);
+      SearchArgumentFactory.setPredicateLeafColumn(pl, newColName);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SARG translated into " + sarg);
+    }
+  }
+
+  private static FileInfo createFileInfoFromMs(
+      HdfsFileStatusWithId file, ByteBuffer bb) throws IOException {
+    if (bb == null) return null;
+    FileStatus fs = file.getFileStatus();
+    ReaderImpl.FooterInfo fi = null;
+    ByteBuffer copy = bb.duplicate();
+    try {
+      fi = ReaderImpl.extractMetaInfoFromFooter(copy, fs.getPath());
+    } catch (Exception ex) {
+      byte[] data = new byte[bb.remaining()];
+      System.arraycopy(bb.array(), bb.arrayOffset() + bb.position(), data, 0, data.length);
+      String msg = "Failed to parse the footer stored in cache for file ID "
+          + file.getFileId() + " " + bb + " [ " + Hex.encodeHexString(data) + " ]";
+      LOG.error(msg, ex);
+      return null;
+    }
+    return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(), fi.getMetadata(),
+        fi.getFooter().getTypesList(), fi.getFooter().getStatisticsList(), fi.getFileMetaInfo(),
+        fi.getFileMetaInfo().writerVersion, file.getFileId());
+  }
+
+  private static final class Baos extends ByteArrayOutputStream {
+    public ByteBuffer get() {
+      return ByteBuffer.wrap(buf, 0, count);
+    }
+  }
+
+
+  /** An abstraction for testing ExternalCache in OrcInputFormat. */
+  public interface ExternalFooterCachesByConf {
+    public interface Cache {
+      Iterator<Map.Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(List<Long> fileIds,
+          ByteBuffer serializedSarg, boolean doGetFooters) throws HiveException;
+      void clearFileMetadata(List<Long> fileIds) throws HiveException;
+      Iterator<Map.Entry<Long, ByteBuffer>>  getFileMetadata(List<Long> fileIds)
+          throws HiveException;
+      void putFileMetadata(
+          ArrayList<Long> keys, ArrayList<ByteBuffer> values) throws HiveException;
+    }
+
+    public Cache getCache(HiveConf conf) throws IOException;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
new file mode 100644
index 0000000..8151e52
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/LocalCache.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FileInfo;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.FooterCache;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.apache.orc.FileMetaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/** Local footer cache using Guava. Stores convoluted Java objects. */
+class LocalCache implements FooterCache {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalCache.class);
+  private static boolean isDebugEnabled = LOG.isDebugEnabled();
+
+  private final Cache<Path, FileInfo> cache;
+
+  public LocalCache(int numThreads, int cacheStripeDetailsSize) {
+    cache = CacheBuilder.newBuilder()
+      .concurrencyLevel(numThreads)
+      .initialCapacity(cacheStripeDetailsSize)
+      .maximumSize(cacheStripeDetailsSize)
+      .softValues()
+      .build();
+  }
+
+  public void clear() {
+    cache.invalidateAll();
+    cache.cleanUp();
+  }
+
+  public void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+      FileInfo[] result, ByteBuffer[] ppdResult) throws IOException {
+    // TODO: should local cache also be by fileId? Preserve the original logic for now.
+    assert result.length == files.size();
+    int i = -1;
+    for (HdfsFileStatusWithId fileWithId : files) {
+      ++i;
+      FileStatus file = fileWithId.getFileStatus();
+      Path path = file.getPath();
+      Long fileId = fileWithId.getFileId();
+      FileInfo fileInfo = cache.getIfPresent(path);
+      if (isDebugEnabled) {
+        LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path);
+      }
+      if (fileInfo == null) continue;
+      if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId)
+          || (fileInfo.modificationTime == file.getModificationTime() &&
+          fileInfo.size == file.getLen())) {
+        result[i] = fileInfo;
+        continue;
+      }
+      // Invalidate
+      cache.invalidate(path);
+      if (isDebugEnabled) {
+        LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: "
+            + fileInfo.modificationTime + ", CurrentModificationTime: "
+            + file.getModificationTime() + ", CachedLength: " + fileInfo.size
+            + ", CurrentLength: " + file.getLen());
+      }
+    }
+  }
+
+  public void put(Path path, FileInfo fileInfo) {
+    cache.put(path, fileInfo);
+  }
+
+  @Override
+  public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
+      throws IOException {
+    cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(),
+        orcReader.getStripes(), orcReader.getStripeStatistics(), orcReader.getTypes(),
+        orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
+        fileId));
+  }
+
+  @Override
+  public boolean isBlocking() {
+    return false;
+  }
+
+  @Override
+  public boolean hasPpd() {
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
new file mode 100644
index 0000000..ad8f4ef
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MetastoreExternalCachesByConf.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+/**
+ * An implementation of external cache and factory based on metastore.
+ */
+public class MetastoreExternalCachesByConf implements ExternalFooterCachesByConf {
+  public static class HBaseCache implements ExternalFooterCachesByConf.Cache {
+    private Hive hive;
+
+    public HBaseCache(Hive hive) {
+      this.hive = hive;
+    }
+
+    @Override
+    public Iterator<Entry<Long, MetadataPpdResult>> getFileMetadataByExpr(
+        List<Long> fileIds, ByteBuffer sarg, boolean doGetFooters) throws HiveException {
+      return hive.getFileMetadataByExpr(fileIds, sarg, doGetFooters).iterator();
+    }
+
+    @Override
+    public void clearFileMetadata(List<Long> fileIds) throws HiveException {
+      hive.clearFileMetadata(fileIds);
+    }
+
+    @Override
+    public Iterator<Entry<Long, ByteBuffer>> getFileMetadata(
+        List<Long> fileIds) throws HiveException {
+      return hive.getFileMetadata(fileIds).iterator();
+    }
+
+    @Override
+    public void putFileMetadata(
+        ArrayList<Long> fileIds, ArrayList<ByteBuffer> metadata) throws HiveException {
+      hive.putFileMetadata(fileIds, metadata);
+    }
+  }
+
+  @Override
+  public ExternalFooterCachesByConf.Cache getCache(HiveConf conf) throws IOException {
+    // TODO: we wish we could cache the Hive object, but it's not thread safe, and each
+    //       threadlocal we "cache" would need to be reinitialized for every query. This is
+    //       a huge PITA. Hive object will be cached internally, but the compat check will be
+    //       done every time inside get().
+    try {
+      return new HBaseCache(Hive.getWithFastCheck(conf));
+    } catch (HiveException e) {
+      throw new IOException(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
index ef76723..c9c7b5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileFormatProxy.java
@@ -29,16 +29,19 @@ import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.orc.OrcProto;
 import org.apache.orc.StripeInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** File format proxy for ORC. */
 public class OrcFileFormatProxy implements FileFormatProxy {
+  private static final Logger LOG = LoggerFactory.getLogger(OrcFileFormatProxy.class);
 
   @Override
-  public ByteBuffer applySargToMetadata(
-      SearchArgument sarg, ByteBuffer byteBuffer) throws IOException {
+  public SplitInfos applySargToMetadata(
+      SearchArgument sarg, ByteBuffer fileMetadata) throws IOException {
     // TODO: ideally we should store shortened representation of only the necessary fields
     //       in HBase; it will probably require custom SARG application code.
-    ReaderImpl.FooterInfo fi = ReaderImpl.extractMetaInfoFromFooter(byteBuffer, null);
+    ReaderImpl.FooterInfo fi = ReaderImpl.extractMetaInfoFromFooter(fileMetadata, null);
     OrcProto.Footer footer = fi.getFooter();
     int stripeCount = footer.getStripesCount();
     boolean[] result = OrcInputFormat.pickStripesViaTranslatedSarg(
@@ -52,10 +55,13 @@ public class OrcFileFormatProxy implements FileFormatProxy {
       if (result != null && !result[i]) continue;
       isEliminated = false;
       StripeInformation si = stripes.get(i);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("PPD is adding a split " + i + ": " + si.getOffset() + ", " + si.getLength());
+      }
       sb.addInfos(SplitInfo.newBuilder().setIndex(i)
           .setOffset(si.getOffset()).setLength(si.getLength()));
     }
-    return isEliminated ? null : ByteBuffer.wrap(sb.build().toByteArray());
+    return isEliminated ? null : sb.build();
   }
 
   public ByteBuffer[] getAddedColumnsToCache() {

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index cd2a668..8b611bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.orc.impl.InStream;
+
+  
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
@@ -40,7 +41,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -70,6 +70,8 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.Metastore;
+import org.apache.hadoop.hive.metastore.Metastore.SplitInfos;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -84,15 +86,14 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf;
 import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -111,10 +112,9 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.orc.OrcProto;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.CodedInputStream;
 /**
  * A MapReduce/Hive input format for ORC files.
  * <p>
@@ -274,7 +274,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * @param isOriginal is the file in the original format?
    * @return the column number for the root of row.
    */
-  private static int getRootColumn(boolean isOriginal) {
+  static int getRootColumn(boolean isOriginal) {
     return isOriginal ? 0 : (OrcRecordUpdater.ROW + 1);
   }
 
@@ -335,45 +335,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  /**
-   * Modifies the SARG, replacing column names with column indexes in target table schema. This
-   * basically does the same thing as all the shennannigans with included columns, except for the
-   * last step where ORC gets direct subtypes of root column and uses the ordered match to map
-   * table columns to file columns. The numbers put into predicate leaf should allow to go into
-   * said subtypes directly by index to get the proper index in the file.
-   * This won't work with schema evolution, although it's probably much easier to reason about
-   * if schema evolution was to be supported, because this is a clear boundary between table
-   * schema columns and all things ORC. None of the ORC stuff is used here and none of the
-   * table schema stuff is used after that - ORC doesn't need a bunch of extra crap to apply
-   * the SARG thus modified.
-   */
-  public static void translateSargToTableColIndexes(
-      SearchArgument sarg, Configuration conf, int rootColumn) {
-    String nameStr = getNeededColumnNamesString(conf), idStr = getSargColumnIDsString(conf);
-    String[] knownNames = nameStr.split(",");
-    String[] idStrs = (idStr == null) ? null : idStr.split(",");
-    assert idStrs == null || knownNames.length == idStrs.length;
-    HashMap<String, Integer> nameIdMap = new HashMap<>();
-    for (int i = 0; i < knownNames.length; ++i) {
-      Integer newId = (idStrs != null) ? Integer.parseInt(idStrs[i]) : i;
-      Integer oldId = nameIdMap.put(knownNames[i], newId);
-      if (oldId != null && oldId.intValue() != newId.intValue()) {
-        throw new RuntimeException("Multiple IDs for " + knownNames[i] + " in column strings: ["
-            + idStr + "], [" + nameStr + "]");
-      }
-    }
-    List<PredicateLeaf> leaves = sarg.getLeaves();
-    for (int i = 0; i < leaves.size(); ++i) {
-      PredicateLeaf pl = leaves.get(i);
-      Integer colId = nameIdMap.get(pl.getColumnName());
-      String newColName = RecordReaderImpl.encodeTranslatedSargColumn(rootColumn, colId);
-      SearchArgumentFactory.setPredicateLeafColumn(pl, newColName);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SARG translated into " + sarg);
-    }
-  }
-
   public static boolean[] genIncludedColumns(
       List<OrcProto.Type> types, List<Integer> included, boolean isOriginal) {
     int rootColumn = getRootColumn(isOriginal);
@@ -477,14 +438,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return getSargColumnNames(columnNamesString.split(","), types, include, isOriginal);
   }
 
-  private static String getNeededColumnNamesString(Configuration conf) {
+  static String getNeededColumnNamesString(Configuration conf) {
     return conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
   }
 
-  private static String getSargColumnIDsString(Configuration conf) {
+  static String getSargColumnIDsString(Configuration conf) {
     return conf.getBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, true) ? null
         : conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
   }
+
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
                                List<FileStatus> files
@@ -542,7 +504,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     // This is not thread safe between different split generations (and wasn't anyway).
     private FooterCache footerCache;
     private static LocalCache localCache;
-    private static MetastoreCache metaCache;
+    private static ExternalCache metaCache;
     static ExecutorService threadPool = null;
     private final int numBuckets;
     private final int splitStrategyBatchMs;
@@ -559,10 +521,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final SearchArgument sarg;
 
     Context(Configuration conf) {
-      this(conf, 1);
+      this(conf, 1, null);
     }
 
     Context(Configuration conf, final int minSplits) {
+      this(conf, minSplits, null);
+    }
+
+    @VisibleForTesting
+    Context(Configuration conf, final int minSplits, ExternalFooterCachesByConf efc) {
       this.conf = conf;
       this.forceThreadpool = HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST);
       this.sarg = ConvertAstToSearchArg.createFromConf(conf);
@@ -603,20 +570,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           // HDFS, because only HDFS would return fileIds for us. If fileId is extended using
           // size/mod time/etc. for other FSes, we might need to check FSes explicitly because
           // using such an aggregate fileId cache is not bulletproof and should be disable-able.
-          boolean useMetastoreCache = HiveConf.getBoolVar(
+          boolean useExternalCache = HiveConf.getBoolVar(
               conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED);
           if (localCache == null) {
             localCache = new LocalCache(numThreads, cacheStripeDetailsSize);
           }
-          if (useMetastoreCache) {
+          if (useExternalCache) {
             if (metaCache == null) {
-              metaCache = new MetastoreCache(localCache);
+              metaCache = new ExternalCache(localCache,
+                  efc == null ? new MetastoreExternalCachesByConf() : efc);
             }
             assert conf instanceof HiveConf;
             metaCache.configure((HiveConf)conf);
           }
           // Set footer cache for current split generation. See field comment - not thread safe.
-          footerCache = useMetastoreCache ? metaCache : localCache;
+          // TODO: we should be able to enable caches separately
+          footerCache = useExternalCache ? metaCache : localCache;
         }
       }
       String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
@@ -638,6 +607,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         threadPool = null;
       }
     }
+
+    @VisibleForTesting
+    public static void clearLocalCache() {
+      if (localCache == null) return;
+      localCache.clear();
+    }
   }
 
   /**
@@ -676,12 +651,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final boolean isOriginal;
     private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
+    private final ByteBuffer ppdResult;
 
-    SplitInfo(Context context, FileSystem fs,
-        HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
-        boolean isOriginal,
-        List<DeltaMetaData> deltas,
-        boolean hasBase, Path dir, boolean[] covered) throws IOException {
+    SplitInfo(Context context, FileSystem fs, HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
+        boolean isOriginal, List<DeltaMetaData> deltas, boolean hasBase, Path dir,
+        boolean[] covered, ByteBuffer ppdResult) throws IOException {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fs = fs;
@@ -690,6 +664,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
+      this.ppdResult = ppdResult;
     }
 
     @VisibleForTesting
@@ -697,7 +672,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
         boolean[] covered) throws IOException {
       this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
-          fileInfo, isOriginal, deltas, hasBase, dir, covered);
+          fileInfo, isOriginal, deltas, hasBase, dir, covered, null);
     }
   }
 
@@ -719,14 +694,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       private final FileSystem fs;
     }
 
-
     Context context;
-    List<ETLDir> dirs;
+    final List<ETLDir> dirs;
     List<HdfsFileStatusWithId> files;
-    boolean isOriginal;
-    List<DeltaMetaData> deltas;
-    boolean[] covered;
-    private List<Future<List<OrcSplit>>> splitFuturesRef;
+    private final List<DeltaMetaData> deltas;
+    private final boolean[] covered;
+    final boolean isOriginal;
+    // References to external fields for async SplitInfo generation.
+    private List<Future<List<OrcSplit>>> splitFuturesRef = null;
+    private List<OrcSplit> splitsRef = null;
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
 
@@ -748,10 +724,19 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     public List<SplitInfo> getSplits() throws IOException {
       List<SplitInfo> result = new ArrayList<>(files.size());
       // Force local cache if we have deltas.
-      FooterCache cache = context.cacheStripeDetails ?
-          (deltas == null ? context.footerCache : Context.localCache) : null;
+      FooterCache cache = context.cacheStripeDetails ? ((deltas == null || deltas.isEmpty())
+          ? context.footerCache : Context.localCache) : null;
       if (cache != null) {
-        FileInfo[] infos = cache.getAndValidate(files);
+        FileInfo[] infos = new FileInfo[files.size()];
+        ByteBuffer[] ppdResults = null;
+        if (cache.hasPpd()) {
+          ppdResults = new ByteBuffer[files.size()];
+        }
+        try {
+          cache.getAndValidate(files, isOriginal, infos, ppdResults);
+        } catch (HiveException e) {
+          throw new IOException(e);
+        }
         int dirIx = -1, fileInDirIx = -1, filesInDirCount = 0;
         ETLDir dir = null;
         for (int i = 0; i < files.size(); ++i) {
@@ -760,15 +745,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             filesInDirCount = dir.fileCount;
           }
           FileInfo info = infos[i];
+          ByteBuffer ppdResult = ppdResults == null ? null : ppdResults[i];
+          HdfsFileStatusWithId file = files.get(i);
           if (info != null) {
             // Cached copy is valid
             context.cacheHitCounter.incrementAndGet();
           }
-          HdfsFileStatusWithId file = files.get(i);
-          // ignore files of 0 length
-          if (file.getFileStatus().getLen() > 0) {
-            result.add(new SplitInfo(
-                context, dir.fs, file, info, isOriginal, deltas, true, dir.dir, covered));
+          // Ignore files eliminated by PPD, or of 0 length.
+          if (ppdResult != FooterCache.NO_SPLIT_AFTER_PPD && file.getFileStatus().getLen() > 0) {
+            result.add(new SplitInfo(context, dir.fs, file, info,
+                isOriginal, deltas, true, dir.dir, covered, ppdResult));
           }
         }
       } else {
@@ -781,8 +767,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           }
           // ignore files of 0 length
           if (file.getFileStatus().getLen() > 0) {
-            result.add(new SplitInfo(
-                context, dir.fs, file, null, isOriginal, deltas, true, dir.dir, covered));
+            result.add(new SplitInfo(context, dir.fs, file, null,
+                isOriginal, deltas, true, dir.dir, covered, null));
           }
         }
       }
@@ -826,14 +812,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       return CombineResult.YES;
     }
 
-    public Future<Void> generateSplitWork(
-        Context context, List<Future<List<OrcSplit>>> splitFutures) throws IOException {
+    public Future<Void> generateSplitWork(Context context,
+        List<Future<List<OrcSplit>>> splitFutures, List<OrcSplit> splits) throws IOException {
       if ((context.cacheStripeDetails && context.footerCache.isBlocking())
           || context.forceThreadpool) {
         this.splitFuturesRef = splitFutures;
+        this.splitsRef = splits;
         return Context.threadPool.submit(this);
       } else {
-        runGetSplitsSync(splitFutures, null);
+        runGetSplitsSync(splitFutures, splits, null);
         return null;
       }
     }
@@ -841,14 +828,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     @Override
     public Void call() throws IOException {
       if (ugi == null) {
-        runGetSplitsSync(splitFuturesRef, null);
+        runGetSplitsSync(splitFuturesRef, splitsRef, null);
         return null;
       }
       try {
         return ugi.doAs(new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
-            runGetSplitsSync(splitFuturesRef, ugi);
+            runGetSplitsSync(splitFuturesRef, splitsRef, ugi);
             return null;
           }
         });
@@ -857,20 +844,43 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     }
 
+
+
+
+
     private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
-        UserGroupInformation ugi) throws IOException {
-      List<SplitInfo> splits = getSplits();
-      List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
+        List<OrcSplit> splits, UserGroupInformation ugi) throws IOException {
       UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
-      for (SplitInfo splitInfo : splits) {
-        localList.add(Context.threadPool.submit(
-            new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds)));
+      List<SplitInfo> splitInfos = getSplits();
+      List<Future<List<OrcSplit>>> localListF = null;
+      List<OrcSplit> localListS = null;
+      for (SplitInfo splitInfo : splitInfos) {
+        SplitGenerator sg = new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds);
+        if (!sg.isBlocking()) {
+          if (localListS == null) {
+            localListS = new ArrayList<>(splits.size());
+          }
+          // Already called in doAs, so no need to doAs here.
+          localListS.addAll(sg.call());
+        } else {
+          if (localListF == null) {
+            localListF = new ArrayList<>(splits.size());
+          }
+          localListF.add(Context.threadPool.submit(sg));
+        }
       }
-      synchronized (splitFutures) {
-        splitFutures.addAll(localList);
+      if (localListS != null) {
+        synchronized (splits) {
+          splits.addAll(localListS);
+        }
       }
-    }
-  }
+      if (localListF != null) {
+        synchronized (splitFutures) {
+          splitFutures.addAll(localListF);
+        }
+       }
+     }
+   }
 
   /**
    * BI strategy is used when the requirement is to spend less time in split generation
@@ -1018,7 +1028,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         try {
           return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
         } catch (Throwable t) {
-          LOG.error("Failed to get files with ID; using regular API", t);
+          LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
         }
       }
 
@@ -1055,6 +1065,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private OrcFile.WriterVersion writerVersion;
     private long projColsUncompressedSize;
     private final List<OrcSplit> deltaSplits;
+    private final ByteBuffer ppdResult;
     private final UserGroupInformation ugi;
     private final boolean allowSyntheticFileIds;
 
@@ -1075,6 +1086,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
       this.allowSyntheticFileIds = allowSyntheticFileIds;
+      this.ppdResult = splitInfo.ppdResult;
+    }
+
+    public boolean isBlocking() {
+      return ppdResult != null;
     }
 
     Path getPath() {
@@ -1182,6 +1198,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
     }
 
+    private static final class OffsetAndLength { // Java cruft; pair of long.
+      public OffsetAndLength() {
+        this.offset = -1;
+        this.length = 0;
+      }
+
+      long offset, length;
+
+      @Override
+      public String toString() {
+        return "[offset=" + offset + ", length=" + length + "]";
+      }
+    }
+
     /**
      * Divide the adjacent stripes in the file into input splits based on the
      * block size and the configured minimum and maximum sizes.
@@ -1204,74 +1234,122 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
 
     private List<OrcSplit> callInternal() throws IOException {
-      populateAndCacheStripeDetails();
-      List<OrcSplit> splits = Lists.newArrayList();
-
-      // figure out which stripes we need to read
-      boolean[] includeStripe = null;
+      // Figure out which stripes we need to read.
+      if (ppdResult != null) {
+        assert deltaSplits.isEmpty();
+        assert ppdResult.hasArray();
+
+        // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
+        CodedInputStream cis = CodedInputStream.newInstance(
+            ppdResult.array(), ppdResult.arrayOffset(), ppdResult.remaining());
+        cis.setSizeLimit(InStream.PROTOBUF_MESSAGE_MAX_LIMIT);
+        return generateSplitsFromPpd(SplitInfos.parseFrom(cis));
+      } else {
+        populateAndCacheStripeDetails();
+        boolean[] includeStripe = null;
+        // We can't eliminate stripes if there are deltas because the
+        // deltas may change the rows making them match the predicate.
+        if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
+          String[] colNames = extractNeededColNames(types, context.conf, includedCols, isOriginal);
+          if (colNames == null) {
+            LOG.warn("Skipping split elimination for {} as column names is null", file.getPath());
+          } else {
+            includeStripe = pickStripes(context.sarg, colNames, writerVersion, isOriginal,
+                stripeStats, stripes.size(), file.getPath());
+          }
+        }
+        return generateSplitsFromStripes(includeStripe);
+      }
+    }
 
-      // we can't eliminate stripes if there are deltas because the
-      // deltas may change the rows making them match the predicate.
-      if ((deltas == null || deltas.isEmpty()) && context.sarg != null) {
-        String[] colNames = extractNeededColNames(types, context.conf, includedCols, isOriginal);
-        if (colNames == null) {
-          LOG.warn("Skipping split elimination for {} as column names is null", file.getPath());
-        } else {
-          includeStripe = pickStripes(context.sarg, colNames, writerVersion, isOriginal,
-              stripeStats, stripes.size(), file.getPath());
+    private List<OrcSplit> generateSplitsFromPpd(SplitInfos ppdResult) throws IOException {
+      OffsetAndLength current = new OffsetAndLength();
+      List<OrcSplit> splits = new ArrayList<>(ppdResult.getInfosCount());
+      int lastIdx = -1;
+      for (Metastore.SplitInfo si : ppdResult.getInfosList()) {
+        int index = si.getIndex();
+        if (lastIdx >= 0 && lastIdx + 1 != index && current.offset != -1) {
+          // Create split for the previous unfinished stripe.
+          splits.add(createSplit(current.offset, current.length, null));
+          current.offset = -1;
+        }
+        lastIdx = index;
+        String debugStr = null;
+        if (LOG.isDebugEnabled()) {
+          debugStr = current.toString();
+        }
+        current = generateOrUpdateSplit(splits, current, si.getOffset(), si.getLength(), null);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updated split from {" + index + ": " + si.getOffset() + ", "
+              + si.getLength() + "} and "+ debugStr + " to " + current);
         }
       }
+      generateLastSplit(splits, current, null);
+      return splits;
+    }
 
+    private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
+      List<OrcSplit> splits = new ArrayList<>(stripes.size());
       // if we didn't have predicate pushdown, read everything
       if (includeStripe == null) {
         includeStripe = new boolean[stripes.size()];
         Arrays.fill(includeStripe, true);
       }
 
-      long currentOffset = -1;
-      long currentLength = 0;
+      OffsetAndLength current = new OffsetAndLength();
       int idx = -1;
       for (StripeInformation stripe : stripes) {
         idx++;
 
         if (!includeStripe[idx]) {
           // create split for the previous unfinished stripe
-          if (currentOffset != -1) {
-            splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
-            currentOffset = -1;
+          if (current.offset != -1) {
+            splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+            current.offset = -1;
           }
           continue;
         }
 
-        // if we are working on a stripe, over the min stripe size, and
-        // crossed a block boundary, cut the input split here.
-        if (currentOffset != -1 && currentLength > context.minSize &&
-            (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
-          splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
-          currentOffset = -1;
-        }
-        // if we aren't building a split, start a new one.
-        if (currentOffset == -1) {
-          currentOffset = stripe.getOffset();
-          currentLength = stripe.getLength();
-        } else {
-          currentLength =
-              (stripe.getOffset() + stripe.getLength()) - currentOffset;
-        }
-        if (currentLength >= context.maxSize) {
-          splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
-          currentOffset = -1;
-        }
-      }
-      if (currentOffset != -1) {
-        splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
+        current = generateOrUpdateSplit(
+            splits, current, stripe.getOffset(), stripe.getLength(), fileMetaInfo);
       }
+      generateLastSplit(splits, current, fileMetaInfo);
 
-      // add uncovered ACID delta splits
+      // Add uncovered ACID delta splits.
       splits.addAll(deltaSplits);
       return splits;
     }
 
+    private OffsetAndLength generateOrUpdateSplit(
+        List<OrcSplit> splits, OffsetAndLength current, long offset,
+        long length, FileMetaInfo fileMetaInfo) throws IOException {
+      // if we are working on a stripe, over the min stripe size, and
+      // crossed a block boundary, cut the input split here.
+      if (current.offset != -1 && current.length > context.minSize &&
+          (current.offset / blockSize != offset / blockSize)) {
+        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        current.offset = -1;
+      }
+      // if we aren't building a split, start a new one.
+      if (current.offset == -1) {
+        current.offset = offset;
+        current.length = length;
+      } else {
+        current.length = (offset + length) - current.offset;
+      }
+      if (current.length >= context.maxSize) {
+        splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+        current.offset = -1;
+      }
+      return current;
+    }
+
+    private void generateLastSplit(List<OrcSplit> splits, OffsetAndLength current,
+        FileMetaInfo fileMetaInfo) throws IOException {
+      if (current.offset == -1) return;
+      splits.add(createSplit(current.offset, current.length, fileMetaInfo));
+    }
+
     private void populateAndCacheStripeDetails() throws IOException {
       // Only create OrcReader if we are missing some information.
       List<OrcProto.ColumnStatistics> colStatsLocal;
@@ -1290,7 +1368,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           assert fileInfo.stripeStats != null && fileInfo.types != null
               && fileInfo.writerVersion != null;
           // We assume that if we needed to create a reader, we need to cache it to meta cache.
-          // TODO: This will also needlessly overwrite it in local cache for now.
+          // This will also needlessly overwrite it in local cache for now.
           context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
         }
       } else {
@@ -1330,10 +1408,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     }
   }
 
-  static List<OrcSplit> generateSplitsInfo(Configuration conf)
-      throws IOException {
-    return generateSplitsInfo(conf, -1);
-  }
 
   /** Class intended to update two values from methods... Java-related cruft. */
   @VisibleForTesting
@@ -1342,14 +1416,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     long combineStartUs;
   }
 
-  static List<OrcSplit> generateSplitsInfo(Configuration conf, int numSplits)
+  static List<OrcSplit> generateSplitsInfo(Configuration conf, Context context)
       throws IOException {
-    // Use threads to resolve directories into splits.
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
-      // Create HiveConf once, since this is expensive.
-      conf = new HiveConf(conf, OrcInputFormat.class);
-    }
-    Context context = new Context(conf, numSplits);
     if (LOG.isInfoEnabled()) {
       LOG.info("ORC pushdown predicate: " + context.sarg);
     }
@@ -1391,7 +1459,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         if (adi == null) {
           // We were combining SS-es and the time has expired.
           assert combinedCtx.combined != null;
-          scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+          scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
           combinedCtx.combined = null;
           continue;
         }
@@ -1409,7 +1477,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         // Hack note - different split strategies return differently typed lists, yay Java.
         // This works purely by magic, because we know which strategy produces which type.
         if (splitStrategy instanceof ETLSplitStrategy) {
-          scheduleSplits((ETLSplitStrategy)splitStrategy, context, splitFutures, strategyFutures);
+          scheduleSplits((ETLSplitStrategy)splitStrategy,
+              context, splitFutures, strategyFutures, splits);
         } else {
           @SuppressWarnings("unchecked")
           List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
@@ -1419,7 +1488,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
       // Run the last combined strategy, if any.
       if (combinedCtx != null && combinedCtx.combined != null) {
-        scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures);
+        scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
         combinedCtx.combined = null;
       }
 
@@ -1452,10 +1521,18 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return splits;
   }
 
+  @VisibleForTesting
+  // We could have this as a protected method w/no class, but half of Hive is static, so there.
+  public static class ContextFactory {
+    public Context create(Configuration conf, int numSplits) {
+      return new Context(conf, numSplits);
+    }
+  }
+
   private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context context,
-      List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures)
-          throws IOException {
-    Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures);
+      List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures,
+      List<OrcSplit> splits) throws IOException {
+    Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures, splits);
     if (ssFuture == null) return;
     strategyFutures.add(ssFuture);
   }
@@ -1504,7 +1581,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     if (isDebugEnabled) {
       LOG.debug("getSplits started");
     }
-    List<OrcSplit> result = generateSplitsInfo(job, numSplits);
+    Configuration conf = job;
+    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
+      // Create HiveConf once, since this is expensive.
+      conf = new HiveConf(conf, OrcInputFormat.class);
+    }
+    List<OrcSplit> result = generateSplitsInfo(conf,
+        new Context(conf, numSplits, createExternalCaches()));
     if (isDebugEnabled) {
       LOG.debug("getSplits finished");
     }
@@ -1517,10 +1600,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * Stores information relevant to split generation for an ORC File.
    *
    */
-  private static class FileInfo {
-    private final long modificationTime;
-    private final long size;
-    private final Long fileId;
+  static class FileInfo {
+    final long modificationTime;
+    final long size;
+    final Long fileId;
     private final List<StripeInformation> stripeInfos;
     private FileMetaInfo fileMetaInfo;
     private final List<StripeStatistics> stripeStats;
@@ -1898,196 +1981,16 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * Represents footer cache.
    */
   public interface FooterCache {
-    FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException;
+    static final ByteBuffer NO_SPLIT_AFTER_PPD = ByteBuffer.wrap(new byte[0]);
+
+    void getAndValidate(List<HdfsFileStatusWithId> files, boolean isOriginal,
+        FileInfo[] result, ByteBuffer[] ppdResult) throws IOException, HiveException;
+    boolean hasPpd();
     boolean isBlocking();
     void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
         throws IOException;
   }
 
-  /** Local footer cache using Guava. Stores convoluted Java objects. */
-  private static class LocalCache implements FooterCache {
-    private final Cache<Path, FileInfo> cache;
-
-    public LocalCache(int numThreads, int cacheStripeDetailsSize) {
-      cache = CacheBuilder.newBuilder()
-        .concurrencyLevel(numThreads)
-        .initialCapacity(cacheStripeDetailsSize)
-        .maximumSize(cacheStripeDetailsSize)
-        .softValues()
-        .build();
-    }
-
-    @Override
-    public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) {
-      // TODO: should local cache also be by fileId? Preserve the original logic for now.
-      FileInfo[] result = new FileInfo[files.size()];
-      int i = -1;
-      for (HdfsFileStatusWithId fileWithId : files) {
-        ++i;
-        FileStatus file = fileWithId.getFileStatus();
-        Path path = file.getPath();
-        Long fileId = fileWithId.getFileId();
-        FileInfo fileInfo = cache.getIfPresent(path);
-        if (isDebugEnabled) {
-          LOG.debug("Info " + (fileInfo == null ? "not " : "") + "cached for path: " + path);
-        }
-        if (fileInfo == null) continue;
-        if ((fileId != null && fileInfo.fileId != null && fileId == fileInfo.fileId)
-            || (fileInfo.modificationTime == file.getModificationTime() &&
-            fileInfo.size == file.getLen())) {
-          result[i] = fileInfo;
-          continue;
-        }
-        // Invalidate
-        cache.invalidate(path);
-        if (isDebugEnabled) {
-          LOG.debug("Meta-Info for : " + path + " changed. CachedModificationTime: "
-              + fileInfo.modificationTime + ", CurrentModificationTime: "
-              + file.getModificationTime() + ", CachedLength: " + fileInfo.size
-              + ", CurrentLength: " + file.getLen());
-        }
-      }
-      return result;
-    }
-
-    public void put(Path path, FileInfo fileInfo) {
-      cache.put(path, fileInfo);
-    }
-
-    @Override
-    public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
-        throws IOException {
-      cache.put(file.getPath(), new FileInfo(file.getModificationTime(), file.getLen(),
-          orcReader.getStripes(), orcReader.getStripeStatistics(), orcReader.getTypes(),
-          orcReader.getOrcProtoFileStatistics(), fileMetaInfo, orcReader.getWriterVersion(),
-          fileId));
-    }
-
-    @Override
-    public boolean isBlocking() {
-      return false;
-    }
-  }
-
-  /** Metastore-based footer cache storing serialized footers. Also has a local cache. */
-  public static class MetastoreCache implements FooterCache {
-    private final LocalCache localCache;
-    private boolean isWarnLogged = false;
-    private HiveConf conf;
-
-    public MetastoreCache(LocalCache lc) {
-      localCache = lc;
-    }
-
-    @Override
-    public FileInfo[] getAndValidate(List<HdfsFileStatusWithId> files) throws IOException {
-      // First, check the local cache.
-      FileInfo[] result = localCache.getAndValidate(files);
-      assert result.length == files.size();
-      // This is an unfortunate consequence of batching/iterating thru MS results.
-      // TODO: maybe have a direct map call for small lists if this becomes a perf issue.
-      HashMap<Long, Integer> posMap = new HashMap<>(files.size());
-      for (int i = 0; i < result.length; ++i) {
-        if (result[i] != null) continue;
-        HdfsFileStatusWithId file = files.get(i);
-        Long fileId = file.getFileId();
-        if (fileId == null) {
-          if (!isWarnLogged || isDebugEnabled) {
-            LOG.warn("Not using metastore cache because fileId is missing: "
-                + file.getFileStatus().getPath());
-            isWarnLogged = true;
-          }
-          continue;
-        }
-        posMap.put(fileId, i);
-      }
-      Iterator<Entry<Long, ByteBuffer>> iter = null;
-      Hive hive;
-      try {
-        hive = getHive();
-        iter = hive.getFileMetadata(Lists.newArrayList(posMap.keySet()), conf).iterator();
-      } catch (HiveException ex) {
-        throw new IOException(ex);
-      }
-      List<Long> corruptIds = null;
-      while (iter.hasNext()) {
-        Entry<Long, ByteBuffer> e = iter.next();
-        int ix = posMap.get(e.getKey());
-        assert result[ix] == null;
-        HdfsFileStatusWithId file = files.get(ix);
-        assert file.getFileId() == e.getKey();
-        result[ix] = createFileInfoFromMs(file, e.getValue());
-        if (result[ix] == null) {
-          if (corruptIds == null) {
-            corruptIds = new ArrayList<>();
-          }
-          corruptIds.add(file.getFileId());
-        } else {
-          localCache.put(file.getFileStatus().getPath(), result[ix]);
-        }
-      }
-      if (corruptIds != null) {
-        try {
-          hive.clearFileMetadata(corruptIds);
-        } catch (HiveException ex) {
-          LOG.error("Failed to clear corrupt cache data", ex);
-        }
-      }
-      return result;
-    }
-
-    private Hive getHive() throws HiveException {
-      // TODO: we wish we could cache the Hive object, but it's not thread safe, and each
-      //       threadlocal we "cache" would need to be reinitialized for every query. This is
-      //       a huge PITA. Hive object will be cached internally, but the compat check will be
-      //       done every time inside get().
-      return Hive.getWithFastCheck(conf);
-    }
-
-    private static FileInfo createFileInfoFromMs(
-        HdfsFileStatusWithId file, ByteBuffer bb) throws IOException {
-      FileStatus fs = file.getFileStatus();
-      ReaderImpl.FooterInfo fi = null;
-      ByteBuffer original = bb.duplicate();
-      try {
-        fi = ReaderImpl.extractMetaInfoFromFooter(bb, fs.getPath());
-      } catch (Exception ex) {
-        byte[] data = new byte[original.remaining()];
-        System.arraycopy(original.array(), original.arrayOffset() + original.position(),
-            data, 0, data.length);
-        String msg = "Failed to parse the footer stored in cache for file ID "
-            + file.getFileId() + " " + original + " [ " + Hex.encodeHexString(data) + " ]";
-        LOG.error(msg, ex);
-        return null;
-      }
-      return new FileInfo(fs.getModificationTime(), fs.getLen(), fi.getStripes(), fi.getMetadata(),
-          fi.getFooter().getTypesList(), fi.getFooter().getStatisticsList(), fi.getFileMetaInfo(),
-          fi.getFileMetaInfo().writerVersion, file.getFileId());
-    }
-
-    @Override
-    public void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcReader)
-        throws IOException {
-      localCache.put(fileId, file, fileMetaInfo, orcReader);
-      if (fileId != null) {
-        try {
-          getHive().putFileMetadata(Lists.newArrayList(fileId),
-              Lists.newArrayList(((ReaderImpl)orcReader).getSerializedFileFooter()));
-        } catch (HiveException e) {
-          throw new IOException(e);
-        }
-      }
-    }
-
-    public void configure(HiveConf queryConfig) {
-      this.conf = queryConfig;
-    }
-
-    @Override
-    public boolean isBlocking() {
-      return true;
-    }
-  }
   /**
    * Convert a Hive type property string that contains separated type names into a list of
    * TypeDescription objects.
@@ -2283,4 +2186,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     return result;
   }
 
+  @VisibleForTesting
+  protected ExternalFooterCachesByConf createExternalCaches() {
+    return null; // The default ones are created in case of null; tests override this.
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/868db42a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
index 2782d7e..c4a7226 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
@@ -25,6 +25,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -121,9 +123,8 @@ public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
     if (LOG.isDebugEnabled()) {
       LOG.debug("getSplits started");
     }
-    List<OrcSplit> splits =
-        OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
-        .getConfiguration(jobContext));
+    Configuration conf = ShimLoader.getHadoopShims().getConfiguration(jobContext);
+    List<OrcSplit> splits = OrcInputFormat.generateSplitsInfo(conf, createContext(conf, -1));
     List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
     for(OrcSplit split: splits) {
       result.add(new OrcNewSplit(split));
@@ -134,4 +135,13 @@ public class OrcNewInputFormat extends InputFormat<NullWritable, OrcStruct>{
     return result;
   }
 
+  // Nearly C/P from OrcInputFormat; there are too many statics everywhere to sort this out.
+  private Context createContext(Configuration conf, int numSplits) {
+    // Use threads to resolve directories into splits.
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_MS_FOOTER_CACHE_ENABLED)) {
+      // Create HiveConf once, since this is expensive.
+      conf = new HiveConf(conf, OrcInputFormat.class);
+    }
+    return new Context(conf, numSplits, null);
+  }
 }