Posted to commits@hive.apache.org by se...@apache.org on 2016/03/15 21:50:26 UTC

[2/2] hive git commit: HIVE-12995 : LLAP: Synthetic file ids need collision checks (Sergey Shelukhin, reviewed by Gopal V)

HIVE-12995 : LLAP: Synthetic file ids need collision checks (Sergey Shelukhin, reviewed by Gopal V)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/26b5c7b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/26b5c7b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/26b5c7b5

Branch: refs/heads/master
Commit: 26b5c7b56a4f28ce3eabc0207566cce46b29b558
Parents: 4458b1a
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Mar 15 13:49:35 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Mar 15 13:50:14 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../llap/IncrementalObjectSizeEstimator.java    |   7 +-
 .../apache/hadoop/hive/llap/cache/Cache.java    |  27 ---
 .../hadoop/hive/llap/cache/LowLevelCache.java   |   4 +-
 .../hive/llap/cache/LowLevelCacheImpl.java      |  28 +--
 .../hadoop/hive/llap/cache/NoopCache.java       |  33 ----
 .../hive/llap/io/api/impl/LlapIoImpl.java       |   6 +-
 .../llap/io/decode/EncodedDataConsumer.java     |  77 +-------
 .../llap/io/decode/OrcColumnVectorProducer.java |   8 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java  |   2 +-
 .../llap/io/encoded/OrcEncodedDataReader.java   | 183 ++++---------------
 .../hive/llap/io/metadata/OrcFileMetadata.java  |  33 ++--
 .../hive/llap/io/metadata/OrcMetadataCache.java |  12 +-
 .../llap/io/metadata/OrcStripeMetadata.java     |  19 +-
 orc/src/java/org/apache/orc/FileMetadata.java   |   2 +-
 .../org/apache/hadoop/hive/ql/io/HdfsUtils.java |  18 +-
 .../hadoop/hive/ql/io/SyntheticFileId.java      | 100 ++++++++++
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  97 ++++++----
 .../apache/hadoop/hive/ql/io/orc/OrcSplit.java  |  46 +++--
 .../ql/io/orc/encoded/EncodedReaderImpl.java    |  32 ++--
 .../hive/ql/io/orc/encoded/OrcBatchKey.java     |  20 +-
 .../hive/ql/io/orc/encoded/OrcCacheKey.java     |  58 ------
 .../hadoop/hive/ql/io/orc/encoded/Reader.java   |  10 +-
 .../hive/ql/io/orc/encoded/ReaderImpl.java      |   4 +-
 .../hive/ql/io/orc/encoded/StreamUtils.java     |   1 -
 .../hive/ql/io/orc/TestInputOutputFormat.java   |  18 +-
 .../apache/hadoop/hive/common/io/DataCache.java |   4 +-
 .../common/io/encoded/EncodedColumnBatch.java   |   9 +-
 28 files changed, 335 insertions(+), 525 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c992433..9fd6648 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1223,6 +1223,8 @@ public class HiveConf extends Configuration {
         "metastore calls if metastore metadata cache is used."),
     HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS("hive.orc.splits.include.fileid", true,
         "Include file ID in splits on file systems thaty support it."),
+    HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS("hive.orc.splits.allow.synthetic.fileid", true,
+        "Allow synthetic file ID in splits on file systems that don't have a native one."),
     HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE("hive.orc.cache.stripe.details.size", 10000,
         "Max cache size for keeping meta info about orc splits cached in the client."),
     HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS("hive.orc.compute.splits.num.threads", 10,

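For reference, the new flag can be consulted the same way this patch reads the existing LLAP synthetic-id flag elsewhere; a minimal sketch, assuming a call site such as OrcInputFormat's split generation (the exact call site is not part of this hunk):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

  // Sketch only: mirrors the HiveConf.getBoolVar(...) pattern used elsewhere in this patch.
  static boolean allowSyntheticFileId(Configuration conf) {
    return HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
  }
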
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
index d33f724..7d68294 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/IncrementalObjectSizeEstimator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap;
 
+import com.google.common.collect.Lists;
+import com.google.protobuf.UnknownFieldSet;
 import java.lang.reflect.AccessibleObject;
 import java.lang.reflect.Array;
 import java.lang.reflect.Field;
@@ -35,13 +37,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 
-import com.google.common.collect.Lists;
-import com.google.protobuf.UnknownFieldSet;
-
 /**
  * Creates size estimators for java objects. The estimators attempt to do most of the reflection
  * work at initialization time, and also take some shortcuts, to minimize the amount of work done
@@ -622,6 +622,7 @@ public class IncrementalObjectSizeEstimator {
     } catch (ClassNotFoundException e) {
       // Ignore and hope for the best.
       LlapIoImpl.LOG.warn("Cannot find " + className);
+      return;
     }
     IncrementalObjectSizeEstimator.createEstimators(clazz, sizeEstimators);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
deleted file mode 100644
index cee23a9..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
-
-/** Dummy interface for now, might be different. */
-public interface Cache<CacheKey> {
-  public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value);
-  public ColumnStreamData[] get(CacheKey key);
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
index 17d9fdf..1b61a6e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -49,7 +49,7 @@ public interface LowLevelCache {
    *    Some sort of InvalidCacheChunk could be placed to avoid them. TODO
    * @param base base offset for the ranges (stripe/stream offset in case of ORC).
    */
-  DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+  DiskRangeList getFileData(Object fileKey, DiskRangeList range, long baseOffset,
       DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData);
 
   /**
@@ -57,6 +57,6 @@ public interface LowLevelCache {
    * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
    *         the replacement chunks from cache are updated directly in the array.
    */
-  long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
+  long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] chunks,
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
 }

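The interface now keys cached data by an opaque Object instead of a raw long, so a synthetic id can carry enough state to detect collisions. A minimal sketch of what such a key needs (a hypothetical stand-in, not the SyntheticFileId class added under ql/src/java/org/apache/hadoop/hive/ql/io/ in this patch): an immutable value type with value-based equals/hashCode, since LowLevelCacheImpl below stores it in a ConcurrentHashMap.

  import java.util.Objects;

  // Hypothetical example key built from the same inputs the old synthetic hash used
  // (path hash, file length, modification time); equality is by value, so two
  // different files that happen to share one component no longer collide silently.
  final class ExampleSyntheticKey {
    private final int pathHash;
    private final long length;
    private final long modTime;

    ExampleSyntheticKey(int pathHash, long length, long modTime) {
      this.pathHash = pathHash;
      this.length = length;
      this.modTime = modTime;
    }

    @Override
    public int hashCode() {
      return Objects.hash(pathHash, length, modTime);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof ExampleSyntheticKey)) return false;
      ExampleSyntheticKey other = (ExampleSyntheticKey) o;
      return pathHash == other.pathHash && length == other.length && modTime == other.modTime;
    }

    @Override
    public String toString() {
      return pathHash + "_" + length + "_" + modTime; // for cache debug dumps/logging
    }
  }
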
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 1132171..a60fed3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -43,8 +43,8 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   private final EvictionAwareAllocator allocator;
   private final AtomicInteger newEvictions = new AtomicInteger(0);
   private Thread cleanupThread = null;
-  private final ConcurrentHashMap<Long, FileCache> cache =
-      new ConcurrentHashMap<Long, FileCache>();
+  private final ConcurrentHashMap<Object, FileCache> cache =
+      new ConcurrentHashMap<Object, FileCache>();
   private final LowLevelCachePolicy cachePolicy;
   private final long cleanupInterval;
   private final LlapDaemonCacheMetrics metrics;
@@ -75,11 +75,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   }
 
   @Override
-  public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset,
+  public DiskRangeList getFileData(Object fileKey, DiskRangeList ranges, long baseOffset,
       DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
     if (ranges == null) return null;
     DiskRangeList prev = ranges.prev;
-    FileCache subCache = cache.get(fileId);
+    FileCache subCache = cache.get(fileKey);
     if (subCache == null || !subCache.incRef()) {
       long totalMissed = ranges.getTotalLength();
       metrics.incrCacheRequestedBytes(totalMissed);
@@ -232,11 +232,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   }
 
   @Override
-  public long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] buffers,
+  public long[] putFileData(Object fileKey, DiskRange[] ranges, MemoryBuffer[] buffers,
       long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
     long[] result = null;
     assert buffers.length == ranges.length;
-    FileCache subCache = getOrAddFileSubCache(fileId);
+    FileCache subCache = getOrAddFileSubCache(fileKey);
     try {
       for (int i = 0; i < ranges.length; ++i) {
         LlapDataBuffer buffer = (LlapDataBuffer)buffers[i];
@@ -260,7 +260,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
           }
           if (DebugUtils.isTraceCachingEnabled()) {
             LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
-                + fileId + "@" + offset  + " (base " + baseOffset + "); old " + oldVal
+                + fileKey + "@" + offset  + " (base " + baseOffset + "); old " + oldVal
                 + ", new " + buffer);
           }
           if (DebugUtils.isTraceLockingEnabled()) {
@@ -301,10 +301,10 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
    * All this mess is necessary because we want to be able to remove sub-caches for fully
    * evicted files. It may actually be better to have non-nested map with object keys?
    */
-  private FileCache getOrAddFileSubCache(long fileId) {
+  private FileCache getOrAddFileSubCache(Object fileKey) {
     FileCache newSubCache = null;
     while (true) { // Overwhelmingly executes once.
-      FileCache subCache = cache.get(fileId);
+      FileCache subCache = cache.get(fileKey);
       if (subCache != null) {
         if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
         if (newSubCache == null) {
@@ -312,7 +312,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
           newSubCache.incRef();
         }
         // Found a stale value we cannot incRef; try to replace it with new value.
-        if (cache.replace(fileId, subCache, newSubCache)) return newSubCache;
+        if (cache.replace(fileKey, subCache, newSubCache)) return newSubCache;
         continue; // Someone else replaced/removed a stale value, try again.
       }
       // No value found.
@@ -320,11 +320,11 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
         newSubCache = new FileCache();
         newSubCache.incRef();
       }
-      FileCache oldSubCache = cache.putIfAbsent(fileId, newSubCache);
+      FileCache oldSubCache = cache.putIfAbsent(fileKey, newSubCache);
       if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
       if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
       // Someone created one in parallel and then it went stale.
-      if (cache.replace(fileId, oldSubCache, newSubCache)) return newSubCache;
+      if (cache.replace(fileKey, oldSubCache, newSubCache)) return newSubCache;
       // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
     }
   }
@@ -463,7 +463,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
       // Iterate thru all the filecaches. This is best-effort.
       // If these super-long-lived iterators affect the map in some bad way,
       // we'd need to sleep once per round instead.
-      Iterator<Map.Entry<Long, FileCache>> iter = cache.entrySet().iterator();
+      Iterator<Map.Entry<Object, FileCache>> iter = cache.entrySet().iterator();
       boolean isPastEndTime = false;
       while (iter.hasNext()) {
         FileCache fc = iter.next().getValue();
@@ -516,7 +516,7 @@ public class LowLevelCacheImpl implements LowLevelCache, BufferUsageManager, Lla
   @Override
   public String debugDumpForOom() {
     StringBuilder sb = new StringBuilder("File cache state ");
-    for (Map.Entry<Long, FileCache> e : cache.entrySet()) {
+    for (Map.Entry<Object, FileCache> e : cache.entrySet()) {
       if (!e.getValue().incRef()) continue;
       try {
         sb.append("\n  file " + e.getKey());

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
deleted file mode 100644
index d0461e8..0000000
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
-
-public class NoopCache<CacheKey> implements Cache<CacheKey> {
-  @Override
-  public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value) {
-    return value;
-  }
-
-  @Override
-  public ColumnStreamData[] get(CacheKey key) {
-    return null;  // TODO: ensure real implementation increases refcount
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index d2c1907..dbee823 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.common.io.Allocator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
@@ -50,7 +49,6 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -91,8 +89,6 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
     LOG.info("Started llap daemon metrics with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    Cache<OrcCacheKey> cache = null; // High-level cache is not implemented or supported.
-
     OrcMetadataCache metadataCache = null;
     LowLevelCacheImpl orcCache = null;
     BufferUsageManager bufferManager = null;
@@ -131,7 +127,7 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
 
     // TODO: this should depends on input format and be in a map, or something.
     this.cvp = new OrcColumnVectorProducer(
-        metadataCache, orcCache, bufferManager, cache, conf, cacheMetrics, queueMetrics);
+        metadataCache, orcCache, bufferManager, conf, cacheMetrics, queueMetrics);
     if (LOGL.isInfoEnabled()) {
       LOG.info("LLAP IO initialized");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index b81e97d..137acb0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hive.llap.io.decode;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.Callable;
 
 import org.apache.hadoop.hive.common.Pool;
@@ -30,14 +27,9 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hive.common.util.FixedSizedObjectPool;
 
-/**
- *
- */
 public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>>
   implements Consumer<BatchType>, ReadPipeline {
   private volatile boolean isStopped = false;
-  // TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
-  private final HashMap<BatchKey, BatchType> pendingData = new HashMap<>();
   private ConsumerFeedback<BatchType> upstreamFeedback;
   private final Consumer<ColumnVectorBatch> downstreamConsumer;
   private Callable<Void> readCallable;
@@ -76,50 +68,15 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
 
   @Override
   public void consumeData(BatchType data) {
-    // TODO: data arrives in whole batches now, not in columns. We could greatly simplify this.
-    BatchType targetBatch = null;
-    boolean localIsStopped = false;
-    Integer targetBatchVersion = null;
-    synchronized (pendingData) {
-      localIsStopped = isStopped;
-      if (!localIsStopped) {
-        targetBatch = pendingData.get(data.getBatchKey());
-        if (targetBatch == null) {
-          targetBatch = data;
-          pendingData.put(data.getBatchKey(), data);
-        }
-        // We have the map locked; the code the throws things away from map only bumps the version
-        // under the same map lock; code the throws things away here only bumps the version when
-        // the batch was taken out of the map.
-        targetBatchVersion = targetBatch.version;
-      }
-      queueMetrics.setQueueSize(pendingData.size());
-    }
-    if (localIsStopped) {
-      returnSourceData(data);
-      return;
-    }
-    assert targetBatchVersion != null;
-    synchronized (targetBatch) {
-      if (targetBatch != data) {
-        throw new UnsupportedOperationException("Merging is not supported");
-      }
-      synchronized (pendingData) {
-        targetBatch = isStopped ? null : pendingData.remove(data.getBatchKey());
-        // Check if someone already threw this away and changed the version.
-        localIsStopped = (targetBatchVersion != targetBatch.version);
-      }
-      // We took the batch out of the map. No more contention with stop possible.
-    }
-    if (localIsStopped && (targetBatch != data)) {
+    if (isStopped) {
       returnSourceData(data);
       return;
     }
     long start = System.currentTimeMillis();
-    decodeBatch(targetBatch, downstreamConsumer);
+    decodeBatch(data, downstreamConsumer);
     long end = System.currentTimeMillis();
     queueMetrics.addProcessingTime(end - start);
-    returnSourceData(targetBatch);
+    returnSourceData(data);
   }
 
   /**
@@ -127,7 +84,6 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
    * of the ECB in question; or, if ECB is still in pendingData, pendingData must be locked.
    */
   private void returnSourceData(BatchType data) {
-    ++data.version;
     upstreamFeedback.returnData(data);
   }
 
@@ -136,19 +92,12 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
 
   @Override
   public void setDone() {
-    synchronized (pendingData) {
-      if (!pendingData.isEmpty()) {
-        throw new AssertionError("Not all data has been sent downstream: " + pendingData.size());
-      }
-    }
     downstreamConsumer.setDone();
   }
 
-
   @Override
   public void setError(Throwable t) {
     downstreamConsumer.setError(t);
-    dicardPendingData(false);
   }
 
   @Override
@@ -156,28 +105,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
     cvbPool.offer(data);
   }
 
-  private void dicardPendingData(boolean isStopped) {
-    List<BatchType> batches = new ArrayList<BatchType>(
-        pendingData.size());
-    synchronized (pendingData) {
-      if (isStopped) {
-        this.isStopped = true;
-      }
-      for (BatchType ecb : pendingData.values()) {
-        ++ecb.version;
-        batches.add(ecb);
-      }
-      pendingData.clear();
-    }
-    for (BatchType batch : batches) {
-      upstreamFeedback.returnData(batch);
-    }
-  }
-
   @Override
   public void stop() {
     upstreamFeedback.stop();
-    dicardPendingData(true);
+    this.isStopped = true;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 18191da..37fc8d0 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -34,14 +33,12 @@ import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.mapred.FileSplit;
 
 public class OrcColumnVectorProducer implements ColumnVectorProducer {
 
   private final OrcMetadataCache metadataCache;
-  private final Cache<OrcCacheKey> cache;
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
   private final Configuration conf;
@@ -50,7 +47,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
   private LlapDaemonQueueMetrics queueMetrics;
 
   public OrcColumnVectorProducer(OrcMetadataCache metadataCache,
-      LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager, Cache<OrcCacheKey> cache,
+      LowLevelCacheImpl lowLevelCache, BufferUsageManager bufferManager,
       Configuration conf, LlapDaemonCacheMetrics metrics, LlapDaemonQueueMetrics queueMetrics) {
     if (LlapIoImpl.LOGL.isInfoEnabled()) {
       LlapIoImpl.LOG.info("Initializing ORC column vector producer");
@@ -59,7 +56,6 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     this.metadataCache = metadataCache;
     this.lowLevelCache = lowLevelCache;
     this.bufferManager = bufferManager;
-    this.cache = cache;
     this.conf = conf;
     this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA);
     this.cacheMetrics = metrics;
@@ -74,7 +70,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer {
     cacheMetrics.incrCacheReadRequests();
     OrcEncodedDataConsumer edc = new OrcEncodedDataConsumer(consumer, columnIds.size(),
         _skipCorrupt, counters, queueMetrics);
-    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager, cache,
+    OrcEncodedDataReader reader = new OrcEncodedDataReader(lowLevelCache, bufferManager,
         metadataCache, conf, split, columnIds, sarg, columnNames, edc, counters);
     edc.init(reader, reader);
     return edc;

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 28cae87..7ee263d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -54,6 +54,7 @@ public class OrcEncodedDataConsumer
       Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
       QueryFragmentCounters counters, LlapDaemonQueueMetrics queueMetrics) {
     super(consumer, colCount, queueMetrics);
+    // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
     this.counters = counters;
   }
@@ -62,7 +63,6 @@ public class OrcEncodedDataConsumer
     assert fileMetadata == null;
     fileMetadata = f;
     stripes = new OrcStripeMetadata[f.getStripes().size()];
-    // TODO: get rid of this
     codec = WriterImpl.createCodec(fileMetadata.getCompressionKind());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 8111c6d..eb251a8 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
-import org.apache.hadoop.hive.llap.cache.Cache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -71,7 +70,6 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
 import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
-import org.apache.hadoop.hive.ql.io.orc.encoded.OrcCacheKey;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
@@ -90,7 +88,7 @@ import org.apache.tez.common.CallableWithNdc;
  * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es.
  */
 public class OrcEncodedDataReader extends CallableWithNdc<Void>
-    implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
+    implements ConsumerFeedback<OrcEncodedColumnBatch> {
   private static final Logger LOG = LoggerFactory.getLogger(OrcEncodedDataReader.class);
   public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
       new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
@@ -135,7 +133,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private final LowLevelCache lowLevelCache;
   private final BufferUsageManager bufferManager;
   private final Configuration conf;
-  private final Cache<OrcCacheKey> cache;
   private final FileSplit split;
   private List<Integer> columnIds;
   private final SearchArgument sarg;
@@ -150,7 +147,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private Reader orcReader;
   private MetadataReader metadataReader;
   private EncodedReader stripeReader;
-  private Long fileId;
+  private Object fileKey;
   private FileSystem fs;
   /**
    * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
@@ -162,13 +159,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private volatile boolean isPaused = false;
 
   public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
-      Cache<OrcCacheKey> cache, OrcMetadataCache metadataCache, Configuration conf,
-      FileSplit split, List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
-      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
+      OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
+      SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
+      QueryFragmentCounters counters) {
     this.lowLevelCache = lowLevelCache;
     this.metadataCache = metadataCache;
     this.bufferManager = bufferManager;
-    this.cache = cache;
     this.conf = conf;
     this.split = split;
     this.columnIds = columnIds;
@@ -230,10 +226,10 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     // 1. Get file metadata from cache, or create the reader and read it.
     // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
     fs = split.getPath().getFileSystem(conf);
-    fileId = determineFileId(fs, split,
+    fileKey = determineFileId(fs, split,
         HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
     counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
-        + (fileId == null ? "" : " (" + fileId + ")"));
+        + (fileKey == null ? "" : " (" + fileKey + ")"));
 
     try {
       fileMetadata = getOrReadFileMetadata();
@@ -307,27 +303,13 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     //    read for every stripe (null means read all of them - the usual path). In any case,
     //    readState will be modified for column x rgs that were fetched from high-level cache.
     List<Integer>[] stripeColsToRead = null;
-    if (cache != null) {
-      try {
-        stripeColsToRead = produceDataFromCache(stride);
-      } catch (Throwable t) {
-        // produceDataFromCache handles its own cleanup.
-        consumer.setError(t);
-        cleanupReaders();
-        recordReaderTime(startTime);
-        return null;
-      }
-    }
 
     // 5. Create encoded data reader.
-    // In case if we have high-level cache, we will intercept the data and add it there;
-    // otherwise just pass the data directly to the consumer.
-    Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this;
     try {
       ensureOrcReader();
       // Reader creating updates HDFS counters, don't do it here.
       DataWrapperForOrc dw = new DataWrapperForOrc();
-      stripeReader = orcReader.encodedReader(fileId, dw, dw, POOL_FACTORY);
+      stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY);
       stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
     } catch (Throwable t) {
       consumer.setError(t);
@@ -338,9 +320,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     // 6. Read data.
     // TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
-    boolean hasFileId = this.fileId != null;
-    long fileId = hasFileId ? this.fileId : 0;
-    OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileId, -1, 0) : null;
+    boolean hasFileId = this.fileKey != null;
+    OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, -1, 0) : null;
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
       if (processStop()) {
         cleanupReaders();
@@ -369,7 +350,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue;
 
         // 6.1. Determine the columns to read (usually the same as requested).
-        if (cache == null || cols == null || cols.size() == colRgs.length) {
+        if (cols == null || cols.size() == colRgs.length) {
           cols = columnIds;
           stripeIncludes = globalIncludes;
         } else {
@@ -393,7 +374,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
             counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
             ensureMetadataReader();
             long startTimeHdfs = counters.startTimeCounter();
-            stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
+            stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
                 metadataReader, stripe, stripeIncludes, sargColumns);
             counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
             if (hasFileId && metadataCache != null) {
@@ -439,7 +420,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         // data it receives for one stripe. We could probably interrupt it, if it checked that.
         stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
             stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
-            colRgs, dataConsumer);
+            colRgs, consumer);
       } catch (Throwable t) {
         consumer.setError(t);
         cleanupReaders();
@@ -450,7 +431,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
 
     // Done with all the things.
     recordReaderTime(startTime);
-    dataConsumer.setDone();
+    consumer.setDone();
     if (DebugUtils.isTraceMttEnabled()) {
       LlapIoImpl.LOG.info("done processing " + split);
     }
@@ -525,12 +506,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     return true;
   }
 
-  private static Long determineFileId(FileSystem fs, FileSplit split,
+  private static Object determineFileId(FileSystem fs, FileSplit split,
       boolean allowSynthetic) throws IOException {
     if (split instanceof OrcSplit) {
-      Long fileId = ((OrcSplit)split).getFileId();
-      if (fileId != null) {
-        return fileId;
+      Object fileKey = ((OrcSplit)split).getFileKey();
+      if (fileKey != null) {
+        return fileKey;
       }
     }
     LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
@@ -600,8 +581,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private void ensureOrcReader() throws IOException {
     if (orcReader != null) return;
     Path path = split.getPath();
-    if (fileId != null && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
-      path = HdfsUtils.getFileIdPath(fs, path, fileId);
+    if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+      path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
     }
     if (DebugUtils.isTraceOrcEnabled()) {
       LOG.info("Creating reader for " + path + " (" + split.getPath() + ")");
@@ -617,8 +598,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
    */
   private OrcFileMetadata getOrReadFileMetadata() throws IOException {
     OrcFileMetadata metadata = null;
-    if (fileId != null && metadataCache != null) {
-      metadata = metadataCache.getFileMetadata(fileId);
+    if (fileKey != null && metadataCache != null) {
+      metadata = metadataCache.getFileMetadata(fileKey);
       if (metadata != null) {
         counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
         return metadata;
@@ -628,8 +609,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
     ensureOrcReader();
     // We assume this call doesn't touch HDFS because everything is already read; don't add time.
-    metadata = new OrcFileMetadata(fileId != null ? fileId : 0, orcReader);
-    if (fileId == null || metadataCache == null) return metadata;
+    metadata = new OrcFileMetadata(fileKey, orcReader);
+    if (fileKey == null || metadataCache == null) return metadata;
     return metadataCache.putFileMetadata(metadata);
   }
 
@@ -639,9 +620,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
   private ArrayList<OrcStripeMetadata> readStripesMetadata(
       boolean[] globalInc, boolean[] sargColumns) throws IOException {
     ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
-    boolean hasFileId = this.fileId != null;
-    long fileId = hasFileId ? this.fileId : 0;
-    OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileId, 0, 0) : null;
+    boolean hasFileId = this.fileKey != null;
+    OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null;
     for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
       OrcStripeMetadata value = null;
       int stripeIx = stripeIxMod + stripeIxFrom;
@@ -655,7 +635,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
         StripeInformation si = fileMetadata.getStripes().get(stripeIx);
         if (value == null) {
           long startTime = counters.startTimeCounter();
-          value = new OrcStripeMetadata(new OrcBatchKey(fileId, stripeIx, 0),
+          value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
               metadataReader, si, globalInc, sargColumns);
           counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
           if (hasFileId && metadataCache != null) {
@@ -836,105 +816,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     readState = new boolean[stripeIxTo - stripeIxFrom][][];
   }
 
-  // TODO: split by stripe? we do everything by stripe, and it might be faster
-  /**
-   * Takes the data from high-level cache for all stripes and returns to consumer.
-   * @return List of columns to read per stripe, if any columns were fully eliminated by cache.
-   */
-  private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException {
-    OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1);
-    // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
-    @SuppressWarnings("unchecked")
-    List<Integer>[] stripeColsNotInCache = new List[readState.length];
-    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
-      key.stripeIx = stripeIxFrom + stripeIxMod;
-      boolean[][] cols = readState[stripeIxMod];
-      boolean[] isMissingAnyRgs = new boolean[cols.length];
-      int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
-      for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
-        OrcEncodedColumnBatch col = ECB_POOL.take();
-        col.init(fileId, key.stripeIx, rgIx, cols.length);
-        boolean hasAnyCached = false;
-        try {
-          key.rgIx = rgIx;
-          for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
-            boolean[] readMask = cols[colIxMod];
-            // Check if RG is eliminated by SARG
-            if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS
-                && (readMask.length <= rgIx || !readMask[rgIx]))) continue;
-            key.colIx = columnIds.get(colIxMod);
-            ColumnStreamData[] cached = cache.get(key);
-            if (cached == null) {
-              isMissingAnyRgs[colIxMod] = true;
-              continue;
-            }
-            assert cached.length == OrcEncodedColumnBatch.MAX_DATA_STREAMS;
-            col.setAllStreamsData(colIxMod, key.colIx, cached);
-            hasAnyCached = true;
-            if (readMask == SargApplier.READ_ALL_RGS) {
-              // We were going to read all RGs, but some were in cache, allocate the mask.
-              cols[colIxMod] = readMask = new boolean[totalRgCount];
-              Arrays.fill(readMask, true);
-            }
-            readMask[rgIx] = false; // Got from cache, don't read from disk.
-          }
-        } catch (Throwable t) {
-          // TODO: Any cleanup needed to release data in col back to cache should be here.
-          throw (t instanceof IOException) ? (IOException)t : new IOException(t);
-        }
-        if (hasAnyCached) {
-          consumer.consumeData(col);
-        }
-      }
-      boolean makeStripeColList = false; // By default assume we'll fetch all original columns.
-      for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
-        if (isMissingAnyRgs[colIxMod]) {
-          if (makeStripeColList) {
-            stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
-          }
-        } else if (!makeStripeColList) {
-          // Some columns were fully in cache. Make a per-stripe col list, add previous columns.
-          makeStripeColList = true;
-          stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1);
-          for (int i = 0; i < colIxMod; ++i) {
-            stripeColsNotInCache[stripeIxMod].add(columnIds.get(i));
-          }
-        }
-      }
-    }
-    return stripeColsNotInCache;
-  }
-
-  @Override
-  public void setDone() {
-    consumer.setDone();
-  }
-
-  @Override
-  public void consumeData(OrcEncodedColumnBatch data) {
-    // Store object in cache; create new key object - cannot be reused.
-    assert cache != null;
-    throw new UnsupportedOperationException("not implemented");
-    /*for (int i = 0; i < data.getColumnData().length; ++i) {
-      OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]);
-      ColumnStreamData[] toCache = data.getColumnData()[i];
-      ColumnStreamData[] cached = cache.cacheOrGet(key, toCache);
-      if (toCache != cached) {
-        for (ColumnStreamData sb : toCache) {
-          if (sb.decRef() != 0) continue;
-          lowLevelCache.releaseBuffers(sb.getCacheBuffers());
-        }
-        data.getColumnData()[i] = cached;
-      }
-    }
-    consumer.consumeData(data);*/
-  }
-
-  @Override
-  public void setError(Throwable t) {
-    consumer.setError(t);
-  }
-
   private class DataWrapperForOrc implements DataReader, DataCache {
     private final DataReader orcDataReader;
 
@@ -948,17 +829,17 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
     }
 
     @Override
-    public DiskRangeList getFileData(long fileId, DiskRangeList range,
+    public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
         long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
       return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
-          fileId, range, baseOffset, factory, counters, gotAllData);
+          fileKey, range, baseOffset, factory, counters, gotAllData);
     }
 
     @Override
-    public long[] putFileData(long fileId, DiskRange[] ranges,
+    public long[] putFileData(Object fileKey, DiskRange[] ranges,
         MemoryBuffer[] data, long baseOffset) {
       return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
-          fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
+          fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
     }
 
     @Override
@@ -989,7 +870,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
       DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect);
       counters.recordHdfsTime(startTime);
       if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) {
-        LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset
+        LOG.info("Disk ranges after disk read (file " + fileKey + ", base offset " + baseOffset
               + "): " + RecordReaderUtils.stringifyDiskRanges(result));
       }
       return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
index 2e4e0c5..4e42a0f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
@@ -18,23 +18,22 @@
 
 package org.apache.hadoop.hive.llap.io.metadata;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.FileMetadata;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.io.orc.ReaderImpl.StripeInformationImpl;
-import org.apache.orc.StripeInformation;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.FileMetadata;
 import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.orc.StripeInformation;
 
 /** ORC file metadata. Currently contains some duplicate info due to how different parts
  * of ORC use different info. Ideally we would get rid of protobuf structs in code beyond reading,
@@ -46,7 +45,7 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
   private final List<OrcProto.StripeStatistics> stripeStats;
   private final List<OrcProto.Type> types;
   private final List<OrcProto.ColumnStatistics> fileStats;
-  private final long fileId;
+  private final Object fileKey;
   private final CompressionKind compressionKind;
   private final int rowIndexStride;
   private final int compressionBufferSize;
@@ -61,16 +60,18 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
   private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
   private final static ObjectEstimator SIZE_ESTIMATOR;
   static {
-    OrcFileMetadata ofm = createDummy(0);
+    OrcFileMetadata ofm = createDummy(new SyntheticFileId());
     SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(ofm);
     IncrementalObjectSizeEstimator.addEstimator(
         "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
+    // Add long for the regular file ID estimation.
+    IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
     SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileMetadata.class);
   }
 
   @VisibleForTesting
-  public static OrcFileMetadata createDummy(int fileId) {
-    OrcFileMetadata ofm = new OrcFileMetadata(fileId);
+  public static OrcFileMetadata createDummy(Object fileKey) {
+    OrcFileMetadata ofm = new OrcFileMetadata(fileKey);
     ofm.stripes.add(new StripeInformationImpl(
         OrcProto.StripeInformation.getDefaultInstance()));
     ofm.fileStats.add(OrcProto.ColumnStatistics.getDefaultInstance());
@@ -87,8 +88,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
   }
 
   // Ctor for memory estimation and tests
-  private OrcFileMetadata(int fileId) {
-    this.fileId = fileId;
+  private OrcFileMetadata(Object fileKey) {
+    this.fileKey = fileKey;
     stripes = new ArrayList<StripeInformation>();
     versionList = new ArrayList<Integer>();
     fileStats = new ArrayList<>();
@@ -101,8 +102,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
     compressionKind = CompressionKind.NONE;
   }
 
-  public OrcFileMetadata(long fileId, Reader reader) {
-    this.fileId = fileId;
+  public OrcFileMetadata(Object fileKey, Reader reader) {
+    this.fileKey = fileKey;
     this.stripeStats = reader.getOrcProtoStripeStatistics();
     this.compressionKind = reader.getCompressionKind();
     this.compressionBufferSize = reader.getCompressionSize();
@@ -183,8 +184,8 @@ public final class OrcFileMetadata extends LlapCacheableBuffer implements FileMe
   }
 
   @Override
-  public long getFileId() {
-    return fileId;
+  public Object getFileKey() {
+    return fileKey;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index 43c8fb3..e970137 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 
 public class OrcMetadataCache {
-  private final ConcurrentHashMap<Long, OrcFileMetadata> metadata =
-      new ConcurrentHashMap<Long, OrcFileMetadata>();
+  private final ConcurrentHashMap<Object, OrcFileMetadata> metadata =
+      new ConcurrentHashMap<Object, OrcFileMetadata>();
   private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata =
       new ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata>();
   private final MemoryManager memoryManager;
@@ -42,7 +42,7 @@ public class OrcMetadataCache {
   public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
     long memUsage = metaData.getMemoryUsage();
     memoryManager.reserveMemory(memUsage, false);
-    OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileId(), metaData);
+    OrcFileMetadata val = metadata.putIfAbsent(metaData.getFileKey(), metaData);
     // See OrcFileMetadata; it is always unlocked, so we just "touch" it here to simulate use.
     if (val == null) {
       val = metaData;
@@ -75,12 +75,12 @@ public class OrcMetadataCache {
     return stripeMetadata.get(stripeKey);
   }
 
-  public OrcFileMetadata getFileMetadata(long fileId) throws IOException {
-    return metadata.get(fileId);
+  public OrcFileMetadata getFileMetadata(Object fileKey) throws IOException {
+    return metadata.get(fileKey);
   }
 
   public void notifyEvicted(OrcFileMetadata buffer) {
-    metadata.remove(buffer.getFileId());
+    metadata.remove(buffer.getFileKey());
     // See OrcFileMetadata - we don't clear the object, it will be GCed when released by users.
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
index 8479d22..82187bd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
@@ -17,22 +17,21 @@
  */
 package org.apache.hadoop.hive.llap.io.metadata;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
 import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
 import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
 import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
-import org.apache.orc.impl.MetadataReader;
-import org.apache.orc.impl.OrcIndex;
-import org.apache.orc.StripeInformation;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.orc.OrcProto;
-
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.impl.MetadataReader;
+import org.apache.orc.impl.OrcIndex;
 
 public class OrcStripeMetadata extends LlapCacheableBuffer {
   private final OrcBatchKey stripeKey;
@@ -46,10 +45,12 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
   private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
   private final static ObjectEstimator SIZE_ESTIMATOR;
   static {
-    OrcStripeMetadata osm = createDummy(0);
+    OrcStripeMetadata osm = createDummy(new SyntheticFileId());
     SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(osm);
     IncrementalObjectSizeEstimator.addEstimator(
         "com.google.protobuf.LiteralByteString", SIZE_ESTIMATORS);
+    // Add long for the regular file ID estimation.
+    IncrementalObjectSizeEstimator.createEstimators(Long.class, SIZE_ESTIMATORS);
     SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcStripeMetadata.class);
   }
 
@@ -65,7 +66,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
     estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
   }
 
-  private OrcStripeMetadata(long id) {
+  private OrcStripeMetadata(Object id) {
     stripeKey = new OrcBatchKey(id, 0, 0);
     encodings = new ArrayList<>();
     streams = new ArrayList<>();
@@ -73,7 +74,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer {
   }
 
   @VisibleForTesting
-  public static OrcStripeMetadata createDummy(long id) {
+  public static OrcStripeMetadata createDummy(Object id) {
     OrcStripeMetadata dummy = new OrcStripeMetadata(id);
     dummy.encodings.add(OrcProto.ColumnEncoding.getDefaultInstance());
     dummy.streams.add(OrcProto.Stream.getDefaultInstance());

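Note on the OrcStripeMetadata hunk above: because a stripe key may now carry either a Long or a SyntheticFileId, the static size-estimator table is seeded from a SyntheticFileId-based dummy and additionally taught about Long. A rough sketch of how a test might exercise both key shapes, assuming only the estimator calls visible in the hunk (the StripeMetadataSizeCheck class and the literal 123L are illustrative):

    import java.util.HashMap;
    import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
    import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
    import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;

    public class StripeMetadataSizeCheck {
      public static void main(String[] args) {
        // Seed the estimator graph from a dummy keyed by a SyntheticFileId...
        HashMap<Class<?>, ObjectEstimator> estimators = IncrementalObjectSizeEstimator
            .createEstimators(OrcStripeMetadata.createDummy(new SyntheticFileId()));
        // ...and also register Long so stripes keyed by a real inode ID can be measured.
        IncrementalObjectSizeEstimator.createEstimators(Long.class, estimators);
        ObjectEstimator root = estimators.get(OrcStripeMetadata.class);
        // Both key shapes can now be estimated.
        System.out.println(root.estimate(OrcStripeMetadata.createDummy(123L), estimators));
        System.out.println(root.estimate(OrcStripeMetadata.createDummy(new SyntheticFileId()), estimators));
      }
    }
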
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/orc/src/java/org/apache/orc/FileMetadata.java
----------------------------------------------------------------------
diff --git a/orc/src/java/org/apache/orc/FileMetadata.java b/orc/src/java/org/apache/orc/FileMetadata.java
index d63bdcc..807e696 100644
--- a/orc/src/java/org/apache/orc/FileMetadata.java
+++ b/orc/src/java/org/apache/orc/FileMetadata.java
@@ -44,7 +44,7 @@ public interface FileMetadata {
 
   int getFlattenedColumnCount();
 
-  long getFileId();
+  Object getFileKey();
 
   List<Integer> getVersionList();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index af64fc8..1a40847 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -33,30 +33,18 @@ public class HdfsUtils {
   private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
   private static final Logger LOG = LoggerFactory.getLogger(HdfsUtils.class);
 
-  public static Long getFileId(
+  public static Object getFileId(
       FileSystem fileSystem, Path path, boolean allowSynthetic) throws IOException {
-    String pathStr = path.toUri().getPath();
     if (fileSystem instanceof DistributedFileSystem) {
-      return SHIMS.getFileId(fileSystem, pathStr);
+      return SHIMS.getFileId(fileSystem, path.toUri().getPath());
     }
     if (!allowSynthetic) {
       LOG.warn("Cannot get unique file ID from "
         + fileSystem.getClass().getSimpleName() + "; returning null");
       return null;
     }
-    // If we are not on DFS, we just hash the file name + size and hope for the best.
-    // TODO: we assume it only happens in tests. Fix?
-    int nameHash = pathStr.hashCode();
     FileStatus fs = fileSystem.getFileStatus(path);
-    long fileSize = fs.getLen(), modTime = fs.getModificationTime();
-    int fileSizeHash = (int)(fileSize ^ (fileSize >>> 32)),
-        modTimeHash = (int)(modTime ^ (modTime >>> 32)),
-        combinedHash = modTimeHash ^ fileSizeHash;
-    long id = (((long)nameHash & 0xffffffffL) << 32) | ((long)combinedHash & 0xffffffffL);
-    LOG.warn("Cannot get unique file ID from "
-        + fileSystem.getClass().getSimpleName() + "; using " + id + " (" + pathStr
-        + "," + nameHash + "," + fileSize + ")");
-    return id;
+    return new SyntheticFileId(path, fs.getLen(), fs.getModificationTime());
   }
 
   // TODO: this relies on HDFS not changing the format; we assume if we could get inode ID, this

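Note on the HdfsUtils hunk above: getFileId() now returns Object rather than Long: a Long inode ID on DistributedFileSystem, a SyntheticFileId built from path, length and modification time when synthetic IDs are allowed, or null otherwise. A minimal caller sketch, assuming only the signature shown in the hunk (the class name and command-line argument are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.HdfsUtils;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;

    public class FileKeyExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);                    // e.g. a table file path
        FileSystem fs = path.getFileSystem(conf);
        // With allowSynthetic=true the result is never null: a Long on HDFS,
        // a SyntheticFileId elsewhere; with allowSynthetic=false it may be null.
        Object fileKey = HdfsUtils.getFileId(fs, path, true);
        if (fileKey instanceof Long) {
          System.out.println("Real HDFS file ID: " + fileKey);
        } else if (fileKey instanceof SyntheticFileId) {
          System.out.println("Synthetic file ID: " + fileKey);
        } else {
          System.out.println("No file ID available");
        }
      }
    }
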
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
new file mode 100644
index 0000000..905bbb9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SyntheticFileId.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+public final class SyntheticFileId implements Writable {
+  private long pathHash;
+  private long modTime;
+  private long length;
+
+  /** Writable ctor. */
+  public SyntheticFileId() {
+  }
+
+  public SyntheticFileId(Path path, long len, long modificationTime) {
+    this.pathHash = hashCode(path.toUri().getPath());
+    this.modTime = modificationTime;
+    this.length = len;
+  }
+
+  public SyntheticFileId(FileStatus file) {
+    this(file.getPath(), file.getLen(), file.getModificationTime());
+  }
+
+  @Override
+  public String toString() {
+    return "[" + pathHash + ", " + modTime + ", " + length + "]";
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = prime + (int) (length ^ (length >>> 32));
+    result = prime * result + (int) (modTime ^ (modTime >>> 32));
+    return prime * result + (int) (pathHash ^ (pathHash >>> 32));
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!(obj instanceof SyntheticFileId)) return false;
+    SyntheticFileId other = (SyntheticFileId)obj;
+    return length == other.length && modTime == other.modTime && pathHash == other.pathHash;
+  }
+
+  private long hashCode(String path) {
+    long h = 0;
+    for (int i = 0; i < path.length(); ++i) {
+      h = 1223 * h + path.charAt(i);
+    }
+    return h;
+  }
+
+  /** Length allows for some backward compatibility wrt field addition. */
+  private static final short THREE_LONGS = 24;
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(THREE_LONGS);
+    out.writeLong(pathHash);
+    out.writeLong(modTime);
+    out.writeLong(length);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    short len = in.readShort();
+    if (len < THREE_LONGS) throw new IOException("Need at least " + THREE_LONGS + " bytes");
+    pathHash = in.readLong();
+    modTime = in.readLong();
+    length = in.readLong();
+    int extraBytes = len - THREE_LONGS;
+    if (extraBytes > 0) {
+      in.skipBytes(extraBytes);
+    }
+  }
+}
\ No newline at end of file

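Note on the new SyntheticFileId class above: two instances built from the same (path, length, modification time) triple compare equal and hash identically, which is what lets them stand in for an inode ID as a cache or split key; the Writable form is prefixed with a short length field so readers can skip fields added later. A small, self-contained usage sketch (the path and timestamp values are made up):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;

    public class SyntheticFileIdDemo {
      public static void main(String[] args) throws IOException {
        // IDs built from the same (path, length, modification time) triple are equal,
        // so they can serve as stable cache/map keys on non-HDFS file systems.
        SyntheticFileId a = new SyntheticFileId(
            new Path("/tmp/warehouse/t/000000_0"), 1024L, 1458000000000L);
        SyntheticFileId b = new SyntheticFileId(
            new Path("/tmp/warehouse/t/000000_0"), 1024L, 1458000000000L);
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true

        // Writable round-trip: write() emits the length short plus three longs,
        // readFields() skips any extra bytes a future version might append.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bytes));
        SyntheticFileId c = new SyntheticFileId();
        c.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(a.equals(c)); // true
      }
    }
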
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 0ebcd2a..cd2a668 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -727,10 +728,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     boolean[] covered;
     private List<Future<List<OrcSplit>>> splitFuturesRef;
     private final UserGroupInformation ugi;
+    private final boolean allowSyntheticFileIds;
 
     public ETLSplitStrategy(Context context, FileSystem fs, Path dir,
         List<HdfsFileStatusWithId> children, boolean isOriginal, List<DeltaMetaData> deltas,
-        boolean[] covered, UserGroupInformation ugi) {
+        boolean[] covered, UserGroupInformation ugi, boolean allowSyntheticFileIds) {
       assert !children.isEmpty();
       this.context = context;
       this.dirs = Lists.newArrayList(new ETLDir(dir, fs, children.size()));
@@ -739,6 +741,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.deltas = deltas;
       this.covered = covered;
       this.ugi = ugi;
+      this.allowSyntheticFileIds = allowSyntheticFileIds;
     }
 
     @Override
@@ -860,7 +863,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       List<Future<List<OrcSplit>>> localList = new ArrayList<>(splits.size());
       UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
       for (SplitInfo splitInfo : splits) {
-        localList.add(Context.threadPool.submit(new SplitGenerator(splitInfo, tpUgi)));
+        localList.add(Context.threadPool.submit(
+            new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds)));
       }
       synchronized (splitFutures) {
         splitFutures.addAll(localList);
@@ -873,16 +877,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    * as opposed to query execution (split generation does not read or cache file footers).
    */
   static final class BISplitStrategy extends ACIDSplitStrategy {
-    List<HdfsFileStatusWithId> fileStatuses;
-    boolean isOriginal;
-    List<DeltaMetaData> deltas;
-    FileSystem fs;
-    Context context;
-    Path dir;
+    private final List<HdfsFileStatusWithId> fileStatuses;
+    private final boolean isOriginal;
+    private final List<DeltaMetaData> deltas;
+    private final FileSystem fs;
+    private final Context context;
+    private final Path dir;
+    private final boolean allowSyntheticFileIds;
 
     public BISplitStrategy(Context context, FileSystem fs,
         Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
-        List<DeltaMetaData> deltas, boolean[] covered) {
+        List<DeltaMetaData> deltas, boolean[] covered, boolean allowSyntheticFileIds) {
       super(dir, context.numBuckets, deltas, covered);
       this.context = context;
       this.fileStatuses = fileStatuses;
@@ -890,6 +895,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       this.deltas = deltas;
       this.fs = fs;
       this.dir = dir;
+      this.allowSyntheticFileIds = allowSyntheticFileIds;
     }
 
     @Override
@@ -900,7 +906,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         if (fileStatus.getLen() != 0) {
           String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
               .getHosts();
-          OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), file.getFileId(), 0,
+          Object fileKey = file.getFileId();
+          if (fileKey == null && allowSyntheticFileIds) {
+            fileKey = new SyntheticFileId(fileStatus);
+          }
+          OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), fileKey, 0,
               fileStatus.getLen(), hosts, null, isOriginal, true, deltas, -1);
           splits.add(orcSplit);
         }
@@ -1029,8 +1039,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   static final class SplitGenerator implements Callable<List<OrcSplit>> {
     private final Context context;
     private final FileSystem fs;
-    private final HdfsFileStatusWithId fileWithId;
     private final FileStatus file;
+    private final Long fsFileId;
     private final long blockSize;
     private final TreeMap<Long, BlockLocation> locations;
     private final FileInfo fileInfo;
@@ -1046,31 +1056,34 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private long projColsUncompressedSize;
     private final List<OrcSplit> deltaSplits;
     private final UserGroupInformation ugi;
+    private final boolean allowSyntheticFileIds;
 
-    public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi) throws IOException {
+    public SplitGenerator(SplitInfo splitInfo, UserGroupInformation ugi,
+        boolean allowSyntheticFileIds) throws IOException {
       this.ugi = ugi;
       this.context = splitInfo.context;
       this.fs = splitInfo.fs;
-      this.fileWithId = splitInfo.fileWithId;
-      this.file = this.fileWithId.getFileStatus();
+      this.file = splitInfo.fileWithId.getFileStatus();
+      this.fsFileId = splitInfo.fileWithId.getFileId();
       this.blockSize = this.file.getBlockSize();
       this.fileInfo = splitInfo.fileInfo;
       // TODO: potential DFS call
-      this.locations = SHIMS.getLocationsWithOffset(fs, fileWithId.getFileStatus());
+      this.locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = splitInfo.isOriginal;
       this.deltas = splitInfo.deltas;
       this.hasBase = splitInfo.hasBase;
       this.projColsUncompressedSize = -1;
       this.deltaSplits = splitInfo.getSplits();
+      this.allowSyntheticFileIds = allowSyntheticFileIds;
     }
 
     Path getPath() {
-      return fileWithId.getFileStatus().getPath();
+      return file.getPath();
     }
 
     @Override
     public String toString() {
-      return "splitter(" + fileWithId.getFileStatus().getPath() + ")";
+      return "splitter(" + file.getPath() + ")";
     }
 
     /**
@@ -1133,7 +1146,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               maxSize = Math.max(maxSize, val.get());
             }
           } else {
-            throw new IOException("File " + fileWithId.getFileStatus().getPath().toString() +
+            throw new IOException("File " + file.getPath().toString() +
                     " should have had overlap on block starting at " + block.getOffset());
           }
         }
@@ -1161,7 +1174,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       final double splitRatio = (double) length / (double) fileLen;
       final long scaledProjSize = projColsUncompressedSize > 0 ?
           (long) (splitRatio * projColsUncompressedSize) : fileLen;
-      return new OrcSplit(file.getPath(), fileWithId.getFileId(), offset, length, hosts,
+      Object fileKey = fsFileId;
+      if (fileKey == null && allowSyntheticFileIds) {
+        fileKey = new SyntheticFileId(file);
+      }
+      return new OrcSplit(file.getPath(), fileKey, offset, length, hosts,
           fileMetaInfo, isOriginal, hasBase, deltas, scaledProjSize);
     }
 
@@ -1274,7 +1291,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               && fileInfo.writerVersion != null;
           // We assume that if we needed to create a reader, we need to cache it to meta cache.
           // TODO: This will also needlessly overwrite it in local cache for now.
-          context.footerCache.put(fileWithId.getFileId(), file, fileInfo.fileMetaInfo, orcReader);
+          context.footerCache.put(fsFileId, file, fileInfo.fileMetaInfo, orcReader);
         }
       } else {
         Reader orcReader = createOrcReader();
@@ -1286,8 +1303,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         fileMetaInfo = context.footerInSplits ?
             ((ReaderImpl) orcReader).getFileMetaInfo() : null;
         if (context.cacheStripeDetails) {
-          Long fileId = fileWithId.getFileId();
-          context.footerCache.put(fileId, file, fileMetaInfo, orcReader);
+          context.footerCache.put(fsFileId, file, fileMetaInfo, orcReader);
         }
       }
       includedCols = genIncludedColumns(types, context.conf, isOriginal);
@@ -1338,6 +1354,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       LOG.info("ORC pushdown predicate: " + context.sarg);
     }
     boolean useFileIds = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS);
+    boolean allowSyntheticFileIds = useFileIds && HiveConf.getBoolVar(
+        conf, ConfVars.HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS);
     List<OrcSplit> splits = Lists.newArrayList();
     List<Future<AcidDirInfo>> pathFutures = Lists.newArrayList();
     List<Future<Void>> strategyFutures = Lists.newArrayList();
@@ -1380,8 +1398,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
         // We have received a new directory information, make a split strategy.
         --resultsLeft;
-        SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context,
-            adi.fs, adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi);
+        SplitStrategy<?> splitStrategy = determineSplitStrategy(combinedCtx, context, adi.fs,
+            adi.splitPath, adi.acidInfo, adi.baseOrOriginalFiles, ugi, allowSyntheticFileIds);
         if (splitStrategy == null) continue; // Combined.
 
         if (isDebugEnabled) {
@@ -1451,12 +1469,13 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   private static SplitStrategy<?> combineOrCreateETLStrategy(CombinedCtx combinedCtx,
       Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> files,
       List<DeltaMetaData> deltas, boolean[] covered, boolean isOriginal,
-      UserGroupInformation ugi) {
+      UserGroupInformation ugi, boolean allowSyntheticFileIds) {
     if (!deltas.isEmpty() || combinedCtx == null) {
-      return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
+      return new ETLSplitStrategy(
+          context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
     } else if (combinedCtx.combined == null) {
       combinedCtx.combined = new ETLSplitStrategy(
-          context, fs, dir, files, isOriginal, deltas, covered, ugi);
+          context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
       combinedCtx.combineStartUs = System.nanoTime();
       return null;
     } else {
@@ -1465,11 +1484,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       switch (r) {
       case YES: return null;
       case NO_AND_CONTINUE:
-        return new ETLSplitStrategy(context, fs, dir, files, isOriginal, deltas, covered, ugi);
+        return new ETLSplitStrategy(
+            context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
       case NO_AND_SWAP: {
         ETLSplitStrategy oldBase = combinedCtx.combined;
         combinedCtx.combined = new ETLSplitStrategy(
-            context, fs, dir, files, isOriginal, deltas, covered, ugi);
+            context, fs, dir, files, isOriginal, deltas, covered, ugi, allowSyntheticFileIds);
         combinedCtx.combineStartUs = System.nanoTime();
         return oldBase;
       }
@@ -1798,7 +1818,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
   @VisibleForTesting
   static SplitStrategy<?> determineSplitStrategy(CombinedCtx combinedCtx, Context context,
       FileSystem fs, Path dir, AcidUtils.Directory dirInfo,
-      List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi) {
+      List<HdfsFileStatusWithId> baseOrOriginalFiles, UserGroupInformation ugi,
+      boolean allowSyntheticFileIds) {
     Path base = dirInfo.getBaseDirectory();
     List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
     List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
@@ -1826,20 +1847,20 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       switch(context.splitStrategyKind) {
         case BI:
           // BI strategy requested through config
-          return new BISplitStrategy(
-              context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+          return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+              isOriginal, deltas, covered, allowSyntheticFileIds);
         case ETL:
           // ETL strategy requested through config
-          return combineOrCreateETLStrategy(combinedCtx, context, fs,
-            dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
+          return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+              deltas, covered, isOriginal, ugi, allowSyntheticFileIds);
         default:
           // HYBRID strategy
           if (avgFileSize > context.maxSize || totalFiles <= context.minSplits) {
-            return combineOrCreateETLStrategy(combinedCtx, context, fs,
-                dir, baseOrOriginalFiles, deltas, covered, isOriginal, ugi);
+            return combineOrCreateETLStrategy(combinedCtx, context, fs, dir, baseOrOriginalFiles,
+                deltas, covered, isOriginal, ugi, allowSyntheticFileIds);
           } else {
-            return new BISplitStrategy(
-                context, fs, dir, baseOrOriginalFiles, isOriginal, deltas, covered);
+            return new BISplitStrategy(context, fs, dir, baseOrOriginalFiles,
+                isOriginal, deltas, covered, allowSyntheticFileIds);
           }
       }
     } else {

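Note on the OrcInputFormat changes above: synthetic IDs are generated only when both HIVE_ORC_INCLUDE_FILE_ID_IN_SPLITS and the new HIVE_ORC_ALLOW_SYNTHETIC_FILE_ID_IN_SPLITS settings are enabled, and both BISplitStrategy and SplitGenerator fall back to a SyntheticFileId only when the file system did not supply a real ID. A condensed sketch of that fallback decision, assuming only the constructors shown in the hunks (the helper class and method names are illustrative):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;

    final class FileKeyFallback {
      /**
       * Mirrors the pattern used in BISplitStrategy/SplitGenerator above: prefer the
       * file system's own ID, fall back to a synthetic one only when that is allowed.
       */
      static Object chooseFileKey(Long fsFileId, FileStatus file, boolean allowSyntheticFileIds) {
        if (fsFileId != null) {
          return fsFileId;                                   // real HDFS inode ID
        }
        return allowSyntheticFileIds ? new SyntheticFileId(file) : null;
      }
    }
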
http://git-wip-us.apache.org/repos/asf/hive/blob/26b5c7b5/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 4a27ee7..407fd62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@ -26,13 +26,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.orc.FileMetaInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.ColumnarSplit;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.LlapAwareSplit;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -48,11 +48,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
   private boolean isOriginal;
   private boolean hasBase;
   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
-  private OrcFile.WriterVersion writerVersion;
   private long projColsUncompressedSize;
-  private transient Long fileId;
+  private transient Object fileKey;
 
-  static final int HAS_FILEID_FLAG = 8;
+  static final int HAS_SYNTHETIC_FILEID_FLAG = 16;
+  static final int HAS_LONG_FILEID_FLAG = 8;
   static final int BASE_FLAG = 4;
   static final int ORIGINAL_FLAG = 2;
   static final int FOOTER_FLAG = 1;
@@ -64,13 +64,13 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     super(null, 0, 0, (String[]) null);
   }
 
-  public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts,
+  public OrcSplit(Path path, Object fileId, long offset, long length, String[] hosts,
       FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
       List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
     super(path, offset, length, hosts);
-    // We could avoid serializing file ID and just replace the path with inode-based path.
-    // However, that breaks bunch of stuff because Hive later looks up things by split path.
-    this.fileId = fileId;
+    // For HDFS, we could avoid serializing file ID and just replace the path with inode-based
+    // path. However, that breaks a bunch of stuff because Hive later looks up things by split path.
+    this.fileKey = fileId;
     this.fileMetaInfo = fileMetaInfo;
     hasFooter = this.fileMetaInfo != null;
     this.isOriginal = isOriginal;
@@ -84,10 +84,12 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     //serialize path, offset, length using FileSplit
     super.write(out);
 
+    boolean isFileIdLong = fileKey instanceof Long, isFileIdWritable = fileKey instanceof Writable;
     int flags = (hasBase ? BASE_FLAG : 0) |
         (isOriginal ? ORIGINAL_FLAG : 0) |
         (hasFooter ? FOOTER_FLAG : 0) |
-        (fileId != null ? HAS_FILEID_FLAG : 0);
+        (isFileIdLong ? HAS_LONG_FILEID_FLAG : 0) |
+        (isFileIdWritable ? HAS_SYNTHETIC_FILEID_FLAG : 0);
     out.writeByte(flags);
     out.writeInt(deltas.size());
     for(AcidInputFormat.DeltaMetaData delta: deltas) {
@@ -109,8 +111,10 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
           footerBuff.limit() - footerBuff.position());
       WritableUtils.writeVInt(out, fileMetaInfo.writerVersion.getId());
     }
-    if (fileId != null) {
-      out.writeLong(fileId.longValue());
+    if (isFileIdLong) {
+      out.writeLong(((Long)fileKey).longValue());
+    } else if (isFileIdWritable) {
+      ((Writable)fileKey).write(out);
     }
   }
 
@@ -123,7 +127,11 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     hasFooter = (FOOTER_FLAG & flags) != 0;
     isOriginal = (ORIGINAL_FLAG & flags) != 0;
     hasBase = (BASE_FLAG & flags) != 0;
-    boolean hasFileId = (HAS_FILEID_FLAG & flags) != 0;
+    boolean hasLongFileId = (HAS_LONG_FILEID_FLAG & flags) != 0,
+        hasWritableFileId = (HAS_SYNTHETIC_FILEID_FLAG & flags) != 0;
+    if (hasLongFileId && hasWritableFileId) {
+      throw new IOException("Invalid split - both file ID types present");
+    }
 
     deltas.clear();
     int numDeltas = in.readInt();
@@ -148,8 +156,12 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
       fileMetaInfo = new FileMetaInfo(compressionType, bufferSize,
           metadataSize, footerBuff, writerVersion);
     }
-    if (hasFileId) {
-      fileId = in.readLong();
+    if (hasLongFileId) {
+      fileKey = in.readLong();
+    } else if (hasWritableFileId) {
+      SyntheticFileId fileId = new SyntheticFileId();
+      fileId.readFields(in);
+      this.fileKey = fileId;
     }
   }
 
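Note on the OrcSplit serialization changes above: the flags byte now distinguishes a Long file ID (HAS_LONG_FILEID_FLAG) from a Writable/synthetic one (HAS_SYNTHETIC_FILEID_FLAG), and a split claiming both is rejected on read. A condensed sketch of just the file-key portion of that encoding, ignoring the other split fields and flag bits (the FileKeyCodec class is illustrative, not part of the patch):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.SyntheticFileId;
    import org.apache.hadoop.io.Writable;

    final class FileKeyCodec {
      static final int HAS_LONG_FILEID_FLAG = 8;
      static final int HAS_SYNTHETIC_FILEID_FLAG = 16;

      static void writeKey(DataOutput out, Object fileKey) throws IOException {
        boolean isLong = fileKey instanceof Long, isWritable = fileKey instanceof Writable;
        out.writeByte((isLong ? HAS_LONG_FILEID_FLAG : 0)
            | (isWritable ? HAS_SYNTHETIC_FILEID_FLAG : 0));
        if (isLong) {
          out.writeLong((Long) fileKey);          // plain HDFS inode ID
        } else if (isWritable) {
          ((Writable) fileKey).write(out);        // SyntheticFileId (or other Writable key)
        }
      }

      static Object readKey(DataInput in) throws IOException {
        int flags = in.readByte();
        boolean hasLong = (flags & HAS_LONG_FILEID_FLAG) != 0;
        boolean hasWritable = (flags & HAS_SYNTHETIC_FILEID_FLAG) != 0;
        if (hasLong && hasWritable) {
          throw new IOException("Invalid split - both file ID types present");
        }
        if (hasLong) return in.readLong();
        if (hasWritable) {
          SyntheticFileId id = new SyntheticFileId();
          id.readFields(in);
          return id;
        }
        return null;                              // no file ID was serialized
      }
    }
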
@@ -186,8 +198,8 @@ public class OrcSplit extends FileSplit implements ColumnarSplit, LlapAwareSplit
     return projColsUncompressedSize;
   }
 
-  public Long getFileId() {
-    return fileId;
+  public Object getFileKey() {
+    return fileKey;
   }
 
   @Override