Posted to commits@hive.apache.org by se...@apache.org on 2015/08/18 00:22:11 UTC

[6/7] hive git commit: HIVE-11542 : port fileId support on shims and splits from llap branch (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

HIVE-11542 : port fileId support on shims and splits from llap branch (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3b6825b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3b6825b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3b6825b5

Branch: refs/heads/hbase-metastore
Commit: 3b6825b5b61e943e8e41743f5cbf6d640e0ebdf5
Parents: e059409
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Aug 17 15:16:57 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Aug 17 15:16:57 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 183 ++++++++++++++-----
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  97 +++++++---
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |  25 ++-
 .../hive/ql/txn/compactor/CompactorMR.java      |  13 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   9 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  27 +--
 .../hive/ql/io/orc/TestInputOutputFormat.java   |   6 +-
 .../hadoop/hive/shims/Hadoop20SShims.java       |  11 ++
 .../apache/hadoop/hive/shims/Hadoop23Shims.java |  66 +++++++
 .../apache/hadoop/hive/shims/HadoopShims.java   |  15 ++
 11 files changed, 348 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9a6781b..da171b1 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1024,6 +1024,8 @@ public class HiveConf extends Configuration {
         "data is read remotely (from the client or HS2 machine) and sent to all the tasks."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,
         "Max cache size for keeping meta info about orc splits cached in the client."),
+    HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
+        "Include file ID in splits on file systems that support it."),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,
         "How many threads orc should use to create splits in parallel."),
     HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false,
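
The new flag is consumed further down in this patch, where OrcInputFormat calls HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS) before scheduling FileGenerators. A minimal sketch of reading and overriding it, assuming only the standard HiveConf accessors (the class name is illustrative, not part of the patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

  public class FileIdFlagExample {
    public static void main(String[] args) {
      Configuration conf = new HiveConf();
      // Defaults to true; OrcInputFormat reads it the same way during split generation.
      boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
      System.out.println("include file IDs in ORC splits: " + useFileIds);

      // Turn the feature off, e.g. for file systems that do not expose inode-style IDs.
      HiveConf.setBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS, false);
      System.out.println("after override: "
          + HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS));
    }
  }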

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c7e0780..30db513 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -221,7 +224,7 @@ public class AcidUtils {
      * Get the list of original files.
      * @return the list of original files (eg. 000000_0)
      */
-    List<FileStatus> getOriginalFiles();
+    List<HdfsFileStatusWithId> getOriginalFiles();
 
     /**
      * Get the list of base and delta directories that are valid and not
@@ -423,6 +426,20 @@ public class AcidUtils {
     return false;
   }
 
+  @VisibleForTesting
+  public static Directory getAcidState(Path directory,
+      Configuration conf,
+      ValidTxnList txnList
+      ) throws IOException {
+    return getAcidState(directory, conf, txnList, false);
+  }
+
+  /** State class for getChildState; a method cannot return two values, so they are grouped here. */
+  private static class TxnBase {
+    private FileStatus status;
+    private long txn;
+  }
+
   /**
    * Get the ACID state of the given directory. It finds the minimal set of
    * base and diff directories. Note that because major compactions don't
@@ -436,51 +453,40 @@ public class AcidUtils {
    */
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
-                                       ValidTxnList txnList
+                                       ValidTxnList txnList,
+                                       boolean useFileIds
                                        ) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
-    FileStatus bestBase = null;
-    long bestBaseTxn = 0;
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
-    List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory,
-        hiddenFileFilter);
-    for(FileStatus child: children) {
-      Path p = child.getPath();
-      String fn = p.getName();
-      if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
-        long txn = parseBase(p);
-        if (bestBase == null) {
-          bestBase = child;
-          bestBaseTxn = txn;
-        } else if (bestBaseTxn < txn) {
-          obsolete.add(bestBase);
-          bestBase = child;
-          bestBaseTxn = txn;
-        } else {
-          obsolete.add(child);
-        }
-      } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
-        ParsedDelta delta = parseDelta(child);
-        if (txnList.isTxnRangeValid(delta.minTransaction,
-            delta.maxTransaction) !=
-            ValidTxnList.RangeResponse.NONE) {
-          working.add(delta);
-        }
-      } else {
-        // This is just the directory.  We need to recurse and find the actual files.  But don't
-        // do this until we have determined there is no base.  This saves time.  Plus,
-        // it is possible that the cleaner is running and removing these original files,
-        // in which case recursing through them could cause us to get an error.
-        originalDirectories.add(child);
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (useFileIds) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API", t);
+        useFileIds = false;
+      }
+    }
+    TxnBase bestBase = new TxnBase();
+    final List<HdfsFileStatusWithId> original = new ArrayList<>();
+    if (childrenWithId != null) {
+      for (HdfsFileStatusWithId child : childrenWithId) {
+        getChildState(child.getFileStatus(), child, txnList, working,
+            originalDirectories, original, obsolete, bestBase);
+      }
+    } else {
+      List<FileStatus> children = SHIMS.listLocatedStatus(fs, directory, hiddenFileFilter);
+      for (FileStatus child : children) {
+        getChildState(
+            child, null, txnList, working, originalDirectories, original, obsolete, bestBase);
       }
     }
 
-    final List<FileStatus> original = new ArrayList<FileStatus>();
-    // if we have a base, the original files are obsolete.
-    if (bestBase != null) {
+    // If we have a base, the original files are obsolete.
+    if (bestBase.status != null) {
       // remove the entries so we don't get confused later and think we should
       // use them.
       original.clear();
@@ -488,12 +494,12 @@ public class AcidUtils {
       // Okay, we're going to need these originals.  Recurse through them and figure out what we
       // really need.
       for (FileStatus origDir : originalDirectories) {
-        findOriginals(fs, origDir, original);
+        findOriginals(fs, origDir, original, useFileIds);
       }
     }
 
     Collections.sort(working);
-    long current = bestBaseTxn;
+    long current = bestBase.txn;
     int lastStmtId = -1;
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
@@ -516,7 +522,7 @@ public class AcidUtils {
       }
     }
 
-    final Path base = bestBase == null ? null : bestBase.getPath();
+    final Path base = bestBase.status == null ? null : bestBase.status.getPath();
     LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
 
@@ -528,7 +534,7 @@ public class AcidUtils {
       }
 
       @Override
-      public List<FileStatus> getOriginalFiles() {
+      public List<HdfsFileStatusWithId> getOriginalFiles() {
         return original;
       }
 
@@ -544,23 +550,100 @@ public class AcidUtils {
     };
   }
 
+  private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
+      ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
+      List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase) {
+    Path p = child.getPath();
+    String fn = p.getName();
+    if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
+      long txn = parseBase(p);
+      if (bestBase.status == null) {
+        bestBase.status = child;
+        bestBase.txn = txn;
+      } else if (bestBase.txn < txn) {
+        obsolete.add(bestBase.status);
+        bestBase.status = child;
+        bestBase.txn = txn;
+      } else {
+        obsolete.add(child);
+      }
+    } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
+      ParsedDelta delta = parseDelta(child);
+      if (txnList.isTxnRangeValid(delta.minTransaction,
+          delta.maxTransaction) !=
+          ValidTxnList.RangeResponse.NONE) {
+        working.add(delta);
+      }
+    } else if (child.isDir()) {
+      // This is just the directory.  We need to recurse and find the actual files.  But don't
+      // do this until we have determined there is no base.  This saves time.  Plus,
+      // it is possible that the cleaner is running and removing these original files,
+      // in which case recursing through them could cause us to get an error.
+      originalDirectories.add(child);
+    } else {
+      original.add(createOriginalObj(childWithId, child));
+    }
+  }
+
+  public static HdfsFileStatusWithId createOriginalObj(
+      HdfsFileStatusWithId childWithId, FileStatus child) {
+    return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
+  }
+
+  private static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId {
+    private FileStatus fs;
+
+    public HdfsFileStatusWithoutId(FileStatus fs) {
+      this.fs = fs;
+    }
+
+    @Override
+    public FileStatus getFileStatus() {
+      return fs;
+    }
+
+    @Override
+    public Long getFileId() {
+      return null;
+    }
+  }
+
   /**
-   * Find the original files (non-ACID layout) recursively under the partition
-   * directory.
+   * Find the original files (non-ACID layout) recursively under the partition directory.
    * @param fs the file system
-   * @param stat the file/directory to add
+   * @param stat the directory to add
    * @param original the list of original files
    * @throws IOException
    */
   private static void findOriginals(FileSystem fs, FileStatus stat,
-                                    List<FileStatus> original
-                                    ) throws IOException {
-    if (stat.isDir()) {
-      for(FileStatus child: SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter)) {
-        findOriginals(fs, child, original);
+      List<HdfsFileStatusWithId> original, boolean useFileIds) throws IOException {
+    assert stat.isDir();
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (useFileIds) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, stat.getPath(), hiddenFileFilter);
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API", t);
+        useFileIds = false;
+      }
+    }
+    if (childrenWithId != null) {
+      for (HdfsFileStatusWithId child : childrenWithId) {
+        if (child.getFileStatus().isDir()) {
+          findOriginals(fs, child.getFileStatus(), original, useFileIds);
+        } else {
+          original.add(child);
+        }
       }
     } else {
-      original.add(stat);
+      List<FileStatus> children = SHIMS.listLocatedStatus(fs, stat.getPath(), hiddenFileFilter);
+      for (FileStatus child : children) {
+        if (child.isDir()) {
+          findOriginals(fs, child, original, useFileIds);
+        } else {
+          original.add(createOriginalObj(null, child));
+        }
+      }
     }
   }
 }
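
With getOriginalFiles() now returning HdfsFileStatusWithId, callers read the path through getFileStatus() and treat a null getFileId() as "no ID available". A caller-side sketch, assuming the partition directory and ValidTxnList are supplied elsewhere (class and method names are illustrative):

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.ValidTxnList;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;

  public class AcidStateExample {
    // Walks the original (non-ACID) files of a partition; getFileId() is null
    // whenever the listing had to fall back to the plain FileStatus API.
    static void printOriginals(Path partitionDir, Configuration conf,
        ValidTxnList txnList, boolean useFileIds) throws IOException {
      AcidUtils.Directory dir = AcidUtils.getAcidState(partitionDir, conf, txnList, useFileIds);
      for (HdfsFileStatusWithId file : dir.getOriginalFiles()) {
        Long fileId = file.getFileId();
        Path p = file.getFileStatus().getPath();
        System.out.println(p + (fileId != null ? " (fileId=" + fileId + ")" : " (no fileId)"));
      }
    }
  }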

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 6ed7872..fd6d2ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -48,12 +48,14 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat.DeltaMetaData;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -73,6 +76,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
@@ -436,26 +440,34 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   static final class SplitInfo extends ACIDSplitStrategy {
     private final Context context;
     private final FileSystem fs;
-    private final FileStatus file;
+    private final HdfsFileStatusWithId fileWithId;
     private final FileInfo fileInfo;
     private final boolean isOriginal;
     private final List<DeltaMetaData> deltas;
     private final boolean hasBase;
 
     SplitInfo(Context context, FileSystem fs,
-        FileStatus file, FileInfo fileInfo,
+        HdfsFileStatusWithId fileWithId, FileInfo fileInfo,
         boolean isOriginal,
         List<DeltaMetaData> deltas,
         boolean hasBase, Path dir, boolean[] covered) throws IOException {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fs = fs;
-      this.file = file;
+      this.fileWithId = fileWithId;
       this.fileInfo = fileInfo;
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
     }
+
+    @VisibleForTesting
+    public SplitInfo(Context context, FileSystem fs, FileStatus fileStatus, FileInfo fileInfo,
+        boolean isOriginal, ArrayList<DeltaMetaData> deltas, boolean hasBase, Path dir,
+        boolean[] covered) throws IOException {
+      this(context, fs, AcidUtils.createOriginalObj(null, fileStatus),
+          fileInfo, isOriginal, deltas, hasBase, dir, covered);
+    }
   }
 
   /**
@@ -465,14 +477,15 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
     Context context;
     FileSystem fs;
-    List<FileStatus> files;
+    List<HdfsFileStatusWithId> files;
     boolean isOriginal;
     List<DeltaMetaData> deltas;
     Path dir;
     boolean[] covered;
 
-    public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
-        boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
+    public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
+        List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
+        boolean[] covered) {
       this.context = context;
       this.dir = dir;
       this.fs = fs;
@@ -516,14 +529,15 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     @Override
     public List<SplitInfo> getSplits() throws IOException {
       List<SplitInfo> result = Lists.newArrayList();
-      for (FileStatus file : files) {
+      for (HdfsFileStatusWithId file : files) {
         FileInfo info = null;
         if (context.cacheStripeDetails) {
-          info = verifyCachedFileInfo(file);
+          info = verifyCachedFileInfo(file.getFileStatus());
         }
         // ignore files of 0 length
-        if (file.getLen() > 0) {
-          result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
+        if (file.getFileStatus().getLen() > 0) {
+          result.add(new SplitInfo(
+              context, fs, file, info, isOriginal, deltas, true, dir, covered));
         }
       }
       return result;
@@ -540,7 +554,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
    * as opposed to query execution (split generation does not read or cache file footers).
    */
   static final class BISplitStrategy extends ACIDSplitStrategy {
-    List<FileStatus> fileStatuses;
+    List<HdfsFileStatusWithId> fileStatuses;
     boolean isOriginal;
     List<DeltaMetaData> deltas;
     FileSystem fs;
@@ -548,7 +562,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     Path dir;
 
     public BISplitStrategy(Context context, FileSystem fs,
-        Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
+        Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
         List<DeltaMetaData> deltas, boolean[] covered) {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
@@ -562,11 +576,12 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     @Override
     public List<OrcSplit> getSplits() throws IOException {
       List<OrcSplit> splits = Lists.newArrayList();
-      for (FileStatus fileStatus : fileStatuses) {
+      for (HdfsFileStatusWithId file : fileStatuses) {
+        FileStatus fileStatus = file.getFileStatus();
         String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
             .getHosts();
-        OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
-            null, isOriginal, true, deltas, -1);
+        OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), file.getFileId(), 0,
+            fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1);
         splits.add(orcSplit);
       }
 
@@ -606,7 +621,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
       if (!deltas.isEmpty()) {
         for (int b = 0; b < numBuckets; ++b) {
           if (!covered[b]) {
-            splits.add(new OrcSplit(dir, b, 0, new String[0], null, false, false, deltas, -1));
+            splits.add(new OrcSplit(dir, null, b, 0, new String[0], null, false, false, deltas, -1));
           }
         }
       }
@@ -627,21 +642,23 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     private final Context context;
     private final FileSystem fs;
     private final Path dir;
+    private final boolean useFileIds;
 
-    FileGenerator(Context context, FileSystem fs, Path dir) {
+    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds) {
       this.context = context;
       this.fs = fs;
       this.dir = dir;
+      this.useFileIds = useFileIds;
     }
 
     @Override
     public SplitStrategy call() throws IOException {
       final SplitStrategy splitStrategy;
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
-          context.conf, context.transactionList);
+          context.conf, context.transactionList, useFileIds);
       List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
       Path base = dirInfo.getBaseDirectory();
-      List<FileStatus> original = dirInfo.getOriginalFiles();
+      List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
       boolean[] covered = new boolean[context.numBuckets];
       boolean isOriginal = base == null;
 
@@ -649,17 +666,16 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
       if (base != null || !original.isEmpty()) {
 
         // find the base files (original or new style)
-        List<FileStatus> children = original;
+        List<HdfsFileStatusWithId> children = original;
         if (base != null) {
-          children = SHIMS.listLocatedStatus(fs, base,
-              AcidUtils.hiddenFileFilter);
+          children = findBaseFiles(base, useFileIds);
         }
 
         long totalFileSize = 0;
-        for (FileStatus child : children) {
-          totalFileSize += child.getLen();
+        for (HdfsFileStatusWithId child : children) {
+          totalFileSize += child.getFileStatus().getLen();
           AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
-              (child.getPath(), context.conf);
+              (child.getFileStatus().getPath(), context.conf);
           int b = opts.getBucket();
           // If the bucket is in the valid range, mark it as covered.
           // I wish Hive actually enforced bucketing all of the time.
@@ -700,6 +716,24 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
 
       return splitStrategy;
     }
+
+    private List<HdfsFileStatusWithId> findBaseFiles(
+        Path base, boolean useFileIds) throws IOException {
+      if (useFileIds) {
+        try {
+          return SHIMS.listLocatedHdfsStatus(fs, base, AcidUtils.hiddenFileFilter);
+        } catch (Throwable t) {
+          LOG.error("Failed to get files with ID; using regular API", t);
+        }
+      }
+      // Fall back to regular API and create states without ID.
+      List<FileStatus> children = SHIMS.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
+      List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
+      for (FileStatus child : children) {
+        result.add(AcidUtils.createOriginalObj(null, child));
+      }
+      return result;
+    }
   }
 
   /**
@@ -709,6 +743,7 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
   static final class SplitGenerator implements Callable<List<OrcSplit>> {
     private final Context context;
     private final FileSystem fs;
+    private final HdfsFileStatusWithId fileWithId;
     private final FileStatus file;
     private final long blockSize;
     private final TreeMap<Long, BlockLocation> locations;
@@ -728,8 +763,9 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     public SplitGenerator(SplitInfo splitInfo) throws IOException {
       this.context = splitInfo.context;
       this.fs = splitInfo.fs;
-      this.file = splitInfo.file;
-      this.blockSize = file.getBlockSize();
+      this.fileWithId = splitInfo.fileWithId;
+      this.file = this.fileWithId.getFileStatus();
+      this.blockSize = this.file.getBlockSize();
       this.fileInfo = splitInfo.fileInfo;
       locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = splitInfo.isOriginal;
@@ -837,8 +873,8 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
       final double splitRatio = (double) length / (double) fileLen;
       final long scaledProjSize = projColsUncompressedSize > 0 ?
           (long) (splitRatio * projColsUncompressedSize) : fileLen;
-      return new OrcSplit(file.getPath(), offset, length, hosts, fileMetaInfo,
-          isOriginal, hasBase, deltas, scaledProjSize);
+      return new OrcSplit(file.getPath(), fileWithId.getFileId(), offset, length, hosts,
+          fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
     }
 
     /**
@@ -1020,9 +1056,10 @@ public class OrcInputFormat  implements InputFormat<NullWritable, OrcStruct>,
     List<Future<?>> splitFutures = Lists.newArrayList();
 
     // multi-threaded file statuses and split strategy
+    boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
     for (Path dir : getInputPaths(conf)) {
       FileSystem fs = dir.getFileSystem(conf);
-      FileGenerator fileGenerator = new FileGenerator(context, fs, dir);
+      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds);
       pathFutures.add(context.threadPool.submit(fileGenerator));
     }
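
The try-the-shim-then-fall-back pattern above (getAcidState, findOriginals, findBaseFiles) is the core of the change; a condensed, standalone restatement of it, assuming only the shim methods and AcidUtils helpers added in this patch (the wrapper class name is illustrative):

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.shims.HadoopShims;
  import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
  import org.apache.hadoop.hive.shims.ShimLoader;

  public class ListWithIdsExample {
    private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();

    // Prefer the ID-aware listing; degrade to the plain listing (wrapping each
    // FileStatus with a null ID) if the shim or file system does not support it.
    static List<HdfsFileStatusWithId> listWithOptionalIds(FileSystem fs, Path dir,
        boolean useFileIds) throws IOException {
      if (useFileIds) {
        try {
          return SHIMS.listLocatedHdfsStatus(fs, dir, AcidUtils.hiddenFileFilter);
        } catch (Throwable t) {
          // e.g. UnsupportedOperationException from Hadoop20SShims or a non-DFS
          // file system; fall through to the ID-less listing below.
        }
      }
      List<FileStatus> children = SHIMS.listLocatedStatus(fs, dir, AcidUtils.hiddenFileFilter);
      List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
      for (FileStatus child : children) {
        result.add(AcidUtils.createOriginalObj(null, child));
      }
      return result;
    }
  }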
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 8cf4cc0..cc03df7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -39,6 +41,8 @@ import org.apache.hadoop.mapred.FileSplit;
  *
  */
 public class OrcSplit extends FileSplit {
+  private static final Log LOG = LogFactory.getLog(OrcSplit.class);
+
   private ReaderImpl.FileMetaInfo fileMetaInfo;
   private boolean hasFooter;
   private boolean isOriginal;
@@ -46,7 +50,9 @@ public class OrcSplit extends FileSplit {
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
   private OrcFile.WriterVersion writerVersion;
   private long projColsUncompressedSize;
+  private transient Long fileId;
 
+  static final int HAS_FILEID_FLAG = 8;
   static final int BASE_FLAG = 4;
   static final int ORIGINAL_FLAG = 2;
   static final int FOOTER_FLAG = 1;
@@ -58,10 +64,13 @@ public class OrcSplit extends FileSplit {
     super(null, 0, 0, (String[]) null);
   }
 
-  public OrcSplit(Path path, long offset, long length, String[] hosts,
+  public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts,
       ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
       List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
     super(path, offset, length, hosts);
+    // We could avoid serializing the file ID and just replace the path with an inode-based path.
+    // However, that breaks other code, because Hive later looks things up by split path.
+    this.fileId = fileId;
     this.fileMetaInfo = fileMetaInfo;
     hasFooter = this.fileMetaInfo != null;
     this.isOriginal = isOriginal;
@@ -77,7 +86,8 @@ public class OrcSplit extends FileSplit {
 
     int flags = (hasBase ? BASE_FLAG : 0) |
         (isOriginal ? ORIGINAL_FLAG : 0) |
-        (hasFooter ? FOOTER_FLAG : 0);
+        (hasFooter ? FOOTER_FLAG : 0) |
+        (fileId != null ? HAS_FILEID_FLAG : 0);
     out.writeByte(flags);
     out.writeInt(deltas.size());
     for(AcidInputFormat.DeltaMetaData delta: deltas) {
@@ -99,6 +109,9 @@ public class OrcSplit extends FileSplit {
           footerBuff.limit() - footerBuff.position());
       WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
+    if (fileId != null) {
+      out.writeLong(fileId.longValue());
+    }
   }
 
   @Override
@@ -110,6 +123,7 @@ public class OrcSplit extends FileSplit {
     hasFooter = (FOOTER_FLAG & flags) != 0;
     isOriginal = (ORIGINAL_FLAG & flags) != 0;
     hasBase = (BASE_FLAG & flags) != 0;
+    boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0;
 
     deltas.clear();
     int numDeltas = in.readInt();
@@ -134,6 +148,9 @@ public class OrcSplit extends FileSplit {
       fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
           metadataSize, footerBuff, writerVersion);
     }
+    if (hasFileId) {
+      fileId = in.readLong();
+    }
   }
 
   ReaderImpl.FileMetaInfo getFileMetaInfo(){
@@ -159,4 +176,8 @@ public class OrcSplit extends FileSplit {
   public long getProjectedColumnsUncompressedSize() {
     return projColsUncompressedSize;
   }
+
+  public Long getFileId() {
+    return fileId;
+  }
 }
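
The optional file ID rides on the existing flag byte, so splits written without an ID stay readable. A self-contained sketch of just that part of the wire format, with the real split fields stripped out and the flag constants copied from OrcSplit:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  public class FileIdFlagBits {
    static final int HAS_FILEID_FLAG = 8;  // mirrors OrcSplit.HAS_FILEID_FLAG
    static final int BASE_FLAG = 4;
    static final int ORIGINAL_FLAG = 2;
    static final int FOOTER_FLAG = 1;

    public static void main(String[] args) throws IOException {
      Long fileId = 123456789L;  // null when the file system gave us no ID

      // Write side: the ID is optional, so its presence is recorded in the flag byte.
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      int flags = BASE_FLAG | (fileId != null ? HAS_FILEID_FLAG : 0);
      out.writeByte(flags);
      if (fileId != null) {
        out.writeLong(fileId.longValue());
      }

      // Read side: splits written by older code never have the bit set,
      // which keeps the format backward compatible.
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      int readFlags = in.readByte();
      Long readId = (readFlags & HAS_FILEID_FLAG) != 0 ? in.readLong() : null;
      System.out.println("fileId read back: " + readId);
    }
  }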

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 8e431b2..02fa725 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -133,7 +134,8 @@ public class CompactorMR {
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(
+        new Path(sd.getLocation()), conf, txns, false);
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
     if (isMajor) {
@@ -141,12 +143,13 @@ public class CompactorMR {
       // partition is just now being converted to ACID.
       baseDir = dir.getBaseDirectory();
       if (baseDir == null) {
-        List<FileStatus> originalFiles = dir.getOriginalFiles();
+        List<HdfsFileStatusWithId> originalFiles = dir.getOriginalFiles();
         if (!(originalFiles == null) && !(originalFiles.size() == 0)) {
           // There are original format files
-          for (FileStatus stat : originalFiles) {
-            dirsToSearch.add(stat.getPath());
-            LOG.debug("Adding original file " + stat.getPath().toString() + " to dirs to search");
+          for (HdfsFileStatusWithId stat : originalFiles) {
+            Path path = stat.getFileStatus().getPath();
+            dirsToSearch.add(path);
+            LOG.debug("Adding original file " + path + " to dirs to search");
           }
           // Set base to the location so that the input format reads the original files.
           baseDir = new Path(sd.getLocation());

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 73715c6..9bf725d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
@@ -223,7 +224,7 @@ public class Initiator extends CompactorThread {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns, false);
     Path base = dir.getBaseDirectory();
     long baseSize = 0;
     FileStatus stat = null;
@@ -236,9 +237,9 @@ public class Initiator extends CompactorThread {
       baseSize = sumDirSize(fs, base);
     }
 
-    List<FileStatus> originals = dir.getOriginalFiles();
-    for (FileStatus origStat : originals) {
-      baseSize += origStat.getLen();
+    List<HdfsFileStatusWithId> originals = dir.getOriginalFiles();
+    for (HdfsFileStatusWithId origStat : originals) {
+      baseSize += origStat.getFileStatus().getLen();
     }
 
     long deltaSize = 0;

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f8ded12..b6ba862 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
+import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.junit.Test;
 
 import java.util.List;
@@ -102,13 +103,14 @@ public class TestAcidUtils {
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
-    List<FileStatus> result = dir.getOriginalFiles();
+    List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
     assertEquals(5, result.size());
-    assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString());
-    assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/subdir/000000_0",
+        result.get(4).getFileStatus().getPath().toString());
   }
 
   @Test
@@ -136,13 +138,14 @@ public class TestAcidUtils {
         obsolete.get(0).getPath().toString());
     assertEquals("mock:/tbl/part1/delta_029_029",
         obsolete.get(1).getPath().toString());
-    List<FileStatus> result = dir.getOriginalFiles();
+    List<HdfsFileStatusWithId> result = dir.getOriginalFiles();
     assertEquals(5, result.size());
-    assertEquals("mock:/tbl/part1/000000_0", result.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/random", result.get(3).getPath().toString());
-    assertEquals("mock:/tbl/part1/subdir/000000_0", result.get(4).getPath().toString());
+    assertEquals("mock:/tbl/part1/000000_0", result.get(0).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000001_1", result.get(1).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/000002_0", result.get(2).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/random", result.get(3).getFileStatus().getPath().toString());
+    assertEquals("mock:/tbl/part1/subdir/000000_0",
+        result.get(4).getFileStatus().getPath().toString());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(2, deltas.size());
     AcidUtils.ParsedDelta delt = deltas.get(0);

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 0c12c89..547e799 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -483,7 +483,7 @@ public class TestInputOutputFormat {
           final OrcInputFormat.Context context = new OrcInputFormat.Context(
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-              context, fs, new MockPath(fs, "mock:/a/b"));
+              context, fs, new MockPath(fs, "mock:/a/b"), false);
           final SplitStrategy splitStrategy = gen.call();
           assertTrue(
               String.format(
@@ -507,7 +507,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/part-04", 1000, new byte[0]));
     OrcInputFormat.FileGenerator gen =
       new OrcInputFormat.FileGenerator(context, fs,
-          new MockPath(fs, "mock:/a/b"));
+          new MockPath(fs, "mock:/a/b"), false);
     SplitStrategy splitStrategy = gen.call();
     assertEquals(true, splitStrategy instanceof OrcInputFormat.BISplitStrategy);
 
@@ -520,7 +520,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
     gen = new OrcInputFormat.FileGenerator(context, fs,
-            new MockPath(fs, "mock:/a/b"));
+            new MockPath(fs, "mock:/a/b"), false);
     splitStrategy = gen.call();
     assertEquals(true, splitStrategy instanceof OrcInputFormat.ETLSplitStrategy);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
----------------------------------------------------------------------
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index ffffcb7..a56309f 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -722,4 +722,15 @@ public class Hadoop20SShims extends HadoopShimsSecure {
     Token<?> fsToken = fs.getDelegationToken(uname);
     cred.addToken(fsToken.getService(), fsToken);
   }
+
+  @Override
+  public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+      FileSystem fs, Path path, PathFilter filter) throws IOException {
+    throw new UnsupportedOperationException("Not supported on old version");
+  }
+
+  @Override
+  public long getFileId(FileSystem fs, String path) throws IOException {
+    throw new UnsupportedOperationException("Not supported on old version");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9eae0ac..e5be8d6 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -61,10 +61,13 @@ import org.apache.hadoop.fs.permission.AclEntryType;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.io.LongWritable;
@@ -662,6 +665,64 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     return result;
   }
 
+  private static final class HdfsFileStatusWithIdImpl implements HdfsFileStatusWithId {
+    private final LocatedFileStatus lfs;
+    private final long fileId;
+
+    public HdfsFileStatusWithIdImpl(LocatedFileStatus lfs, long fileId) {
+      this.lfs = lfs;
+      this.fileId = fileId;
+    }
+
+    @Override
+    public FileStatus getFileStatus() {
+      return lfs;
+    }
+
+    @Override
+    public Long getFileId() {
+      return fileId;
+    }
+  }
+
+  @Override
+  public List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+      FileSystem fs, Path p, PathFilter filter) throws IOException {
+    DistributedFileSystem dfs = ensureDfs(fs);
+    DFSClient dfsc = dfs.getClient();
+    final String src = p.toUri().getPath();
+    DirectoryListing current = dfsc.listPaths(src,
+        org.apache.hadoop.hdfs.protocol.HdfsFileStatus.EMPTY_NAME, true);
+    if (current == null) { // the directory does not exist
+      throw new FileNotFoundException("File " + p + " does not exist.");
+    }
+    final URI fsUri = fs.getUri();
+    List<HdfsFileStatusWithId> result = new ArrayList<HdfsFileStatusWithId>(
+        current.getPartialListing().length);
+    while (current != null) {
+      org.apache.hadoop.hdfs.protocol.HdfsFileStatus[] hfss = current.getPartialListing();
+      for (int i = 0; i < hfss.length; ++i) {
+        HdfsLocatedFileStatus next = (HdfsLocatedFileStatus)(hfss[i]);
+        if (filter != null) {
+          Path filterPath = next.getFullPath(p).makeQualified(fsUri, null);
+          if (!filter.accept(filterPath)) continue;
+        }
+        LocatedFileStatus lfs = next.makeQualifiedLocated(fsUri, p);
+        result.add(new HdfsFileStatusWithIdImpl(lfs, next.getFileId()));
+      }
+      current = current.hasMore() ? dfsc.listPaths(src, current.getLastName(), true) : null;
+    }
+    return result;
+  }
+
+  private DistributedFileSystem ensureDfs(FileSystem fs) {
+    if (!(fs instanceof DistributedFileSystem)) {
+      throw new UnsupportedOperationException("Only supported for DFS; got " + fs.getClass());
+    }
+    DistributedFileSystem dfs = (DistributedFileSystem)fs;
+    return dfs;
+  }
+
   @Override
   public BlockLocation[] getLocations(FileSystem fs,
                                       FileStatus status) throws IOException {
@@ -1352,4 +1413,9 @@ public class Hadoop23Shims extends HadoopShimsSecure {
     // Use method addDelegationTokens instead of getDelegationToken to get all the tokens including KMS.
     fs.addDelegationTokens(uname, cred);
   }
+
+  @Override
+  public long getFileId(FileSystem fs, String path) throws IOException {
+    return ensureDfs(fs).getClient().getFileInfo(path).getFileId();
+  }
 }
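
A caller-side sketch of the new getFileId shim method; the Hadoop 1 shim and non-DFS file systems throw UnsupportedOperationException, so the helper below (illustrative, not part of the patch) degrades to null and assumes the path exists:

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.shims.HadoopShims;
  import org.apache.hadoop.hive.shims.ShimLoader;

  public class GetFileIdExample {
    // Resolves the inode-based file ID of an existing file through the shim layer.
    static Long tryGetFileId(FileSystem fs, Path file) throws IOException {
      HadoopShims shims = ShimLoader.getHadoopShims();
      try {
        return shims.getFileId(fs, file.toUri().getPath());
      } catch (UnsupportedOperationException e) {
        // Hadoop 1 shims or a non-DistributedFileSystem: no ID available.
        return null;
      }
    }

    public static void main(String[] args) throws IOException {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);
      System.out.println(tryGetFileId(fs, new Path(args[0])));
    }
  }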

http://git-wip-us.apache.org/repos/asf/hive/blob/3b6825b5/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 74785e5..2b6f322 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -256,6 +256,10 @@ public interface HadoopShims {
   List<FileStatus> listLocatedStatus(FileSystem fs, Path path,
                                      PathFilter filter) throws IOException;
 
+
+  List<HdfsFileStatusWithId> listLocatedHdfsStatus(
+      FileSystem fs, Path path, PathFilter filter) throws IOException;
+
   /**
    * For file status returned by listLocatedStatus, convert them into a list
    * of block locations.
@@ -316,6 +320,11 @@ public interface HadoopShims {
     public void debugLog();
   }
 
+  public interface HdfsFileStatusWithId {
+    public FileStatus getFileStatus();
+    public Long getFileId();
+  }
+
   public HCatHadoopShims getHCatShim();
   public interface HCatHadoopShims {
 
@@ -731,4 +740,10 @@ public interface HadoopShims {
    * @throws IOException If an error occurred on adding the token.
    */
   public void addDelegationTokens(FileSystem fs, Credentials cred, String uname) throws IOException;
+
+  /**
+   * Gets file ID. Only supported on hadoop-2.
+   * @return inode ID of the file.
+   */
+  long getFileId(FileSystem fs, String path) throws IOException;
 }
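
The patch ships two implementations of the new interface: AcidUtils.HdfsFileStatusWithoutId (always-null ID) and Hadoop23Shims.HdfsFileStatusWithIdImpl (HDFS inode ID). A third, purely illustrative implementation, e.g. for tests or a file system that computes IDs out of band:

  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;

  public class SimpleFileStatusWithId implements HdfsFileStatusWithId {
    private final FileStatus status;
    private final Long fileId;  // null means "no ID available"

    public SimpleFileStatusWithId(FileStatus status, Long fileId) {
      this.status = status;
      this.fileId = fileId;
    }

    @Override
    public FileStatus getFileStatus() {
      return status;
    }

    @Override
    public Long getFileId() {
      return fileId;
    }
  }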