Posted to commits@hive.apache.org by se...@apache.org on 2015/01/27 20:06:23 UTC

svn commit: r1655107 - in /hive/branches/llap: common/src/java/org/apache/hadoop/hive/common/ llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/ llap-server/src/java/org/apache/hadoop/hive/llap/cache/ llap-server/src/test/org/apache/hadoop/...

Author: sershe
Date: Tue Jan 27 19:06:23 2015
New Revision: 1655107

URL: http://svn.apache.org/r1655107
Log:
HIVE-9418p1 : ORC using low-level cache

Added:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
Modified:
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
    hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
    hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java

Added: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1655107&view=auto
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (added)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Tue Jan 27 19:06:23 2015
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A section (range of bytes) of a file.
+ */
+public class DiskRange {
+  /** The first address. */
+  public long offset;
+  /** The address afterwards. */
+  public long end;
+
+  public DiskRange(long offset, long end) {
+    this.offset = offset;
+    this.end = end;
+    if (end < offset) {
+      throw new IllegalArgumentException("invalid range " + this);
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || other.getClass() != getClass()) {
+      return false;
+    }
+    DiskRange otherR = (DiskRange) other;
+    return otherR.offset == offset && otherR.end == end;
+  }
+
+  @Override
+  public String toString() {
+    return "range start: " + offset + " end: " + end;
+  }
+
+  public int getLength() {
+    long len = this.end - this.offset;
+    assert len <= Integer.MAX_VALUE;
+    return (int)len;
+  }
+
+  // For subclasses
+  public boolean hasData() {
+    return false;
+  }
+
+  public DiskRange slice(long offset, long end) {
+    // Really an "unexpected usage" exception rather than an unsupported operation.
+    throw new UnsupportedOperationException();
+  }
+
+  public ByteBuffer getData() {
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file
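
For illustration, a minimal sketch of how the new DiskRange class behaves, using only
the members added above (the wrapper class and the concrete numbers are made up):

    import org.apache.hadoop.hive.common.DiskRange;

    public class DiskRangeExample {
      public static void main(String[] args) {
        DiskRange r = new DiskRange(100, 356);                 // half-open [offset, end)
        System.out.println(r);                                 // range start: 100 end: 356
        System.out.println(r.getLength());                     // 256
        System.out.println(r.equals(new DiskRange(100, 356))); // true
        // The base class carries no data: hasData() is false, and getData()/slice()
        // throw UnsupportedOperationException until a data-carrying subclass
        // (such as the buffer-backed ranges in RecordReaderImpl) overrides them.
        System.out.println(r.hasData());                       // false
      }
    }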

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Tue Jan 27 19:06:23 2015
@@ -21,17 +21,12 @@ package org.apache.hadoop.hive.llap.io.a
 import java.nio.ByteBuffer;
 
 public abstract class LlapMemoryBuffer {
-  protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
-    initialize(byteBuffer, offset, length);
-  }
   protected LlapMemoryBuffer() {
   }
   protected void initialize(ByteBuffer byteBuffer, int offset, int length) {
-    this.byteBuffer = byteBuffer;
-    this.offset = offset;
-    this.length = length;
+    this.byteBuffer = byteBuffer.slice();
+    this.byteBuffer.position(offset);
+    this.byteBuffer.limit(offset + length);
   }
   public ByteBuffer byteBuffer;
-  public int offset;
-  public int length;
 }
\ No newline at end of file
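
The buffer now carries its window inside the ByteBuffer itself instead of the removed
offset/length fields. A small sketch of the standard java.nio behavior that the new
initialize() relies on (the arena, offset and length values are made up):

    import java.nio.ByteBuffer;

    public class SliceExample {
      public static void main(String[] args) {
        ByteBuffer arena = ByteBuffer.allocate(1024);
        ByteBuffer view = arena.slice();       // shares content, has its own position/limit
        int offset = 128, length = 256;        // a made-up allocation within the arena
        view.position(offset);
        view.limit(offset + length);
        System.out.println(view.position());   // 128 -- what the removed "offset" field held
        System.out.println(view.remaining());  // 256 -- what the removed "length" field held
      }
    }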

Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Tue Jan 27 19:06:23 2015
@@ -18,14 +18,17 @@
 
 package org.apache.hadoop.hive.llap.io.api.cache;
 
+import java.util.LinkedList;
+
+import org.apache.hadoop.hive.common.DiskRange;
 
-public interface LowLevelCache {
 
+public interface LowLevelCache {
   /**
    * Gets file data for particular offsets. Null entries mean no data.
    * @param file File name; MUST be interned.
    */
-  LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
+  void getFileData(String fileName, LinkedList<DiskRange> ranges);
 
   /**
    * Puts file data into cache.
@@ -33,7 +36,7 @@ public interface LowLevelCache {
    * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
    *         the replacement chunks from cache are updated directly in the array.
    */
-  long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks);
+  long[] putFileData(String file, DiskRange[] ranges, LlapMemoryBuffer[] chunks);
 
   /**
    * Releases the buffer returned by getFileData or allocateMultiple.
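
A hedged sketch of driving the revised interface from a reader: getFileData() now mutates
the passed list in place, splicing CacheChunk entries (from RecordReaderImpl, as used
elsewhere in this commit) over the cached parts. The cache instance, file name and range
below are placeholders:

    import java.util.LinkedList;

    import org.apache.hadoop.hive.common.DiskRange;
    import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
    import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;

    public class CacheLookupSketch {
      // cache and fileName are assumed to be supplied by the surrounding reader code.
      static void readWithCache(LowLevelCache cache, String fileName) {
        LinkedList<DiskRange> ranges = new LinkedList<DiskRange>();
        ranges.add(new DiskRange(0, 1000));   // the [offset, end) span the reader needs
        cache.getFileData(fileName, ranges);  // the file name must be interned
        for (DiskRange dr : ranges) {
          if (dr instanceof CacheChunk) {
            // Hit: a locked buffer was spliced into the list in place of the cached part;
            // use ((CacheChunk) dr).buffer, then hand it back via cache.releaseBuffer().
          } else {
            // Miss: this sub-range still describes bytes that must be read from disk.
          }
        }
      }
    }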

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Tue Jan 27 19:06:23 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.io.ap
 public final class BuddyAllocator implements Allocator {
   private static final Log LOG = LogFactory.getLog(BuddyAllocator.class);
 
+
   private final Arena[] arenas;
   private AtomicInteger allocatedArenas = new AtomicInteger(0);
 
@@ -128,12 +129,6 @@ public final class BuddyAllocator implem
     return false;
   }
 
-  public static LlapCacheableBuffer allocateFake() {
-    LlapCacheableBuffer fake = new LlapCacheableBuffer();
-    fake.initialize(-1, null, -1, 1);
-    return fake;
-  }
-
   @Override
   public void deallocate(LlapMemoryBuffer buffer) {
     LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer;
@@ -338,8 +333,8 @@ public final class BuddyAllocator implem
 
     public void deallocate(LlapCacheableBuffer buffer) {
       assert data != null;
-      int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.length) - minAllocLog2,
-          headerIx = buffer.offset >>> minAllocLog2;
+      int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.byteBuffer.remaining())
+          - minAllocLog2, headerIx = buffer.byteBuffer.position() >>> minAllocLog2;
       while (true) {
         FreeList freeList = freeLists[freeListIx];
         int bHeaderIx = headerIx ^ (1 << freeListIx);
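
Worked numbers for the revised index math in deallocate(), assuming a hypothetical
8-byte minimum allocation (minAllocLog2 = 3); the freed buffer's size and position
are made-up values:

    public class BuddyIndexExample {
      public static void main(String[] args) {
        int minAllocLog2 = 3;      // assumption: 8-byte minimum allocation
        int size = 32;             // byteBuffer.remaining() of the freed buffer
        int position = 64;         // byteBuffer.position() within the arena
        int freeListIx = 31 - Integer.numberOfLeadingZeros(size) - minAllocLog2;
        int headerIx = position >>> minAllocLog2;
        int buddyHeaderIx = headerIx ^ (1 << freeListIx);
        System.out.println(freeListIx);     // 2: 32 bytes is minAlloc << 2
        System.out.println(headerIx);       // 8: the 8th minAlloc-sized slot
        System.out.println(buddyHeaderIx);  // 12: the buddy block to try to coalesce with
      }
    }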

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Tue Jan 27 19:06:23 2015
@@ -43,39 +43,39 @@ public final class LlapCacheableBuffer e
 
   private final AtomicInteger refCount = new AtomicInteger(0);
 
+  // All kinds of random stuff needed by various parts of the system, beyond the publicly
+  // visible bytes "interface". This is not so pretty since all concerns are mixed here.
+  // But at least we don't waste a bunch of memory and virtual calls per buffer.
+  /** Allocator uses this to remember which arena to alloc from.
+   * TODO Could wrap ByteBuffer instead? This needs reference anyway. */
   public int arenaIndex = -1;
+  /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
+   * the lookup is on compressed ranges, so we need to know this. */
+  public int declaredLength;
+
+  /** Priority for cache policy (should be pretty universal). */
   public double priority;
+  /** Last priority update time for cache policy (should be pretty universal). */
   public long lastUpdate = -1;
+  /** Linked list pointers for LRFU/LRU cache policies. Given that each block is in the cache,
+   * this might be better than an external linked list. Or not, since this is not concurrent. */
   public LlapCacheableBuffer prev = null, next = null;
+  /** Index in heap for LRFU/LFU cache policies. */
   public int indexInHeap = NOT_IN_CACHE;
+  // TODO: Add 4 more bytes of crap here!
+
 
   @VisibleForTesting
   int getRefCount() {
     return refCount.get();
   }
 
-  @Override
-  public int hashCode() {
-    if (this.byteBuffer == null) return 0;
-    return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) return true;
-    if (!(obj instanceof LlapCacheableBuffer)) return false;
-    LlapCacheableBuffer other = (LlapCacheableBuffer)obj;
-    // We only compare objects, and not contents of the ByteBuffer.
-    return byteBuffer == other.byteBuffer
-        && this.offset == other.offset && this.length == other.length;
-  }
-
   int incRef() {
     int newRefCount = -1;
     while (true) {
       int oldRefCount = refCount.get();
       if (oldRefCount == EVICTED_REFCOUNT) return -1;
-      assert oldRefCount >= 0;
+      assert oldRefCount >= 0 : "oldRefCount is " + oldRefCount + " " + this;
       newRefCount = oldRefCount + 1;
       if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
     }
@@ -95,14 +95,14 @@ public final class LlapCacheableBuffer e
   int decRef() {
     int newRefCount = refCount.decrementAndGet();
     if (newRefCount < 0) {
-      throw new AssertionError("Unexpected refCount " + newRefCount);
+      throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
     }
     return newRefCount;
   }
 
   @Override
   public String toString() {
-    return "0x" + Integer.toHexString(hashCode());
+    return "0x" + Integer.toHexString(System.identityHashCode(this));
   }
 
   /**

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Tue Jan 27 19:06:23 2015
@@ -17,22 +17,32 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
 
 import com.google.common.annotations.VisibleForTesting;
 
 public class LowLevelCacheImpl implements LowLevelCache, EvictionListener {
+  private static final Log LOG = LogFactory.getLog(LowLevelCacheImpl.class);
+
   private final Allocator allocator;
 
   private AtomicInteger newEvictions = new AtomicInteger(0);
@@ -66,30 +76,81 @@ public class LowLevelCacheImpl implement
   }
 
   @Override
-  public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
-    LlapMemoryBuffer[] result = null;
+  public void getFileData(String fileName, LinkedList<DiskRange> ranges) {
     FileCache subCache = cache.get(fileName);
-    if (subCache == null || !subCache.incRef()) return result;
+    if (subCache == null || !subCache.incRef()) return;
     try {
-      for (int i = 0; i < offsets.length; ++i) {
-        while (true) { // Overwhelmingly only runs once.
-          long offset = offsets[i];
-          LlapCacheableBuffer buffer = subCache.cache.get(offset);
-          if (buffer == null) break;
-          if (lockBuffer(buffer)) {
-            if (result == null) {
-              result = new LlapCacheableBuffer[offsets.length];
-            }
-            result[i] = buffer;
-            break;
-          }
-          if (subCache.cache.remove(offset, buffer)) break;
-        }
+      ListIterator<DiskRange> dr = ranges.listIterator();
+      while (dr.hasNext()) {
+        getOverlappingRanges(dr, subCache.cache);
       }
     } finally {
       subCache.decRef();
     }
-    return result;
+  }
+
+  private void getOverlappingRanges(ListIterator<DiskRange> drIter,
+      ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache) {
+    DiskRange currentNotCached = drIter.next();
+    Iterator<Map.Entry<Long, LlapCacheableBuffer>> matches =
+        cache.subMap(currentNotCached.offset, currentNotCached.end).entrySet().iterator();
+    long cacheEnd = -1;
+    while (matches.hasNext()) {
+      assert currentNotCached != null;
+      Map.Entry<Long, LlapCacheableBuffer> e = matches.next();
+      LlapCacheableBuffer buffer = e.getValue();
+      // Lock the buffer, validate it and add to results.
+      if (!lockBuffer(buffer)) {
+        // If we cannot lock, remove this from cache and continue.
+        matches.remove();
+        continue;
+      }
+      LOG.info("TODO# get +1 " + buffer.toString());
+      long cacheOffset = e.getKey();
+      if (cacheEnd > cacheOffset) { // compare with old cacheEnd
+        throw new AssertionError("Cache has overlapping buffers: " + cacheEnd + ") and ["
+            + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")");
+      }
+      cacheEnd = cacheOffset + buffer.declaredLength;
+      CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
+      currentNotCached = addCachedBufferToIter(drIter, currentNotCached, currentCached);
+    }
+  }
+
+  private DiskRange addCachedBufferToIter(ListIterator<DiskRange> drIter,
+      DiskRange currentNotCached, CacheChunk currentCached) {
+    if (currentNotCached.offset == currentCached.offset) {
+      if (currentNotCached.end <= currentCached.end) {  // we assume it's always "==" now
+        // Replace the entire current DiskRange with new cached range. Java LL is idiotic, so...
+        drIter.remove();
+        drIter.add(currentCached);
+        currentNotCached = null;
+      } else {
+        // Insert the new cache range before the disk range.
+        currentNotCached.offset = currentCached.end;
+        drIter.previous();
+        drIter.add(currentCached); 
+        DiskRange dr = drIter.next();
+        assert dr == currentNotCached;
+      }
+    } else {
+      assert currentNotCached.offset < currentCached.offset;
+      long originalEnd = currentNotCached.end;
+      currentNotCached.end = currentCached.offset;
+      drIter.add(currentCached);
+      if (originalEnd <= currentCached.end) { // we assume it's always "==" now
+        // We have reached the end of the range and truncated the last non-cached range.
+        currentNotCached = null;
+      } else {
+        // Insert the new disk range after the cache range. TODO: not strictly necessary yet?
+        currentNotCached = new DiskRange(currentCached.end, originalEnd);
+        drIter.add(currentNotCached);
+        DiskRange dr = drIter.previous();
+        assert dr == currentNotCached;
+        drIter.next();
+      }
+    }
+    return currentNotCached;
   }
 
   private boolean lockBuffer(LlapCacheableBuffer buffer) {
@@ -97,18 +158,19 @@ public class LowLevelCacheImpl implement
     if (rc == 1) {
       cachePolicy.notifyLock(buffer);
     }
-    return rc >= 0;
+    return rc > 0;
   }
 
   @Override
-  public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+  public long[] putFileData(String fileName, DiskRange[] ranges, LlapMemoryBuffer[] buffers) {
     long[] result = null;
-    assert buffers.length == offsets.length;
+    assert buffers.length == ranges.length;
     FileCache subCache = getOrAddFileSubCache(fileName);
     try {
-      for (int i = 0; i < offsets.length; ++i) {
+      for (int i = 0; i < ranges.length; ++i) {
         LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
-        long offset = offsets[i];
+        long offset = ranges[i].offset;
+        buffer.declaredLength = ranges[i].getLength();
         assert buffer.isLocked();
         while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
           LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
@@ -122,8 +184,14 @@ public class LowLevelCacheImpl implement
                 + fileName + "@" + offset  + "; old " + oldVal + ", new " + buffer);
           }
           if (lockBuffer(oldVal)) {
+            // We don't do proper overlap checking because it would cost cycles and we
+            // think it will never happen. We do perform the most basic check here.
+            if (oldVal.declaredLength != buffer.declaredLength) {
+              throw new RuntimeException("Found a block with different length at the same offset: "
+                  + oldVal.declaredLength + " vs " + buffer.declaredLength + " @" + offset);
+            }
             // We found an old, valid block for this key in the cache.
-            releaseBufferInternal(buffer);
+            releaseBufferInternal(buffer);
             buffers[i] = oldVal;
             if (result == null) {
               result = new long[align64(buffers.length) >>> 6];
@@ -196,9 +264,10 @@ public class LowLevelCacheImpl implement
     }
   }
 
+  private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
   public static LlapCacheableBuffer allocateFake() {
     LlapCacheableBuffer fake = new LlapCacheableBuffer();
-    fake.initialize(-1, null, -1, 1);
+    fake.initialize(-1, fakeBuf, 0, 1);
     return fake;
   }
 
@@ -210,9 +279,12 @@ public class LowLevelCacheImpl implement
 
   private static class FileCache {
     private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
-    // TODO: given the specific data, perhaps the nested thing should not be CHM
-    private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
-      = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+    // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+    //       In fact, CSLM has slow single-threaded operation, and one file is probably often read
+    //       by just one (or few) threads, so a much more simple DS with locking might be better.
+    //       Let's use CSLM for now, since it's available.
+    private ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache
+      = new ConcurrentSkipListMap<Long, LlapCacheableBuffer>();
     private AtomicInteger refCount = new AtomicInteger(0);
 
     boolean incRef() {
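
To see what getOverlappingRanges()/addCachedBufferToIter() produce, here is the
middle-split case replayed on plain DiskRanges (a plain range stands in for the
CacheChunk, and the assertion bookkeeping of the real code is omitted): a request
for [0, 100) with a cached block at [30, 60) ends up as [0, 30), cached [30, 60),
[60, 100).

    import java.util.LinkedList;
    import java.util.ListIterator;

    import org.apache.hadoop.hive.common.DiskRange;

    public class SpliceExample {
      public static void main(String[] args) {
        LinkedList<DiskRange> ranges = new LinkedList<DiskRange>();
        ranges.add(new DiskRange(0, 100));
        ListIterator<DiskRange> it = ranges.listIterator();
        DiskRange notCached = it.next();           // [0, 100)
        DiskRange cached = new DiskRange(30, 60);  // stands in for a cached chunk

        long originalEnd = notCached.end;
        notCached.end = cached.offset;                  // truncate the left part to [0, 30)
        it.add(cached);                                 // splice in the cached piece
        it.add(new DiskRange(cached.end, originalEnd)); // and the remaining [60, 100)

        // [range start: 0 end: 30, range start: 30 end: 60, range start: 60 end: 100]
        System.out.println(ranges);
      }
    }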

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Tue Jan 27 19:06:23 2015
@@ -69,7 +69,7 @@ public class LowLevelFifoCachePolicy ext
         LlapCacheableBuffer candidate = iter.next();
         if (candidate.invalidate()) {
           iter.remove();
-          evicted += candidate.length;
+          evicted += candidate.byteBuffer.remaining();
           listener.notifyEvicted(candidate);
         }
       }

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Tue Jan 27 19:06:23 2015
@@ -167,7 +167,7 @@ public class LowLevelLrfuCachePolicy ext
         // Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
         // TODO#: double check this is valid!
         nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
-        evicted += nextCandidate.length;
+        evicted += nextCandidate.byteBuffer.remaining();
       }
       if (firstCandidate != nextCandidate) {
         if (nextCandidate == null) {
@@ -194,7 +194,7 @@ public class LowLevelLrfuCachePolicy ext
         buffer = evictFromHeapUnderLock(time);
       }
       if (buffer == null) return evicted;
-      evicted += buffer.length;
+      evicted += buffer.byteBuffer.remaining();
       listener.notifyEvicted(buffer);
     }
     return evicted;

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Tue Jan 27 19:06:23 2015
@@ -179,10 +179,10 @@ public class TestBuddyAllocator {
     for (int j = 0; j < allocCount; ++j) {
       LlapMemoryBuffer mem = allocs[index][j];
       long testValue = testValues[index][j] = rdm.nextLong();
-      mem.byteBuffer.putLong(mem.offset, testValue);
-      int halfLength = mem.length >> 1;
-      if (halfLength + 8 <= mem.length) {
-        mem.byteBuffer.putLong(mem.offset + halfLength, testValue);
+      mem.byteBuffer.putLong(0, testValue);
+      int halfLength = mem.byteBuffer.remaining() >> 1;
+      if (halfLength + 8 <= mem.byteBuffer.remaining()) {
+        mem.byteBuffer.putLong(halfLength, testValue);
       }
     }
   }
@@ -204,10 +204,10 @@ public class TestBuddyAllocator {
       BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) {
     for (int j = 0; j < allocs.length; ++j) {
       LlapCacheableBuffer mem = (LlapCacheableBuffer)allocs[j];
-      assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset));
-      int halfLength = mem.length >> 1;
-      if (halfLength + 8 <= mem.length) {
-        assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset + halfLength));
+      assertEquals(testValues[j], mem.byteBuffer.getLong(0));
+      int halfLength = mem.byteBuffer.remaining() >> 1;
+      if (halfLength + 8 <= mem.byteBuffer.remaining()) {
+        assertEquals(testValues[j], mem.byteBuffer.getLong(halfLength));
       }
       a.deallocate(mem);
     }

Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Tue Jan 27 19:06:23 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -25,16 +27,15 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.management.RuntimeErrorException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.junit.Assume;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
 
 public class TestLowLevelCacheImpl {
@@ -83,22 +84,76 @@ public class TestLowLevelCacheImpl {
     String fn1 = "file1".intern(), fn2 = "file2".intern();
     LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
     verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
-    assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
-    assertNull(cache.putFileData(fn2, new long[] { 1, 2 }, fbs(fakes, 2, 3)));
-    assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
-    assertArrayEquals(fbs(fakes, 2, 3), cache.getFileData(fn2, new long[] { 1, 2 }));
-    assertArrayEquals(fbs(fakes, 1, -1), cache.getFileData(fn1, new long[] { 2, 3 }));
+    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1)));
+    assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3)));
+    verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
+    verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]);
+    verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4));
     verifyRefcount(fakes, 2, 3, 2, 2, 1, 1);
     LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
-    long[] mask = cache.putFileData(fn1, new long[] { 3, 1 }, bufsDiff);
+    long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff);
     assertEquals(1, mask.length);
     assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
     assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
     verifyRefcount(fakes, 3, 3, 2, 2, 1, 0);
-    assertArrayEquals(fbs(fakes, 0, 1, 4), cache.getFileData(fn1, new long[] { 1, 2, 3 }));
+    verifyCacheGet(cache, fn1, 1, 4, fakes[0], fakes[1], fakes[4]);
     verifyRefcount(fakes, 4, 4, 2, 2, 2, 0);
   }
 
+  private void verifyCacheGet(LowLevelCacheImpl cache, String fileName, Object... stuff) {
+    LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+    Iterator<DiskRange> iter = null;
+    int intCount = 0, lastInt = -1;
+    int resultCount = stuff.length;
+    for (Object obj : stuff) {
+      if (obj instanceof Integer) {
+        --resultCount;
+        assertTrue(intCount >= 0);
+        if (intCount == 0) {
+          lastInt = (Integer)obj;
+          intCount = 1;
+        } else {
+          input.add(new DiskRange(lastInt, (Integer)obj));
+          intCount = 0;
+        }
+        continue;
+      } else if (intCount >= 0) {
+        assertTrue(intCount == 0);
+        assertFalse(input.isEmpty());
+        intCount = -1;
+        cache.getFileData(fileName, input);
+        assertEquals(resultCount, input.size());
+        iter = input.iterator();
+      }
+      assertTrue(iter.hasNext());
+      DiskRange next = iter.next();
+      if (obj instanceof LlapMemoryBuffer) {
+        assertTrue(next instanceof CacheChunk);
+        assertSame(obj, ((CacheChunk)next).buffer);
+      } else {
+        assertTrue(next.equals(obj));
+      }
+    }
+  }
+
+  @Test
+  public void testMultiMatch() {
+    Configuration conf = createConf();
+    LowLevelCacheImpl cache = new LowLevelCacheImpl(
+        conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+    String fn = "file1".intern();
+    LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() };
+    assertNull(cache.putFileData(fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes));
+    verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9));
+    verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]);
+    verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5));
+    verifyCacheGet(cache, fn, 1, 3, dr(1, 2), fakes[0]);
+    verifyCacheGet(cache, fn, 3, 4, dr(3, 4)); // We don't expect cache requests from the middle.
+    verifyCacheGet(cache, fn, 3, 7, dr(3, 6), fakes[1]);
+    verifyCacheGet(cache, fn, 0, 2, 4, 6, dr(0, 2), dr(4, 6));
+    verifyCacheGet(cache, fn, 2, 4, 6, 8, fakes[0], fakes[1]);
+  }
+
   @Test
   public void testStaleValueGet() {
     Configuration conf = createConf();
@@ -106,15 +161,15 @@ public class TestLowLevelCacheImpl {
         conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
     String fn1 = "file1".intern(), fn2 = "file2".intern();
     LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() };
-    assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
-    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 2)));
-    assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
-    assertArrayEquals(fbs(fakes, 2), cache.getFileData(fn2, new long[] { 1 }));
+    assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1)));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2)));
+    verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
+    verifyCacheGet(cache, fn2, 1, 2, fakes[2]);
     verifyRefcount(fakes, 2, 2, 2);
     evict(cache, fakes[0]);
     evict(cache, fakes[2]);
-    assertArrayEquals(fbs(fakes, -1, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
-    assertNull(cache.getFileData(fn2, new long[] { 1 }));
+    verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]);
+    verifyCacheGet(cache, fn2, 1, 2, dr(1, 2));
     verifyRefcount(fakes, -1, 3, -1);
   }
 
@@ -126,15 +181,15 @@ public class TestLowLevelCacheImpl {
     String fn1 = "file1".intern(), fn2 = "file2".intern();
     LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] {
         fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
-    assertNull(cache.putFileData(fn1, new long[] { 1, 2, 3 }, fbs(fakes, 0, 1, 2)));
-    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 3)));
+    assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2)));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3)));
     evict(cache, fakes[0]);
     evict(cache, fakes[3]);
-    long[] mask = cache.putFileData(fn1, new long[] { 1, 2, 3, 4 }, fbs(fakes, 4, 5, 6, 7));
+    long[] mask = cache.putFileData(fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7));
     assertEquals(1, mask.length);
     assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't
-    assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 8)));
-    assertArrayEquals(fbs(fakes, 4, 2, 3, 7), cache.getFileData(fn1, new long[] { 1, 2, 3, 4 }));
+    assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8)));
+    verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]);
   }
 
   @Test
@@ -157,19 +212,23 @@ public class TestLowLevelCacheImpl {
             String fileName = isFn1 ? fn1 : fn2;
             int fileIndex = isFn1 ? 1 : 2;
             int count = rdm.nextInt(offsetsToUse);
-            long[] offsets = new long[count];
-            for (int j = 0; j < offsets.length; ++j) {
-              offsets[j] = rdm.nextInt(offsetsToUse);
+            LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+            int[] offsets = new int[count];
+            for (int j = 0; j < count; ++j) {
+              int next = rdm.nextInt(offsetsToUse);
+              input.add(dr(next, next + 1));
+              offsets[j] = next;
             }
             if (isGet) {
-              LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets);
-              if (results == null) continue;
-              for (int j = 0; j < offsets.length; ++j) {
-                if (results[j] == null) continue;
+              cache.getFileData(fileName, input);
+              int j = -1;
+              for (DiskRange dr : input) {
+                ++j;
+                if (!(dr instanceof CacheChunk)) continue;
                 ++gets;
-                LlapCacheableBuffer result = (LlapCacheableBuffer)(results[j]);
+                LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)dr).buffer;
                 assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
-                result.decRef();
+                cache.releaseBuffer(result);
               }
             } else {
               LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
@@ -179,7 +238,8 @@ public class TestLowLevelCacheImpl {
                 buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
                 buffers[j] = buf;
               }
-              long[] mask = cache.putFileData(fileName, offsets, buffers);
+              long[] mask = cache.putFileData(
+                  fileName, input.toArray(new DiskRange[count]), buffers);
               puts += buffers.length;
               long maskVal = 0;
               if (mask != null) {
@@ -192,7 +252,7 @@ public class TestLowLevelCacheImpl {
                   assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
                 }
                 maskVal >>= 1;
-                buf.decRef();
+                cache.releaseBuffer(buf);
               }
             }
           }
@@ -210,24 +270,25 @@ public class TestLowLevelCacheImpl {
     FutureTask<Integer> evictionTask = new FutureTask<Integer>(new Callable<Integer>() {
       public Integer call() {
         boolean isFirstFile = false;
-        long[] offsets = new long[offsetsToUse];
         Random rdm = new Random(1234 + Thread.currentThread().getId());
-        for (int i = 0; i < offsetsToUse; ++i) {
-          offsets[i] = i;
-        }
+        LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+        DiskRange allOffsets = new DiskRange(0, offsetsToUse + 1);
         int evictions = 0;
         syncThreadStart(cdlIn, cdlOut);
         while (rdmsDone.get() < 3) {
+          input.clear();
+          input.add(allOffsets);
           isFirstFile = !isFirstFile;
           String fileName = isFirstFile ? fn1 : fn2;
-          LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets);
-          if (results == null) continue;
-          int startIndex = rdm.nextInt(results.length), index = startIndex;
+          cache.getFileData(fileName, input);
+          DiskRange[] results = input.toArray(new DiskRange[input.size()]);
+          int startIndex = rdm.nextInt(input.size()), index = startIndex;
           LlapCacheableBuffer victim = null;
           do {
-            if (results[index] != null) {
-              LlapCacheableBuffer result = (LlapCacheableBuffer)results[index];
-              result.decRef();
+            DiskRange r = results[index];
+            if (r instanceof CacheChunk) {
+              LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)r).buffer;
+              cache.releaseBuffer(result);
               if (victim == null && result.invalidate()) {
                 ++evictions;
                 victim = result;
@@ -305,6 +366,18 @@ public class TestLowLevelCacheImpl {
     return fake;
   }
 
+  private DiskRange dr(int from, int to) {
+    return new DiskRange(from, to);
+  }
+
+  private DiskRange[] drs(int... offsets) {
+    DiskRange[] result = new DiskRange[offsets.length];
+    for (int i = 0; i < offsets.length; ++i) {
+      result[i] = new DiskRange(offsets[i], offsets[i] + 1);
+    }
+    return result;
+  }
+
   private Configuration createConf() {
     Configuration conf = new Configuration();
     conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, 3);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Jan 27 19:06:23 2015
@@ -20,28 +20,33 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 
-abstract class InStream extends InputStream {
+import com.google.common.annotations.VisibleForTesting;
 
+abstract class InStream extends InputStream {
   private static final Log LOG = LogFactory.getLog(InStream.class);
 
   private static class UncompressedStream extends InStream {
     private final String name;
-    private final ByteBuffer[] bytes;
-    private final long[] offsets;
+    private final List<DiskRange> bytes;
     private final long length;
     private long currentOffset;
     private ByteBuffer range;
     private int currentRange;
 
-    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
-                              long length) {
+    public UncompressedStream(String name, List<DiskRange> input, long length) {
       this.name = name;
       this.bytes = input;
-      this.offsets = offsets;
       this.length = length;
       currentRange = 0;
       currentOffset = 0;
@@ -83,12 +88,10 @@ abstract class InStream extends InputStr
 
     @Override
     public void close() {
-      currentRange = bytes.length;
+      currentRange = bytes.size();
       currentOffset = length;
       // explicit de-ref of bytes[]
-      for(int i = 0; i < bytes.length; i++) {
-        bytes[i] = null;
-      }
+      bytes.clear();
     }
 
     @Override
@@ -97,14 +100,15 @@ abstract class InStream extends InputStr
     }
 
     public void seek(long desired) {
-      for(int i = 0; i < bytes.length; ++i) {
-        if (offsets[i] <= desired &&
-            desired - offsets[i] < bytes[i].remaining()) {
+      for(int i = 0; i < bytes.size(); ++i) {
+        DiskRange curRange = bytes.get(i);
+        if (curRange.offset <= desired &&
+            (desired - curRange.offset) < curRange.getLength()) {
           currentOffset = desired;
           currentRange = i;
-          this.range = bytes[i].duplicate();
+          this.range = curRange.getData();
           int pos = range.position();
-          pos += (int)(desired - offsets[i]); // this is why we duplicate
+          pos += (int)(desired - curRange.offset); // this is why we duplicate
           this.range.position(pos);
           return;
         }
@@ -122,50 +126,66 @@ abstract class InStream extends InputStr
   }
 
   private static class CompressedStream extends InStream {
+    private final String fileName;
     private final String name;
-    private final ByteBuffer[] bytes;
-    private final long[] offsets;
+    private final List<DiskRange> bytes;
     private final int bufferSize;
     private final long length;
+    private LlapMemoryBuffer cacheBuffer;
     private ByteBuffer uncompressed;
     private final CompressionCodec codec;
     private ByteBuffer compressed;
     private long currentOffset;
     private int currentRange;
     private boolean isUncompressedOriginal;
-    private boolean isDirect = false;
+    private final LowLevelCache cache;
+    private final boolean doManageBuffers = true;
 
-    public CompressedStream(String name, ByteBuffer[] input,
-                            long[] offsets, long length,
-                            CompressionCodec codec, int bufferSize
-                           ) {
+    public CompressedStream(String fileName, String name, List<DiskRange> input, long length,
+                            CompressionCodec codec, int bufferSize, LowLevelCache cache) {
+      this.fileName = fileName;
       this.bytes = input;
       this.name = name;
       this.codec = codec;
       this.length = length;
-      if(this.length > 0) {
-        isDirect = this.bytes[0].isDirect();
-      }
-      this.offsets = offsets;
       this.bufferSize = bufferSize;
       currentOffset = 0;
       currentRange = 0;
+      this.cache = cache;
     }
 
-    // TODO: this should allocate from cache
-    private ByteBuffer allocateBuffer(int size) {
+    private ByteBuffer allocateBuffer(int size, boolean isDirect) {
       // TODO: use the same pool as the ORC readers
-      if(isDirect == true) {
+      if (isDirect) {
         return ByteBuffer.allocateDirect(size);
       } else {
         return ByteBuffer.allocate(size);
       }
     }
 
+    // TODO: This should not be used for main path.
+    private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1];
+    private void allocateForUncompressed(int size, boolean isDirect) {
+      if (cache == null) {
+        cacheBuffer = null;
+        uncompressed = allocateBuffer(size, isDirect);
+      } else {
+        singleAllocDest[0] = null;
+        cache.allocateMultiple(singleAllocDest, size);
+        cacheBuffer = singleAllocDest[0];
+        uncompressed = cacheBuffer.byteBuffer;
+      }
+    }
+
     private void readHeader() throws IOException {
       if (compressed == null || compressed.remaining() <= 0) {
         seek(currentOffset);
       }
+      if (cacheBuffer != null) {
+        assert compressed == null;
+        return; // Next block is ready from cache.
+      }
+      long originalOffset = currentOffset;
       if (compressed.remaining() > OutStream.HEADER_SIZE) {
         int b0 = compressed.get() & 0xff;
         int b1 = compressed.get() & 0xff;
@@ -188,14 +208,19 @@ abstract class InStream extends InputStr
           isUncompressedOriginal = true;
         } else {
           if (isUncompressedOriginal) {
-            uncompressed = allocateBuffer(bufferSize);
+            allocateForUncompressed(bufferSize, slice.isDirect());
             isUncompressedOriginal = false;
           } else if (uncompressed == null) {
-            uncompressed = allocateBuffer(bufferSize);
+            allocateForUncompressed(bufferSize, slice.isDirect());
           } else {
             uncompressed.clear();
           }
           codec.decompress(slice, uncompressed);
+          if (cache != null) {
+            // TODO: this is the inefficient path
+            cache.putFileData(fileName, new DiskRange[] { new DiskRange(originalOffset,
+                chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer });
+          }
         }
       } else {
         throw new IllegalStateException("Can't read header at " + this);
@@ -239,13 +264,20 @@ abstract class InStream extends InputStr
 
     @Override
     public void close() {
+      cacheBuffer = null;
       uncompressed = null;
       compressed = null;
-      currentRange = bytes.length;
+      currentRange = bytes.size();
       currentOffset = length;
-      for(int i = 0; i < bytes.length; i++) {
-        bytes[i] = null;
+      if (doManageBuffers) {
+        // TODO: this is the inefficient path for now. LLAP will use this differently.
+        for (DiskRange range : bytes) {
+          if (range instanceof CacheChunk) {
+            cache.releaseBuffer(((CacheChunk)range).buffer);
+          }
+        }
       }
+      bytes.clear();
     }
 
     @Override
@@ -262,8 +294,8 @@ abstract class InStream extends InputStr
       }
     }
 
-    /* slices a read only contigous buffer of chunkLength */
-    private ByteBuffer slice(int chunkLength) throws IOException {
+    /* slices a read only contiguous buffer of chunkLength */
+    private ByteBuffer slice(int chunkLength) throws IOException {
       int len = chunkLength;
       final long oldOffset = currentOffset;
       ByteBuffer slice;
@@ -274,7 +306,7 @@ abstract class InStream extends InputStr
         currentOffset += len;
         compressed.position(compressed.position() + len);
         return slice;
-      } else if (currentRange >= (bytes.length - 1)) {
+      } else if (currentRange >= (bytes.size() - 1)) {
         // nothing has been modified yet
         throw new IOException("EOF in " + this + " while trying to read " +
             chunkLength + " bytes");
@@ -288,16 +320,20 @@ abstract class InStream extends InputStr
 
       // we need to consolidate 2 or more buffers into 1
       // first copy out compressed buffers
-      ByteBuffer copy = allocateBuffer(chunkLength);
+      ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
       currentOffset += compressed.remaining();
       len -= compressed.remaining();
       copy.put(compressed);
 
-      while (len > 0 && (++currentRange) < bytes.length) {
+      while (len > 0 && (++currentRange) < bytes.size()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
         }
-        compressed = bytes[currentRange].duplicate();
+        DiskRange range = bytes.get(currentRange);
+        if (!(range instanceof BufferChunk)) {
+          throw new IOException("Trying to extend compressed block into uncompressed block");
+        }
+        compressed = range.getData().duplicate();
         if (compressed.remaining() >= len) {
           slice = compressed.slice();
           slice.limit(len);
@@ -318,40 +354,61 @@ abstract class InStream extends InputStr
     }
 
     private void seek(long desired) throws IOException {
-      for(int i = 0; i < bytes.length; ++i) {
-        if (offsets[i] <= desired &&
-            desired - offsets[i] < bytes[i].remaining()) {
+      for(int i = 0; i < bytes.size(); ++i) {
+        DiskRange range = bytes.get(i);
+        if (range.offset <= desired && desired < range.end) {
           currentRange = i;
-          compressed = bytes[i].duplicate();
-          int pos = compressed.position();
-          pos += (int)(desired - offsets[i]);
-          compressed.position(pos);
+          if (range instanceof BufferChunk) {
+            cacheBuffer = null;
+            compressed = range.getData().duplicate();
+            int pos = compressed.position();
+            pos += (int)(desired - range.offset);
+            compressed.position(pos);
+          } else {
+            compressed = null;
+            cacheBuffer = ((CacheChunk)range).buffer;
+            uncompressed = cacheBuffer.byteBuffer.duplicate();
+            if (desired != range.offset) {
+              throw new IOException("Cannot seek into the middle of uncompressed cached data");
+            }
+          }
           currentOffset = desired;
           return;
         }
       }
       // if they are seeking to the precise end, go ahead and let them go there
-      int segments = bytes.length;
-      if (segments != 0 &&
-          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+      int segments = bytes.size();
+      if (segments != 0 && desired == bytes.get(segments - 1).end) {
+        DiskRange range = bytes.get(segments - 1);
         currentRange = segments - 1;
-        compressed = bytes[currentRange].duplicate();
-        compressed.position(compressed.limit());
-        currentOffset = desired;
+        if (range instanceof BufferChunk) {
+          cacheBuffer = null;
+          compressed = range.getData().duplicate();
+          compressed.position(compressed.limit());
+        } else {
+          compressed = null;
+          cacheBuffer = ((CacheChunk)range).buffer;
+          uncompressed = cacheBuffer.byteBuffer.duplicate();
+          uncompressed.position(uncompressed.limit());
+          if (desired != range.offset) {
+            throw new IOException("Cannot seek into the middle of uncompressed cached data");
+          }
+        }
+        currentOffset = desired;
         return;
       }
-      throw new IOException("Seek outside of data in " + this + " to " +
-        desired);
+      throw new IOException("Seek outside of data in " + this + " to " + desired);
     }
 
     private String rangeString() {
       StringBuilder builder = new StringBuilder();
-      for(int i=0; i < offsets.length; ++i) {
+      for(int i=0; i < bytes.size(); ++i) {
         if (i != 0) {
           builder.append("; ");
         }
-        builder.append(" range " + i + " = " + offsets[i] + " to " +
-            bytes[i].remaining());
+        DiskRange range = bytes.get(i);
+        builder.append(" range " + i + " = " + range.offset
+            + " to " + (range.end - range.offset));
       }
       return builder.toString();
     }
@@ -382,17 +439,43 @@ abstract class InStream extends InputStr
    * @return an input stream
    * @throws IOException
    */
+  @VisibleForTesting
+  @Deprecated
   public static InStream create(String name,
-                                ByteBuffer[] input,
+                                ByteBuffer[] buffers,
                                 long[] offsets,
                                 long length,
                                 CompressionCodec codec,
                                 int bufferSize) throws IOException {
+    List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+    for (int i = 0; i < buffers.length; ++i) {
+      input.add(new BufferChunk(buffers[i], offsets[i]));
+    }
+    return create(null, name, input, length, codec, bufferSize, null);
+  }
+
+  /**
+   * Create an input stream from a list of disk ranges with data.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream; from disk or cache
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @param cache Low-level cache to use to put data, if any. Only works with compressed streams.
+   * @return an input stream
+   * @throws IOException
+   */
+  public static InStream create(String fileName,
+                                String name,
+                                List<DiskRange> input,
+                                long length,
+                                CompressionCodec codec,
+                                int bufferSize,
+                                LowLevelCache cache) throws IOException {
     if (codec == null) {
-      return new UncompressedStream(name, input, offsets, length);
+      return new UncompressedStream(name, input, length);
     } else {
-      return new CompressedStream(name, input, offsets, length, codec,
-          bufferSize);
+      return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache);
     }
   }
 }
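
A hedged sketch of calling the new factory from inside the ORC package (InStream is
package-private); the stream bytes, length, codec, buffer size and cache are
placeholders that would come from the reader's stripe state:

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.common.DiskRange;
    import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
    import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;

    class InStreamCreateSketch {
      // cache may be null, as in the metadata/footer reads in ReaderImpl below.
      static InStream openStream(String fileName, ByteBuffer streamBytes, long streamLength,
          CompressionCodec codec, int bufferSize, LowLevelCache cache) throws IOException {
        List<DiskRange> input = new ArrayList<DiskRange>();
        input.add(new BufferChunk(streamBytes, 0L));  // disk-backed bytes at file offset 0
        // getFileData() can splice CacheChunk entries into the same list for cached parts.
        return InStream.create(fileName, "data", input, streamLength, codec, bufferSize, cache);
      }
    }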

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -33,12 +33,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -463,14 +465,14 @@ public class ReaderImpl implements Reade
       int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
       footerBuffer.limit(position + metadataSize);
 
-      InputStream instream = InStream.create("metadata", new ByteBuffer[]{footerBuffer},
-          new long[]{0L}, metadataSize, codec, bufferSize);
+      InputStream instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList(
+          new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize, null);
       this.metadata = OrcProto.Metadata.parseFrom(instream);
 
       footerBuffer.position(position + metadataSize);
       footerBuffer.limit(position + metadataSize + footerBufferSize);
-      instream = InStream.create("footer", new ByteBuffer[]{footerBuffer},
-          new long[]{0L}, footerBufferSize, codec, bufferSize);
+      instream = InStream.create(null, "footer", Lists.<DiskRange>newArrayList(
+          new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize, null);
       this.footer = OrcProto.Footer.parseFrom(instream);
 
       footerBuffer.position(position);
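
The metadata and footer call sites above both wrap a single ByteBuffer into a one-element DiskRange list by hand; a small helper along the lines of the sketch below (hypothetical, not part of this patch) would express that once. With it, the first call would read OrcProto.Metadata.parseFrom(singleBufferStream("metadata", footerBuffer, metadataSize, codec, bufferSize)).

    // Hypothetical helper, not in this patch: an InStream over one in-memory buffer,
    // with no file name and no low-level cache involved.
    private static InStream singleBufferStream(String name, ByteBuffer buf, long length,
        CompressionCodec codec, int bufferSize) throws IOException {
      return InStream.create(null, name,
          Lists.<DiskRange>newArrayList(new BufferChunk(buf, 0)),
          length, codec, bufferSize, null);
    }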

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -29,7 +30,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -41,10 +44,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
 import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -79,12 +84,14 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
 
 public class RecordReaderImpl implements RecordReader {
 
   private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
   private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
 
+  private final String fileName;
   private final FSDataInputStream file;
   private final long firstRow;
   protected final List<StripeInformation> stripes =
@@ -102,16 +109,16 @@ public class RecordReaderImpl implements
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
-  List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
+  List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
   private final SearchArgument sarg;
   // the leaf predicates for the sarg
   private final List<PredicateLeaf> sargLeaves;
-  // an array the same length as the sargLeaves that map them to column ids
-  private final int[] filterColumns;
   // an array about which row groups aren't skipped
   private boolean[] includedRowGroups = null;
+  // an array the same length as the sargLeaves that map them to column ids
+  private final int[] filterColumns;
   private final Configuration conf;
 
   private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
@@ -245,6 +252,7 @@ public class RecordReaderImpl implements
       long strideRate,
       Configuration conf
   ) throws IOException {
+    this.fileName = path.toString().intern(); // should we normalize this, like DFS would?
     this.file = fileSystem.open(path);
     this.codec = codec;
     this.types = types;
@@ -2280,9 +2288,9 @@ public class RecordReaderImpl implements
     ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
     file.seek(offset);
     file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
-        new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
-        bufferSize));
+    return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer",
+        Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+        tailLength, codec, bufferSize, null));
   }
 
   static enum Location {
@@ -2633,8 +2641,10 @@ public class RecordReaderImpl implements
     }
     if(bufferChunks != null) {
       if(zcr != null) {
-        for (BufferChunk bufChunk : bufferChunks) {
-          zcr.releaseBuffer(bufChunk.chunk);
+        for (DiskRange range : bufferChunks) {
+          if (range instanceof BufferChunk) {
+            zcr.releaseBuffer(((BufferChunk)range).chunk);
+          }
         }
       }
       bufferChunks.clear();
@@ -2691,68 +2701,76 @@ public class RecordReaderImpl implements
                                   ) throws IOException {
     long start = stripe.getIndexLength();
     long end = start + stripe.getDataLength();
-    // TODO: planning should be added here too, to take cache into account
     // explicitly trigger 1 big read
-    DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
-    bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
+    LinkedList<DiskRange> rangesToRead = Lists.newLinkedList();
+    rangesToRead.add(new DiskRange(start, end));
+    if (this.cache != null) {
+      cache.getFileData(fileName, rangesToRead);
+    }
+    readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+    bufferChunks = rangesToRead;
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
-    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+    createStreams(
+        streamDescriptions, bufferChunks, null, codec, bufferSize, streams, cache);
     // TODO: decompressed data from streams should be put in cache
   }
 
   /**
-   * The sections of stripe that we need to read.
+   * The sections of stripe that we have read.
+   * This might not match a DiskRange exactly: one disk range can be split into multiple buffer chunks, depending on DFS block boundaries.
    */
-  static class DiskRange {
-    /** the first address we need to read. */
-    long offset;
-    /** the first address afterwards. */
-    long end;
+  public static class BufferChunk extends DiskRange {
+    final ByteBuffer chunk;
 
-    DiskRange(long offset, long end) {
-      this.offset = offset;
-      this.end = end;
-      if (end < offset) {
-        throw new IllegalArgumentException("invalid range " + this);
-      }
+    BufferChunk(ByteBuffer chunk, long offset) {
+      super(offset, offset + chunk.remaining());
+      this.chunk = chunk;
     }
 
     @Override
-    public boolean equals(Object other) {
-      if (other == null || other.getClass() != getClass()) {
-        return false;
-      }
-      DiskRange otherR = (DiskRange) other;
-      return otherR.offset == offset && otherR.end == end;
+    public boolean hasData() {
+      return chunk != null;
     }
 
     @Override
-    public String toString() {
-      return "range start: " + offset + " end: " + end;
+    public final String toString() {
+      return "range start: " + offset + " size: " + chunk.remaining() + " type: "
+          + (chunk.isDirect() ? "direct" : "array-backed");
+    }
+
+    @Override
+    public DiskRange slice(long offset, long end) {
+      assert offset <= end && offset >= this.offset && end <= this.end;
+      ByteBuffer sliceBuf = chunk.slice();
+      int newPos = (int)(offset - this.offset);
+      int newLen = (int)(end - offset);
+      sliceBuf.position(newPos);
+      sliceBuf.limit(newPos + newLen);
+      return new BufferChunk(sliceBuf, offset);
+    }
+
+    @Override
+    public ByteBuffer getData() {
+      return chunk;
     }
   }
 
-  /**
-   * The sections of stripe that we have read.
-   * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
-   */
-  static class BufferChunk {
-    final ByteBuffer chunk;
-    /** the first address we need to read. */
-    final long offset;
-    /** end of the buffer **/
-    final long end;
+  public static class CacheChunk extends DiskRange {
+    public final LlapMemoryBuffer buffer;
 
-    BufferChunk(ByteBuffer chunk, long offset) {
-      this.offset = offset;
-      this.chunk = chunk;
-      end = offset + chunk.remaining();
+    public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+      super(offset, end);
+      this.buffer = buffer;
     }
 
     @Override
-    public final String toString() {
-      return "range start: " + offset + " size: " + chunk.remaining() + " type: "
-          + (chunk.isDirect() ? "direct" : "array-backed");
+    public boolean hasData() {
+      return buffer != null;
+    }
+
+    @Override
+    public ByteBuffer getData() {
+      return buffer.byteBuffer;
     }
   }
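
Because BufferChunk and CacheChunk both extend DiskRange, code that runs after the cache lookup and the disk read can treat the range list uniformly through hasData() and getData(). A rough illustration, not taken from this patch:

    // Sketch: after the cache lookup plus readDiskRanges, every range in the list is
    // expected to carry data, whether it came from the LLAP cache or from the file.
    static long totalBytes(List<DiskRange> ranges) {
      long total = 0;
      for (DiskRange range : ranges) {
        assert range.hasData() : "unfilled range " + range;
        total += range.getData().remaining(); // cached buffer or freshly read buffer
      }
      return total;
    }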
 
@@ -2860,7 +2878,7 @@ public class RecordReaderImpl implements
    * @param compressionSize the compression block size
    * @return the list of disk ranges that will be loaded
    */
-  static List<DiskRange> planReadPartialDataStreams
+  static LinkedList<DiskRange> planReadPartialDataStreams
       (List<OrcProto.Stream> streamList,
        OrcProto.RowIndex[] indexes,
        boolean[] includedColumns,
@@ -2869,7 +2887,7 @@ public class RecordReaderImpl implements
        List<OrcProto.ColumnEncoding> encodings,
        List<OrcProto.Type> types,
        int compressionSize) {
-    List<DiskRange> result = new ArrayList<DiskRange>();
+    LinkedList<DiskRange> result = new LinkedList<DiskRange>();
     long offset = 0;
     // figure out which columns have a present stream
     boolean[] hasNull = new boolean[types.size()];
@@ -2889,6 +2907,7 @@ public class RecordReaderImpl implements
             isDictionary(streamKind, encodings.get(column))) {
           result.add(new DiskRange(offset, offset + length));
         } else {
+          DiskRange lastRange = null;
           for(int group=0; group < includedRowGroups.length; ++group) {
             if (includedRowGroups[group]) {
               int posn = getIndexPosition(encodings.get(column).getKind(),
@@ -2901,7 +2920,6 @@ public class RecordReaderImpl implements
               } else {
                 nextGroupOffset = length;
               }
-
               // figure out the worst case last location
 
               // if adjacent groups have the same compressed block offset then stretch the slop
@@ -2911,7 +2929,15 @@ public class RecordReaderImpl implements
                   : WORST_UNCOMPRESSED_SLOP;
               long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
                   nextGroupOffset + slop);
-              result.add(new DiskRange(offset + start, offset + end));
+              start += offset;
+              end += offset;
+              if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
+                lastRange.offset = Math.min(lastRange.offset, start);
+                lastRange.end = Math.max(lastRange.end, end);
+              } else {
+                lastRange = new DiskRange(start, end);
+                result.add(lastRange);
+              }
             }
           }
         }
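
The lastRange bookkeeping above coalesces overlapping row-group reads within a stream as they are planned, so fewer ranges reach mergeDiskRanges and the cache lookup. For example, with offset = 1000 and two included groups whose worst-case (start, end) pairs come out as (0, 180) and (150, 400), a single DiskRange [1000, 1400) is added instead of two overlapping ones. The same step in isolation might look like the sketch below, assuming it sits next to the existing overlap helper in RecordReaderImpl:

    // Sketch of the per-stream coalescing step: extend the previous range when the new
    // [start, end) overlaps it, otherwise append a new range. Returns the range to carry
    // forward as lastRange for the next row group.
    static DiskRange addOrMerge(List<DiskRange> result, DiskRange lastRange, long start, long end) {
      if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
        lastRange.offset = Math.min(lastRange.offset, start);
        lastRange.end = Math.max(lastRange.end, end);
        return lastRange;
      }
      DiskRange range = new DiskRange(start, end);
      result.add(range);
      return range;
    }
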
@@ -2951,19 +2977,23 @@ public class RecordReaderImpl implements
    *    ranges
    * @throws IOException
    */
-  static List<BufferChunk> readDiskRanges(FSDataInputStream file,
+  static void readDiskRanges(FSDataInputStream file,
                                  ZeroCopyReaderShim zcr,
                                  long base,
-                                 List<DiskRange> ranges) throws IOException {
-    ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
-    for(DiskRange range: ranges) {
+                                 LinkedList<DiskRange> ranges) throws IOException {
+    ListIterator<DiskRange> rangeIter = ranges.listIterator();
+    while (rangeIter.hasNext()) {
+      DiskRange range = rangeIter.next();
+      if (range.hasData()) continue;
+      rangeIter.remove();
+      // The removed range is replaced below via the iterator, so the list stays sorted.
       int len = (int) (range.end - range.offset);
       long off = range.offset;
       file.seek(base + off);
       if(zcr != null) {
         while(len > 0) {
           ByteBuffer partial = zcr.readBuffer(len, false);
-          result.add(new BufferChunk(partial, off));
+          rangeIter.add(new BufferChunk(partial, off));
           int read = partial.remaining();
           len -= read;
           off += read;
@@ -2971,10 +3001,9 @@ public class RecordReaderImpl implements
       } else {
         byte[] buffer = new byte[len];
         file.readFully(buffer, 0, buffer.length);
-        result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+        rangeIter.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
       }
     }
-    return result;
   }
 
   /**
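
After the cache has had a chance to supply data, readDiskRanges only touches ranges that still report hasData() == false; each of those is replaced in place by one or more BufferChunks, and cached entries are left alone. The lookup-then-read sequence used by readAllDataStreams and readPartialDataStreams therefore amounts to roughly the following sketch, where planRanges() stands in for the planning and merging steps, file, zcr and cache are the reader's fields, and base is the stripe offset:

    // Sketch of the combined flow around readDiskRanges.
    LinkedList<DiskRange> ranges = planRanges();      // hypothetical planning + merge step
    if (cache != null) {
      cache.getFileData(fileName, ranges);            // fills cached sections with data
    }
    readDiskRanges(file, zcr, base, ranges);          // reads whatever the cache did not provide
    for (DiskRange range : ranges) {
      assert range.hasData();                         // every range now carries bytes
    }
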
@@ -3010,78 +3039,79 @@ public class RecordReaderImpl implements
     return buffer.toString();
   }
 
-  static void createStreams(List<OrcProto.Stream> streamDescriptions,
-                            List<BufferChunk> ranges,
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+                            List<DiskRange> ranges,
                             boolean[] includeColumn,
                             CompressionCodec codec,
                             int bufferSize,
-                            Map<StreamName, InStream> streams
-                           ) throws IOException {
-    long offset = 0;
+                            Map<StreamName, InStream> streams,
+                            LowLevelCache cache) throws IOException {
+    long streamOffset = 0;
     for(OrcProto.Stream streamDesc: streamDescriptions) {
       int column = streamDesc.getColumn();
-      if ((includeColumn == null || includeColumn[column]) &&
-          StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
-        long length = streamDesc.getLength();
-        int first = -1;
-        int last = -2;
-        for(int i=0; i < ranges.size(); ++i) {
-          BufferChunk range = ranges.get(i);
-          if (overlap(offset, offset+length, range.offset, range.end)) {
-            if (first == -1) {
-              first = i;
-            }
-            last = i;
-          }
-        }
-        ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
-        long[] offsets = new long[last - first + 1];
-        for(int i=0; i < buffers.length; ++i) {
-          BufferChunk range = ranges.get(i + first);
-          long start = Math.max(range.offset, offset);
-          long end = Math.min(range.end, offset+length);
-          buffers[i] = range.chunk.slice();
-          assert range.chunk.position() == 0; // otherwise we'll mix up positions
-          /*
-           * buffers are positioned in-wards if the offset > range.offset
-           * offsets[i] == range.offset - offset, except if offset > range.offset
-           */
-          if(offset > range.offset) {
-            buffers[i].position((int)(offset - range.offset));
-            buffers[i].limit((int)(end - range.offset));
-            offsets[i] = 0;
-          } else {
-            buffers[i].position(0);
-            buffers[i].limit((int)(end - range.offset));
-            offsets[i] = (range.offset - offset);
-          }
+      if ((includeColumn != null && !includeColumn[column]) ||
+          StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA) {
+        streamOffset += streamDesc.getLength();
+        continue;
+      }
+      long streamEnd = streamOffset + streamDesc.getLength();
+      // TODO: This assumes sorted ranges (as do many other parts of the ORC code).
+      ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+      boolean inRange = false;
+      for (int i = 0; i < ranges.size(); ++i) {
+        DiskRange range = ranges.get(i);
+        boolean isLast = range.end >= streamEnd;
+        if (!inRange) {
+          if (range.end <= streamOffset) continue; // Skip ranges that end before this stream starts.
+          inRange = true;
+          if (range.offset < streamOffset) {
+            // Partial first buffer, add a slice of it.
+            buffers.add(range.slice(Math.max(range.offset, streamOffset),
+                Math.min(streamEnd, range.end)));
+            if (isLast) break; // Partial first buffer is also partial last buffer.
+            continue;
+          }
+        }
+        if (range.end > streamEnd) {
+          // Partial last buffer (may also be the first buffer), add a slice of it.
+          buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+          break;
         }
-        StreamName name = new StreamName(column, streamDesc.getKind());
-        streams.put(name, InStream.create(name.toString(), buffers, offsets,
-            length, codec, bufferSize));
+        buffers.add(range); // Normal buffer.
       }
-      offset += streamDesc.getLength();
+      StreamName name = new StreamName(column, streamDesc.getKind());
+      streams.put(name, InStream.create(fileName, name.toString(), buffers,
+          streamEnd - streamOffset, codec, bufferSize, cache));
+      streamOffset += streamDesc.getLength();
     }
   }
 
+  private LowLevelCache cache = null;
+  public void setCache(LowLevelCache cache) {
+    this.cache = cache;
+  }
+
   private void readPartialDataStreams(StripeInformation stripe
                                       ) throws IOException {
     List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-    // TODO: planning should take cache into account
-    List<DiskRange> chunks =
+    LinkedList<DiskRange> rangesToRead =
         planReadPartialDataStreams(streamList,
             indexes, included, includedRowGroups, codec != null,
             stripeFooter.getColumnsList(), types, bufferSize);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+      LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
     }
-    mergeDiskRanges(chunks);
+    mergeDiskRanges(rangesToRead);
+    if (this.cache != null) {
+      cache.getFileData(fileName, rangesToRead);
+    }
+    readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+    bufferChunks = rangesToRead;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("merge = " + stringifyDiskRanges(chunks));
+      LOG.debug("merge = " + stringifyDiskRanges(rangesToRead));
     }
-    bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
-    // TODO: decompressed data from streams should be put in cache
-    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache);
   }
 
   @Override
@@ -3272,9 +3302,9 @@ public class RecordReaderImpl implements
           byte[] buffer = new byte[(int) stream.getLength()];
           file.seek(offset);
           file.readFully(buffer);
-          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
-              stream.getLength(), codec, bufferSize));
+          indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
+              Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
+              stream.getLength(), codec, bufferSize, null));
         }
       }
       offset += stream.getLength();
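
Since the cache is injected through setCache rather than through the constructor, existing callers keep working unchanged and only the LLAP read path opts in. A sketch of that wiring from a hypothetical caller (how the LowLevelCache instance is obtained is outside this patch):

    // Sketch: attach the LLAP low-level cache to a record reader when one is available;
    // with no cache set, the reader reads from disk exactly as before.
    static void maybeAttachCache(RecordReader rows, LowLevelCache cache) {
      if (cache != null && rows instanceof RecordReaderImpl) {
        ((RecordReaderImpl) rows).setCache(cache);
      }
    }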

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hive.common.DiskRange;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -769,18 +770,18 @@ public class TestRecordReaderImpl {
     assertTrue(!RecordReaderImpl.overlap(0, 10, 11, 12));
   }
 
-  private static List<RecordReaderImpl.DiskRange> diskRanges(Integer... points) {
-    List<RecordReaderImpl.DiskRange> result =
-        new ArrayList<RecordReaderImpl.DiskRange>();
+  private static List<DiskRange> diskRanges(Integer... points) {
+    List<DiskRange> result =
+        new ArrayList<DiskRange>();
     for(int i=0; i < points.length; i += 2) {
-      result.add(new RecordReaderImpl.DiskRange(points[i], points[i+1]));
+      result.add(new DiskRange(points[i], points[i+1]));
     }
     return result;
   }
 
   @Test
   public void testMergeDiskRanges() throws Exception {
-    List<RecordReaderImpl.DiskRange> list = diskRanges();
+    List<DiskRange> list = diskRanges();
     RecordReaderImpl.mergeDiskRanges(list);
     assertThat(list, is(diskRanges()));
     list = diskRanges(100, 200, 300, 400, 500, 600);
@@ -860,7 +861,7 @@ public class TestRecordReaderImpl {
 
   @Test
   public void testPartialPlan() throws Exception {
-    List<RecordReaderImpl.DiskRange> result;
+    List<DiskRange> result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -968,7 +969,7 @@ public class TestRecordReaderImpl {
 
   @Test
   public void testPartialPlanCompressed() throws Exception {
-    List<RecordReaderImpl.DiskRange> result;
+    List<DiskRange> result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1050,7 +1051,7 @@ public class TestRecordReaderImpl {
 
   @Test
   public void testPartialPlanString() throws Exception {
-    List<RecordReaderImpl.DiskRange> result;
+    List<DiskRange> result;
 
     // set the streams
     List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();