You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/27 20:06:23 UTC
svn commit: r1655107 - in /hive/branches/llap:
common/src/java/org/apache/hadoop/hive/common/
llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/
llap-server/src/java/org/apache/hadoop/hive/llap/cache/
llap-server/src/test/org/apache/hadoop/...
Author: sershe
Date: Tue Jan 27 19:06:23 2015
New Revision: 1655107
URL: http://svn.apache.org/r1655107
Log:
HIVE-9418p1 : ORC using low-level cache
Added:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
Modified:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Added: hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java?rev=1655107&view=auto
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java (added)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRange.java Tue Jan 27 19:06:23 2015
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The sections of a file.
+ */
+public class DiskRange {
+ /** The first address. */
+ public long offset;
+ /** The address afterwards. */
+ public long end;
+
+ public DiskRange(long offset, long end) {
+ this.offset = offset;
+ this.end = end;
+ if (end < offset) {
+ throw new IllegalArgumentException("invalid range " + this);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || other.getClass() != getClass()) {
+ return false;
+ }
+ DiskRange otherR = (DiskRange) other;
+ return otherR.offset == offset && otherR.end == end;
+ }
+
+ @Override
+ public String toString() {
+ return "range start: " + offset + " end: " + end;
+ }
+
+ public int getLength() {
+ long len = this.end - this.offset;
+ assert len <= Integer.MAX_VALUE;
+ return (int)len;
+ }
+
+ // For subclasses
+ public boolean hasData() {
+ return false;
+ }
+
+ public DiskRange slice(long offset, long end) {
+ // Rather, unexpected usage exception.
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getData() {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Tue Jan 27 19:06:23 2015
@@ -21,17 +21,12 @@ package org.apache.hadoop.hive.llap.io.a
import java.nio.ByteBuffer;
public abstract class LlapMemoryBuffer {
- protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
- initialize(byteBuffer, offset, length);
- }
protected LlapMemoryBuffer() {
}
protected void initialize(ByteBuffer byteBuffer, int offset, int length) {
- this.byteBuffer = byteBuffer;
- this.offset = offset;
- this.length = length;
+ this.byteBuffer = byteBuffer.slice();
+ this.byteBuffer.position(offset);
+ this.byteBuffer.limit(offset + length);
}
public ByteBuffer byteBuffer;
- public int offset;
- public int length;
}
\ No newline at end of file
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Tue Jan 27 19:06:23 2015
@@ -18,14 +18,17 @@
package org.apache.hadoop.hive.llap.io.api.cache;
+import java.util.LinkedList;
+
+import org.apache.hadoop.hive.common.DiskRange;
-public interface LowLevelCache {
+public interface LowLevelCache {
/**
* Gets file data for particular offsets. Null entries mean no data.
* @param file File name; MUST be interned.
*/
- LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
+ void getFileData(String fileName, LinkedList<DiskRange> ranges);
/**
* Puts file data into cache.
@@ -33,7 +36,7 @@ public interface LowLevelCache {
* @return null if all data was put; bitmask indicating which chunks were not put otherwise;
* the replacement chunks from cache are updated directly in the array.
*/
- long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks);
+ long[] putFileData(String file, DiskRange[] ranges, LlapMemoryBuffer[] chunks);
/**
* Releases the buffer returned by getFileData or allocateMultiple.
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java Tue Jan 27 19:06:23 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.llap.io.ap
public final class BuddyAllocator implements Allocator {
private static final Log LOG = LogFactory.getLog(BuddyAllocator.class);
+
private final Arena[] arenas;
private AtomicInteger allocatedArenas = new AtomicInteger(0);
@@ -128,12 +129,6 @@ public final class BuddyAllocator implem
return false;
}
- public static LlapCacheableBuffer allocateFake() {
- LlapCacheableBuffer fake = new LlapCacheableBuffer();
- fake.initialize(-1, null, -1, 1);
- return fake;
- }
-
@Override
public void deallocate(LlapMemoryBuffer buffer) {
LlapCacheableBuffer buf = (LlapCacheableBuffer)buffer;
@@ -338,8 +333,8 @@ public final class BuddyAllocator implem
public void deallocate(LlapCacheableBuffer buffer) {
assert data != null;
- int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.length) - minAllocLog2,
- headerIx = buffer.offset >>> minAllocLog2;
+ int freeListIx = 31 - Integer.numberOfLeadingZeros(buffer.byteBuffer.remaining())
+ - minAllocLog2, headerIx = buffer.byteBuffer.position() >>> minAllocLog2;
while (true) {
FreeList freeList = freeLists[freeListIx];
int bHeaderIx = headerIx ^ (1 << freeListIx);
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Tue Jan 27 19:06:23 2015
@@ -43,39 +43,39 @@ public final class LlapCacheableBuffer e
private final AtomicInteger refCount = new AtomicInteger(0);
+ // All kinds of random stuff needed by various parts of the system, beyond the publicly
+ // visible bytes "interface". This is not so pretty since all concerns are mixed here.
+ // But at least we don't waste bunch of memory per every buffer and bunch of virtual calls.
+ /** Allocator uses this to remember which arena to alloc from.
+ * TODO Could wrap ByteBuffer instead? This needs reference anyway. */
public int arenaIndex = -1;
+ /** ORC cache uses this to store compressed length; buffer is cached uncompressed, but
+ * the lookup is on compressed ranges, so we need to know this. */
+ public int declaredLength;
+
+ /** Priority for cache policy (should be pretty universal). */
public double priority;
+ /** Last priority update time for cache policy (should be pretty universal). */
public long lastUpdate = -1;
+ /** Linked list pointers for LRFU/LRU cache policies. Given that each block is in cache
+ * that might be better than external linked list. Or not, since this is not concurrent. */
public LlapCacheableBuffer prev = null, next = null;
+ /** Index in heap for LRFU/LFU cache policies. */
public int indexInHeap = NOT_IN_CACHE;
+ // TODO: Add 4 more bytes of crap here!
+
@VisibleForTesting
int getRefCount() {
return refCount.get();
}
- @Override
- public int hashCode() {
- if (this.byteBuffer == null) return 0;
- return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof LlapCacheableBuffer)) return false;
- LlapCacheableBuffer other = (LlapCacheableBuffer)obj;
- // We only compare objects, and not contents of the ByteBuffer.
- return byteBuffer == other.byteBuffer
- && this.offset == other.offset && this.length == other.length;
- }
-
int incRef() {
int newRefCount = -1;
while (true) {
int oldRefCount = refCount.get();
if (oldRefCount == EVICTED_REFCOUNT) return -1;
- assert oldRefCount >= 0;
+ assert oldRefCount >= 0 : "oldRefCount is " + oldRefCount + " " + this;
newRefCount = oldRefCount + 1;
if (refCount.compareAndSet(oldRefCount, newRefCount)) break;
}
@@ -95,14 +95,14 @@ public final class LlapCacheableBuffer e
int decRef() {
int newRefCount = refCount.decrementAndGet();
if (newRefCount < 0) {
- throw new AssertionError("Unexpected refCount " + newRefCount);
+ throw new AssertionError("Unexpected refCount " + newRefCount + ": " + this);
}
return newRefCount;
}
@Override
public String toString() {
- return "0x" + Integer.toHexString(hashCode());
+ return "0x" + Integer.toHexString(System.identityHashCode(this));
}
/**
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Tue Jan 27 19:06:23 2015
@@ -17,22 +17,32 @@
*/
package org.apache.hadoop.hive.llap.cache;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
import com.google.common.annotations.VisibleForTesting;
public class LowLevelCacheImpl implements LowLevelCache, EvictionListener {
+ private static final Log LOG = LogFactory.getLog(LowLevelCacheImpl.class);
+
private final Allocator allocator;
private AtomicInteger newEvictions = new AtomicInteger(0);
@@ -66,30 +76,81 @@ public class LowLevelCacheImpl implement
}
@Override
- public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
- LlapMemoryBuffer[] result = null;
+ public void getFileData(String fileName, LinkedList<DiskRange> ranges) {
FileCache subCache = cache.get(fileName);
- if (subCache == null || !subCache.incRef()) return result;
+ if (subCache == null || !subCache.incRef()) return;
try {
- for (int i = 0; i < offsets.length; ++i) {
- while (true) { // Overwhelmingly only runs once.
- long offset = offsets[i];
- LlapCacheableBuffer buffer = subCache.cache.get(offset);
- if (buffer == null) break;
- if (lockBuffer(buffer)) {
- if (result == null) {
- result = new LlapCacheableBuffer[offsets.length];
- }
- result[i] = buffer;
- break;
- }
- if (subCache.cache.remove(offset, buffer)) break;
- }
+ ListIterator<DiskRange> dr = ranges.listIterator();
+ while (dr.hasNext()) {
+ getOverlappingRanges(dr, subCache.cache);
}
} finally {
subCache.decRef();
}
- return result;
+ }
+
+ private void getOverlappingRanges(ListIterator<DiskRange> drIter,
+ ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache) {
+ DiskRange currentNotCached = drIter.next();
+ Iterator<Map.Entry<Long, LlapCacheableBuffer>> matches =
+ cache.subMap(currentNotCached.offset, currentNotCached.end).entrySet().iterator();
+ long cacheEnd = -1;
+ while (matches.hasNext()) {
+ assert currentNotCached != null;
+ Map.Entry<Long, LlapCacheableBuffer> e = matches.next();
+ LlapCacheableBuffer buffer = e.getValue();
+ // Lock the buffer, validate it and add to results.
+ if (!lockBuffer(buffer)) {
+ // If we cannot lock, remove this from cache and continue.
+ matches.remove();
+ continue;
+ }
+ LOG.info("TODO# get +1 " + buffer.toString());
+ long cacheOffset = e.getKey();
+ if (cacheEnd > cacheOffset) { // compare with old cacheEnd
+ throw new AssertionError("Cache has overlapping buffers: " + cacheEnd + ") and ["
+ + cacheOffset + ", " + (cacheOffset + buffer.declaredLength) + ")");
+ }
+ cacheEnd = cacheOffset + buffer.declaredLength;
+ CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
+ currentNotCached = addCachedBufferToIter(drIter, currentNotCached, currentCached);
+ }
+ }
+
+ private DiskRange addCachedBufferToIter(ListIterator<DiskRange> drIter,
+ DiskRange currentNotCached, CacheChunk currentCached) {
+ if (currentNotCached.offset == currentCached.offset) {
+ if (currentNotCached.end <= currentCached.end) { // we assume it's always "==" now
+ // Replace the entire current DiskRange with new cached range. Java LL is idiotic, so...
+ drIter.remove();
+ drIter.add(currentCached);
+ currentNotCached = null;
+ } else {
+ // Insert the new cache range before the disk range.
+ currentNotCached.offset = currentCached.end;
+ drIter.previous();
+ drIter.add(currentCached);
+ DiskRange dr = drIter.next();
+ assert dr == currentNotCached;
+ }
+ } else {
+ assert currentNotCached.offset < currentCached.offset;
+ long originalEnd = currentNotCached.end;
+ currentNotCached.end = currentCached.offset;
+ drIter.add(currentCached);
+ if (originalEnd <= currentCached.end) { // we assume it's always "==" now
+ // We have reached the end of the range and truncated the last non-cached range.
+ currentNotCached = null;
+ } else {
+ // Insert the new disk range after the cache range. TODO: not strictly necessary yet?
+ currentNotCached = new DiskRange(currentCached.end, originalEnd);
+ drIter.add(currentNotCached);
+ DiskRange dr = drIter.previous();
+ assert dr == currentNotCached;
+ drIter.next();
+ }
+ }
+ return currentNotCached;
}
private boolean lockBuffer(LlapCacheableBuffer buffer) {
@@ -97,18 +158,19 @@ public class LowLevelCacheImpl implement
if (rc == 1) {
cachePolicy.notifyLock(buffer);
}
- return rc >= 0;
+ return rc > 0;
}
@Override
- public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+ public long[] putFileData(String fileName, DiskRange[] ranges, LlapMemoryBuffer[] buffers) {
long[] result = null;
- assert buffers.length == offsets.length;
+ assert buffers.length == ranges.length;
FileCache subCache = getOrAddFileSubCache(fileName);
try {
- for (int i = 0; i < offsets.length; ++i) {
+ for (int i = 0; i < ranges.length; ++i) {
LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
- long offset = offsets[i];
+ long offset = ranges[i].offset;
+ buffer.declaredLength = ranges[i].getLength();
assert buffer.isLocked();
while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
@@ -122,8 +184,14 @@ public class LowLevelCacheImpl implement
+ fileName + "@" + offset + "; old " + oldVal + ", new " + buffer);
}
if (lockBuffer(oldVal)) {
+ // We don't do proper overlap checking because it would cost cycles and we
+ // think it will never happen. We do perform the most basic check here.
+ if (oldVal.declaredLength != buffer.declaredLength) {
+ throw new RuntimeException("Found a block with different length at the same offset: "
+ + oldVal.declaredLength + " vs " + buffer.declaredLength + " @" + offset);
+ }
// We found an old, valid block for this key in the cache.
- releaseBufferInternal(buffer);
+ releaseBufferInternal(buffer);
buffers[i] = oldVal;
if (result == null) {
result = new long[align64(buffers.length) >>> 6];
@@ -196,9 +264,10 @@ public class LowLevelCacheImpl implement
}
}
+ private static final ByteBuffer fakeBuf = ByteBuffer.wrap(new byte[1]);
public static LlapCacheableBuffer allocateFake() {
LlapCacheableBuffer fake = new LlapCacheableBuffer();
- fake.initialize(-1, null, -1, 1);
+ fake.initialize(-1, fakeBuf, 0, 1);
return fake;
}
@@ -210,9 +279,12 @@ public class LowLevelCacheImpl implement
private static class FileCache {
private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
- // TODO: given the specific data, perhaps the nested thing should not be CHM
- private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
- = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+ // TODO: given the specific data and lookups, perhaps the nested thing should not be a map
+ // In fact, CSLM has slow single-threaded operation, and one file is probably often read
+ // by just one (or few) threads, so a much more simple DS with locking might be better.
+ // Let's use CSLM for now, since it's available.
+ private ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache
+ = new ConcurrentSkipListMap<Long, LlapCacheableBuffer>();
private AtomicInteger refCount = new AtomicInteger(0);
boolean incRef() {
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Tue Jan 27 19:06:23 2015
@@ -69,7 +69,7 @@ public class LowLevelFifoCachePolicy ext
LlapCacheableBuffer candidate = iter.next();
if (candidate.invalidate()) {
iter.remove();
- evicted += candidate.length;
+ evicted += candidate.byteBuffer.remaining();
listener.notifyEvicted(candidate);
}
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Tue Jan 27 19:06:23 2015
@@ -167,7 +167,7 @@ public class LowLevelLrfuCachePolicy ext
// Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
// TODO#: double check this is valid!
nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
- evicted += nextCandidate.length;
+ evicted += nextCandidate.byteBuffer.remaining();
}
if (firstCandidate != nextCandidate) {
if (nextCandidate == null) {
@@ -194,7 +194,7 @@ public class LowLevelLrfuCachePolicy ext
buffer = evictFromHeapUnderLock(time);
}
if (buffer == null) return evicted;
- evicted += buffer.length;
+ evicted += buffer.byteBuffer.remaining();
listener.notifyEvicted(buffer);
}
return evicted;
Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestBuddyAllocator.java Tue Jan 27 19:06:23 2015
@@ -179,10 +179,10 @@ public class TestBuddyAllocator {
for (int j = 0; j < allocCount; ++j) {
LlapMemoryBuffer mem = allocs[index][j];
long testValue = testValues[index][j] = rdm.nextLong();
- mem.byteBuffer.putLong(mem.offset, testValue);
- int halfLength = mem.length >> 1;
- if (halfLength + 8 <= mem.length) {
- mem.byteBuffer.putLong(mem.offset + halfLength, testValue);
+ mem.byteBuffer.putLong(0, testValue);
+ int halfLength = mem.byteBuffer.remaining() >> 1;
+ if (halfLength + 8 <= mem.byteBuffer.remaining()) {
+ mem.byteBuffer.putLong(halfLength, testValue);
}
}
}
@@ -204,10 +204,10 @@ public class TestBuddyAllocator {
BuddyAllocator a, LlapMemoryBuffer[] allocs, long[] testValues) {
for (int j = 0; j < allocs.length; ++j) {
LlapCacheableBuffer mem = (LlapCacheableBuffer)allocs[j];
- assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset));
- int halfLength = mem.length >> 1;
- if (halfLength + 8 <= mem.length) {
- assertEquals(testValues[j], mem.byteBuffer.getLong(mem.offset + halfLength));
+ assertEquals(testValues[j], mem.byteBuffer.getLong(0));
+ int halfLength = mem.byteBuffer.remaining() >> 1;
+ if (halfLength + 8 <= mem.byteBuffer.remaining()) {
+ assertEquals(testValues[j], mem.byteBuffer.getLong(halfLength));
}
a.deallocate(mem);
}
Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java Tue Jan 27 19:06:23 2015
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.llap.cache;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -25,16 +27,15 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.management.RuntimeErrorException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.junit.Assume;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
import org.junit.Test;
+
import static org.junit.Assert.*;
public class TestLowLevelCacheImpl {
@@ -83,22 +84,76 @@ public class TestLowLevelCacheImpl {
String fn1 = "file1".intern(), fn2 = "file2".intern();
LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb(), fb(), fb(), fb() };
verifyRefcount(fakes, 1, 1, 1, 1, 1, 1);
- assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
- assertNull(cache.putFileData(fn2, new long[] { 1, 2 }, fbs(fakes, 2, 3)));
- assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
- assertArrayEquals(fbs(fakes, 2, 3), cache.getFileData(fn2, new long[] { 1, 2 }));
- assertArrayEquals(fbs(fakes, 1, -1), cache.getFileData(fn1, new long[] { 2, 3 }));
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1)));
+ assertNull(cache.putFileData(fn2, drs(1, 2), fbs(fakes, 2, 3)));
+ verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
+ verifyCacheGet(cache, fn2, 1, 3, fakes[2], fakes[3]);
+ verifyCacheGet(cache, fn1, 2, 4, fakes[1], dr(3, 4));
verifyRefcount(fakes, 2, 3, 2, 2, 1, 1);
LlapMemoryBuffer[] bufsDiff = fbs(fakes, 4, 5);
- long[] mask = cache.putFileData(fn1, new long[] { 3, 1 }, bufsDiff);
+ long[] mask = cache.putFileData(fn1, drs(3, 1), bufsDiff);
assertEquals(1, mask.length);
assertEquals(2, mask[0]); // 2nd bit set - element 2 was already in cache.
assertSame(fakes[0], bufsDiff[1]); // Should have been replaced
verifyRefcount(fakes, 3, 3, 2, 2, 1, 0);
- assertArrayEquals(fbs(fakes, 0, 1, 4), cache.getFileData(fn1, new long[] { 1, 2, 3 }));
+ verifyCacheGet(cache, fn1, 1, 4, fakes[0], fakes[1], fakes[4]);
verifyRefcount(fakes, 4, 4, 2, 2, 2, 0);
}
+ private void verifyCacheGet(LowLevelCacheImpl cache, String fileName, Object... stuff) {
+ LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+ Iterator<DiskRange> iter = null;
+ int intCount = 0, lastInt = -1;
+ int resultCount = stuff.length;
+ for (Object obj : stuff) {
+ if (obj instanceof Integer) {
+ --resultCount;
+ assertTrue(intCount >= 0);
+ if (intCount == 0) {
+ lastInt = (Integer)obj;
+ intCount = 1;
+ } else {
+ input.add(new DiskRange(lastInt, (Integer)obj));
+ intCount = 0;
+ }
+ continue;
+ } else if (intCount >= 0) {
+ assertTrue(intCount == 0);
+ assertFalse(input.isEmpty());
+ intCount = -1;
+ cache.getFileData(fileName, input);
+ assertEquals(resultCount, input.size());
+ iter = input.iterator();
+ }
+ assertTrue(iter.hasNext());
+ DiskRange next = iter.next();
+ if (obj instanceof LlapMemoryBuffer) {
+ assertTrue(next instanceof CacheChunk);
+ assertSame(obj, ((CacheChunk)next).buffer);
+ } else {
+ assertTrue(next.equals(obj));
+ }
+ }
+ }
+
+ @Test
+ public void testMultiMatch() {
+ Configuration conf = createConf();
+ LowLevelCacheImpl cache = new LowLevelCacheImpl(
+ conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
+ String fn = "file1".intern();
+ LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb() };
+ assertNull(cache.putFileData(fn, new DiskRange[] { dr(2, 4), dr(6, 8) }, fakes));
+ verifyCacheGet(cache, fn, 1, 9, dr(1, 2), fakes[0], dr(4, 6), fakes[1], dr(8, 9));
+ verifyCacheGet(cache, fn, 2, 8, fakes[0], dr(4, 6), fakes[1]);
+ verifyCacheGet(cache, fn, 1, 5, dr(1, 2), fakes[0], dr(4, 5));
+ verifyCacheGet(cache, fn, 1, 3, dr(1, 2), fakes[0]);
+ verifyCacheGet(cache, fn, 3, 4, dr(3, 4)); // We don't expect cache requests from the middle.
+ verifyCacheGet(cache, fn, 3, 7, dr(3, 6), fakes[1]);
+ verifyCacheGet(cache, fn, 0, 2, 4, 6, dr(0, 2), dr(4, 6));
+ verifyCacheGet(cache, fn, 2, 4, 6, 8, fakes[0], fakes[1]);
+ }
+
@Test
public void testStaleValueGet() {
Configuration conf = createConf();
@@ -106,15 +161,15 @@ public class TestLowLevelCacheImpl {
conf, new DummyCachePolicy(10), new DummyAllocator(), -1); // no cleanup thread
String fn1 = "file1".intern(), fn2 = "file2".intern();
LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] { fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, new long[] { 1, 2 }, fbs(fakes, 0, 1)));
- assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 2)));
- assertArrayEquals(fbs(fakes, 0, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
- assertArrayEquals(fbs(fakes, 2), cache.getFileData(fn2, new long[] { 1 }));
+ assertNull(cache.putFileData(fn1, drs(1, 2), fbs(fakes, 0, 1)));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 2)));
+ verifyCacheGet(cache, fn1, 1, 3, fakes[0], fakes[1]);
+ verifyCacheGet(cache, fn2, 1, 2, fakes[2]);
verifyRefcount(fakes, 2, 2, 2);
evict(cache, fakes[0]);
evict(cache, fakes[2]);
- assertArrayEquals(fbs(fakes, -1, 1), cache.getFileData(fn1, new long[] { 1, 2 }));
- assertNull(cache.getFileData(fn2, new long[] { 1 }));
+ verifyCacheGet(cache, fn1, 1, 3, dr(1, 2), fakes[1]);
+ verifyCacheGet(cache, fn2, 1, 2, dr(1, 2));
verifyRefcount(fakes, -1, 3, -1);
}
@@ -126,15 +181,15 @@ public class TestLowLevelCacheImpl {
String fn1 = "file1".intern(), fn2 = "file2".intern();
LlapMemoryBuffer[] fakes = new LlapMemoryBuffer[] {
fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb(), fb() };
- assertNull(cache.putFileData(fn1, new long[] { 1, 2, 3 }, fbs(fakes, 0, 1, 2)));
- assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 3)));
+ assertNull(cache.putFileData(fn1, drs(1, 2, 3), fbs(fakes, 0, 1, 2)));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 3)));
evict(cache, fakes[0]);
evict(cache, fakes[3]);
- long[] mask = cache.putFileData(fn1, new long[] { 1, 2, 3, 4 }, fbs(fakes, 4, 5, 6, 7));
+ long[] mask = cache.putFileData(fn1, drs(1, 2, 3, 4), fbs(fakes, 4, 5, 6, 7));
assertEquals(1, mask.length);
assertEquals(6, mask[0]); // Buffers at offset 2 & 3 exist; 1 exists and is stale; 4 doesn't
- assertNull(cache.putFileData(fn2, new long[] { 1 }, fbs(fakes, 8)));
- assertArrayEquals(fbs(fakes, 4, 2, 3, 7), cache.getFileData(fn1, new long[] { 1, 2, 3, 4 }));
+ assertNull(cache.putFileData(fn2, drs(1), fbs(fakes, 8)));
+ verifyCacheGet(cache, fn1, 1, 5, fakes[4], fakes[1], fakes[2], fakes[7]);
}
@Test
@@ -157,19 +212,23 @@ public class TestLowLevelCacheImpl {
String fileName = isFn1 ? fn1 : fn2;
int fileIndex = isFn1 ? 1 : 2;
int count = rdm.nextInt(offsetsToUse);
- long[] offsets = new long[count];
- for (int j = 0; j < offsets.length; ++j) {
- offsets[j] = rdm.nextInt(offsetsToUse);
+ LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+ int[] offsets = new int[count];
+ for (int j = 0; j < count; ++j) {
+ int next = rdm.nextInt(offsetsToUse);
+ input.add(dr(next, next + 1));
+ offsets[j] = next;
}
if (isGet) {
- LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets);
- if (results == null) continue;
- for (int j = 0; j < offsets.length; ++j) {
- if (results[j] == null) continue;
+ cache.getFileData(fileName, input);
+ int j = -1;
+ for (DiskRange dr : input) {
+ ++j;
+ if (!(dr instanceof CacheChunk)) continue;
++gets;
- LlapCacheableBuffer result = (LlapCacheableBuffer)(results[j]);
+ LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)dr).buffer;
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), result.arenaIndex);
- result.decRef();
+ cache.releaseBuffer(result);
}
} else {
LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
@@ -179,7 +238,8 @@ public class TestLowLevelCacheImpl {
buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
buffers[j] = buf;
}
- long[] mask = cache.putFileData(fileName, offsets, buffers);
+ long[] mask = cache.putFileData(
+ fileName, input.toArray(new DiskRange[count]), buffers);
puts += buffers.length;
long maskVal = 0;
if (mask != null) {
@@ -192,7 +252,7 @@ public class TestLowLevelCacheImpl {
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]), buf.arenaIndex);
}
maskVal >>= 1;
- buf.decRef();
+ cache.releaseBuffer(buf);
}
}
}
@@ -210,24 +270,25 @@ public class TestLowLevelCacheImpl {
FutureTask<Integer> evictionTask = new FutureTask<Integer>(new Callable<Integer>() {
public Integer call() {
boolean isFirstFile = false;
- long[] offsets = new long[offsetsToUse];
Random rdm = new Random(1234 + Thread.currentThread().getId());
- for (int i = 0; i < offsetsToUse; ++i) {
- offsets[i] = i;
- }
+ LinkedList<DiskRange> input = new LinkedList<DiskRange>();
+ DiskRange allOffsets = new DiskRange(0, offsetsToUse + 1);
int evictions = 0;
syncThreadStart(cdlIn, cdlOut);
while (rdmsDone.get() < 3) {
+ input.clear();
+ input.add(allOffsets);
isFirstFile = !isFirstFile;
String fileName = isFirstFile ? fn1 : fn2;
- LlapMemoryBuffer[] results = cache.getFileData(fileName, offsets);
- if (results == null) continue;
- int startIndex = rdm.nextInt(results.length), index = startIndex;
+ cache.getFileData(fileName, input);
+ DiskRange[] results = input.toArray(new DiskRange[input.size()]);
+ int startIndex = rdm.nextInt(input.size()), index = startIndex;
LlapCacheableBuffer victim = null;
do {
- if (results[index] != null) {
- LlapCacheableBuffer result = (LlapCacheableBuffer)results[index];
- result.decRef();
+ DiskRange r = results[index];
+ if (r instanceof CacheChunk) {
+ LlapCacheableBuffer result = (LlapCacheableBuffer)((CacheChunk)r).buffer;
+ cache.releaseBuffer(result);
if (victim == null && result.invalidate()) {
++evictions;
victim = result;
@@ -305,6 +366,18 @@ public class TestLowLevelCacheImpl {
return fake;
}
+ private DiskRange dr(int from, int to) {
+ return new DiskRange(from, to);
+ }
+
+ private DiskRange[] drs(int... offsets) {
+ DiskRange[] result = new DiskRange[offsets.length];
+ for (int i = 0; i < offsets.length; ++i) {
+ result[i] = new DiskRange(offsets[i], offsets[i] + 1);
+ }
+ return result;
+ }
+
private Configuration createConf() {
Configuration conf = new Configuration();
conf.setInt(ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.varname, 3);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Tue Jan 27 19:06:23 2015
@@ -20,28 +20,33 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
-abstract class InStream extends InputStream {
+import com.google.common.annotations.VisibleForTesting;
+abstract class InStream extends InputStream {
private static final Log LOG = LogFactory.getLog(InStream.class);
private static class UncompressedStream extends InStream {
private final String name;
- private final ByteBuffer[] bytes;
- private final long[] offsets;
+ private final List<DiskRange> bytes;
private final long length;
private long currentOffset;
private ByteBuffer range;
private int currentRange;
- public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
- long length) {
+ public UncompressedStream(String name, List<DiskRange> input, long length) {
this.name = name;
this.bytes = input;
- this.offsets = offsets;
this.length = length;
currentRange = 0;
currentOffset = 0;
@@ -83,12 +88,10 @@ abstract class InStream extends InputStr
@Override
public void close() {
- currentRange = bytes.length;
+ currentRange = bytes.size();
currentOffset = length;
// explicit de-ref of bytes[]
- for(int i = 0; i < bytes.length; i++) {
- bytes[i] = null;
- }
+ bytes.clear();
}
@Override
@@ -97,14 +100,15 @@ abstract class InStream extends InputStr
}
public void seek(long desired) {
- for(int i = 0; i < bytes.length; ++i) {
- if (offsets[i] <= desired &&
- desired - offsets[i] < bytes[i].remaining()) {
+ for(int i = 0; i < bytes.size(); ++i) {
+ DiskRange curRange = bytes.get(i);
+ if (curRange.offset <= desired &&
+ (desired - curRange.offset) < curRange.getLength()) {
currentOffset = desired;
currentRange = i;
- this.range = bytes[i].duplicate();
+ this.range = curRange.getData();
int pos = range.position();
- pos += (int)(desired - offsets[i]); // this is why we duplicate
+ pos += (int)(desired - curRange.offset); // this is why we duplicate
this.range.position(pos);
return;
}
@@ -122,50 +126,66 @@ abstract class InStream extends InputStr
}
private static class CompressedStream extends InStream {
+ private final String fileName;
private final String name;
- private final ByteBuffer[] bytes;
- private final long[] offsets;
+ private final List<DiskRange> bytes;
private final int bufferSize;
private final long length;
+ private LlapMemoryBuffer cacheBuffer;
private ByteBuffer uncompressed;
private final CompressionCodec codec;
private ByteBuffer compressed;
private long currentOffset;
private int currentRange;
private boolean isUncompressedOriginal;
- private boolean isDirect = false;
+ private final LowLevelCache cache;
+ private final boolean doManageBuffers = true;
- public CompressedStream(String name, ByteBuffer[] input,
- long[] offsets, long length,
- CompressionCodec codec, int bufferSize
- ) {
+ public CompressedStream(String fileName, String name, List<DiskRange> input, long length,
+ CompressionCodec codec, int bufferSize, LowLevelCache cache) {
+ this.fileName = fileName;
this.bytes = input;
this.name = name;
this.codec = codec;
this.length = length;
- if(this.length > 0) {
- isDirect = this.bytes[0].isDirect();
- }
- this.offsets = offsets;
this.bufferSize = bufferSize;
currentOffset = 0;
currentRange = 0;
+ this.cache = cache;
}
- // TODO: this should allocate from cache
- private ByteBuffer allocateBuffer(int size) {
+ private ByteBuffer allocateBuffer(int size, boolean isDirect) {
// TODO: use the same pool as the ORC readers
- if(isDirect == true) {
+ if (isDirect) {
return ByteBuffer.allocateDirect(size);
} else {
return ByteBuffer.allocate(size);
}
}
+ // TODO: This should not be used for main path.
+ private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1];
+ private void allocateForUncompressed(int size, boolean isDirect) {
+ if (cache == null) {
+ cacheBuffer = null;
+ uncompressed = allocateBuffer(size, isDirect);
+ } else {
+ singleAllocDest[0] = null;
+ cache.allocateMultiple(singleAllocDest, size);
+ cacheBuffer = singleAllocDest[0];
+ uncompressed = cacheBuffer.byteBuffer;
+ }
+ }
+
private void readHeader() throws IOException {
if (compressed == null || compressed.remaining() <= 0) {
seek(currentOffset);
}
+ if (cacheBuffer != null) {
+ assert compressed == null;
+ return; // Next block is ready from cache.
+ }
+ long originalOffset = currentOffset;
if (compressed.remaining() > OutStream.HEADER_SIZE) {
int b0 = compressed.get() & 0xff;
int b1 = compressed.get() & 0xff;
@@ -188,14 +208,19 @@ abstract class InStream extends InputStr
isUncompressedOriginal = true;
} else {
if (isUncompressedOriginal) {
- uncompressed = allocateBuffer(bufferSize);
+ allocateForUncompressed(bufferSize, slice.isDirect());
isUncompressedOriginal = false;
} else if (uncompressed == null) {
- uncompressed = allocateBuffer(bufferSize);
+ allocateForUncompressed(bufferSize, slice.isDirect());
} else {
uncompressed.clear();
}
codec.decompress(slice, uncompressed);
+ if (cache != null) {
+ // TODO: this is the inefficient path
+ cache.putFileData(fileName, new DiskRange[] { new DiskRange(originalOffset,
+ chunkLength + OutStream.HEADER_SIZE) }, new LlapMemoryBuffer[] { cacheBuffer });
+ }
}
} else {
throw new IllegalStateException("Can't read header at " + this);
@@ -239,13 +264,20 @@ abstract class InStream extends InputStr
@Override
public void close() {
+ cacheBuffer = null;
uncompressed = null;
compressed = null;
- currentRange = bytes.length;
+ currentRange = bytes.size();
currentOffset = length;
- for(int i = 0; i < bytes.length; i++) {
- bytes[i] = null;
+ if (doManageBuffers) {
+ // TODO: this is the inefficient path for now. LLAP will use this differently.
+ for (DiskRange range : bytes) {
+ if (range instanceof CacheChunk) {
+ cache.releaseBuffer(((CacheChunk)range).buffer);
+ }
+ }
}
+ bytes.clear();
}
@Override
@@ -262,8 +294,8 @@ abstract class InStream extends InputStr
}
}
- /* slices a read only contigous buffer of chunkLength */
- private ByteBuffer slice(int chunkLength) throws IOException {
+ /* slices a read only contiguous buffer of chunkLength */
+ private ByteBuffer slice(int chunkLength) throws IOException {
int len = chunkLength;
final long oldOffset = currentOffset;
ByteBuffer slice;
@@ -274,7 +306,7 @@ abstract class InStream extends InputStr
currentOffset += len;
compressed.position(compressed.position() + len);
return slice;
- } else if (currentRange >= (bytes.length - 1)) {
+ } else if (currentRange >= (bytes.size() - 1)) {
// nothing has been modified yet
throw new IOException("EOF in " + this + " while trying to read " +
chunkLength + " bytes");
@@ -288,16 +320,20 @@ abstract class InStream extends InputStr
// we need to consolidate 2 or more buffers into 1
// first copy out compressed buffers
- ByteBuffer copy = allocateBuffer(chunkLength);
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
currentOffset += compressed.remaining();
len -= compressed.remaining();
copy.put(compressed);
- while (len > 0 && (++currentRange) < bytes.length) {
+ while (len > 0 && (++currentRange) < bytes.size()) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString()));
}
- compressed = bytes[currentRange].duplicate();
+ DiskRange range = bytes.get(currentRange);
+ if (!(range instanceof BufferChunk)) {
+ throw new IOException("Trying to extend compressed block into uncompressed block");
+ }
+ compressed = range.getData().duplicate();
if (compressed.remaining() >= len) {
slice = compressed.slice();
slice.limit(len);
@@ -318,40 +354,61 @@ abstract class InStream extends InputStr
}
private void seek(long desired) throws IOException {
- for(int i = 0; i < bytes.length; ++i) {
- if (offsets[i] <= desired &&
- desired - offsets[i] < bytes[i].remaining()) {
+ for(int i = 0; i < bytes.size(); ++i) {
+ DiskRange range = bytes.get(i);
+ if (range.offset <= desired && desired < range.end) {
currentRange = i;
- compressed = bytes[i].duplicate();
- int pos = compressed.position();
- pos += (int)(desired - offsets[i]);
- compressed.position(pos);
+ if (range instanceof BufferChunk) {
+ cacheBuffer = null;
+ compressed = range.getData().duplicate();
+ int pos = compressed.position();
+ pos += (int)(desired - range.offset);
+ compressed.position(pos);
+ } else {
+ compressed = null;
+ cacheBuffer = ((CacheChunk)range).buffer;
+ uncompressed = cacheBuffer.byteBuffer.duplicate();
+ if (desired != range.offset) {
+ throw new IOException("Cannot seek into the middle of uncompressed cached data");
+ }
+ }
currentOffset = desired;
return;
}
}
// if they are seeking to the precise end, go ahead and let them go there
- int segments = bytes.length;
- if (segments != 0 &&
- desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+ int segments = bytes.size();
+ if (segments != 0 && desired == bytes.get(segments - 1).end) {
+ DiskRange range = bytes.get(segments - 1);
currentRange = segments - 1;
- compressed = bytes[currentRange].duplicate();
- compressed.position(compressed.limit());
- currentOffset = desired;
+ if (range instanceof BufferChunk) {
+ cacheBuffer = null;
+ compressed = range.getData().duplicate();
+ compressed.position(compressed.limit());
+ } else {
+ compressed = null;
+ cacheBuffer = ((CacheChunk)range).buffer;
+ uncompressed = cacheBuffer.byteBuffer.duplicate();
+ uncompressed.position(uncompressed.limit());
+ if (desired != range.offset) {
+ throw new IOException("Cannot seek into the middle of uncompressed cached data");
+ }
+ currentOffset = desired;
+ }
return;
}
- throw new IOException("Seek outside of data in " + this + " to " +
- desired);
+ throw new IOException("Seek outside of data in " + this + " to " + desired);
}
private String rangeString() {
StringBuilder builder = new StringBuilder();
- for(int i=0; i < offsets.length; ++i) {
+ for(int i=0; i < bytes.size(); ++i) {
if (i != 0) {
builder.append("; ");
}
- builder.append(" range " + i + " = " + offsets[i] + " to " +
- bytes[i].remaining());
+ DiskRange range = bytes.get(i);
+ builder.append(" range " + i + " = " + range.offset
+ + " to " + (range.end - range.offset));
}
return builder.toString();
}
@@ -382,17 +439,43 @@ abstract class InStream extends InputStr
* @return an input stream
* @throws IOException
*/
+ @VisibleForTesting
+ @Deprecated
public static InStream create(String name,
- ByteBuffer[] input,
+ ByteBuffer[] buffers,
long[] offsets,
long length,
CompressionCodec codec,
int bufferSize) throws IOException {
+ List<DiskRange> input = new ArrayList<DiskRange>(buffers.length);
+ for (int i = 0; i < buffers.length; ++i) {
+ input.add(new BufferChunk(buffers[i], offsets[i]));
+ }
+ return create(null, name, input, length, codec, bufferSize, null);
+ }
+
+ /**
+ * Create an input stream from a list of disk ranges with data.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream; from disk or cache
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @param cache Low-level cache to use to put data, if any. Only works with compressed streams.
+ * @return an input stream
+ * @throws IOException
+ */
+ public static InStream create(String fileName,
+ String name,
+ List<DiskRange> input,
+ long length,
+ CompressionCodec codec,
+ int bufferSize,
+ LowLevelCache cache) throws IOException {
if (codec == null) {
- return new UncompressedStream(name, input, offsets, length);
+ return new UncompressedStream(name, input, length);
} else {
- return new CompressedStream(name, input, offsets, length, codec,
- bufferSize);
+ return new CompressedStream(fileName, name, input, length, codec, bufferSize, cache);
}
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -33,12 +33,14 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.UserMetadataItem;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -463,14 +465,14 @@ public class ReaderImpl implements Reade
int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;
footerBuffer.limit(position + metadataSize);
- InputStream instream = InStream.create("metadata", new ByteBuffer[]{footerBuffer},
- new long[]{0L}, metadataSize, codec, bufferSize);
+ InputStream instream = InStream.create(null, "metadata", Lists.<DiskRange>newArrayList(
+ new BufferChunk(footerBuffer, 0)), metadataSize, codec, bufferSize, null);
this.metadata = OrcProto.Metadata.parseFrom(instream);
footerBuffer.position(position + metadataSize);
footerBuffer.limit(position + metadataSize + footerBufferSize);
- instream = InStream.create("footer", new ByteBuffer[]{footerBuffer},
- new long[]{0L}, footerBufferSize, codec, bufferSize);
+ instream = InStream.create(null, "footer", Lists.<DiskRange>newArrayList(
+ new BufferChunk(footerBuffer, 0)), footerBufferSize, codec, bufferSize, null);
this.footer = OrcProto.Footer.parseFrom(instream);
footerBuffer.position(position);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY;
import java.io.EOFException;
+import java.io.FileDescriptor;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -29,7 +30,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
@@ -41,10 +44,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -79,12 +84,14 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
public class RecordReaderImpl implements RecordReader {
private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
+ private final String fileName;
private final FSDataInputStream file;
private final long firstRow;
protected final List<StripeInformation> stripes =
@@ -102,16 +109,16 @@ public class RecordReaderImpl implements
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
- List<BufferChunk> bufferChunks = new ArrayList<BufferChunk>(0);
+ List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
private final TreeReader reader;
private final OrcProto.RowIndex[] indexes;
private final SearchArgument sarg;
// the leaf predicates for the sarg
private final List<PredicateLeaf> sargLeaves;
- // an array the same length as the sargLeaves that map them to column ids
- private final int[] filterColumns;
// an array about which row groups aren't skipped
private boolean[] includedRowGroups = null;
+ // an array the same length as the sargLeaves that map them to column ids
+ private final int[] filterColumns;
private final Configuration conf;
private final ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
@@ -245,6 +252,7 @@ public class RecordReaderImpl implements
long strideRate,
Configuration conf
) throws IOException {
+ this.fileName = path.toString().intern(); // should we normalize this, like DFS would?
this.file = fileSystem.open(path);
this.codec = codec;
this.types = types;
@@ -2280,9 +2288,9 @@ public class RecordReaderImpl implements
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.seek(offset);
file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
- new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
- bufferSize));
+ return OrcProto.StripeFooter.parseFrom(InStream.create(null, "footer",
+ Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)),
+ tailLength, codec, bufferSize, null));
}
static enum Location {
@@ -2633,8 +2641,10 @@ public class RecordReaderImpl implements
}
if(bufferChunks != null) {
if(zcr != null) {
- for (BufferChunk bufChunk : bufferChunks) {
- zcr.releaseBuffer(bufChunk.chunk);
+ for (DiskRange range : bufferChunks) {
+ if (range instanceof BufferChunk) {
+ zcr.releaseBuffer(((BufferChunk)range).chunk);
+ }
}
}
bufferChunks.clear();
@@ -2691,68 +2701,76 @@ public class RecordReaderImpl implements
) throws IOException {
long start = stripe.getIndexLength();
long end = start + stripe.getDataLength();
- // TODO: planning should be added here too, to take cache into account
// explicitly trigger 1 big read
- DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
- bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
+ LinkedList<DiskRange> rangesToRead = Lists.newLinkedList();
+ rangesToRead.add(new DiskRange(start, end));
+ if (this.cache != null) {
+ cache.getFileData(fileName, rangesToRead);
+ }
+ readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+ bufferChunks = rangesToRead;
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
- createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+ createStreams(
+ streamDescriptions, bufferChunks, null, codec, bufferSize, streams, cache);
// TODO: decompressed data from streams should be put in cache
}
/**
- * The sections of stripe that we need to read.
+ * The sections of stripe that we have read.
+ * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
*/
- static class DiskRange {
- /** the first address we need to read. */
- long offset;
- /** the first address afterwards. */
- long end;
+ public static class BufferChunk extends DiskRange {
+ final ByteBuffer chunk;
- DiskRange(long offset, long end) {
- this.offset = offset;
- this.end = end;
- if (end < offset) {
- throw new IllegalArgumentException("invalid range " + this);
- }
+ BufferChunk(ByteBuffer chunk, long offset) {
+ super(offset, offset + chunk.remaining());
+ this.chunk = chunk;
}
@Override
- public boolean equals(Object other) {
- if (other == null || other.getClass() != getClass()) {
- return false;
- }
- DiskRange otherR = (DiskRange) other;
- return otherR.offset == offset && otherR.end == end;
+ public boolean hasData() {
+ return chunk != null;
}
@Override
- public String toString() {
- return "range start: " + offset + " end: " + end;
+ public final String toString() {
+ return "range start: " + offset + " size: " + chunk.remaining() + " type: "
+ + (chunk.isDirect() ? "direct" : "array-backed");
+ }
+
+ @Override
+ public DiskRange slice(long offset, long end) {
+ assert offset <= end && offset >= this.offset && end <= this.end;
+ ByteBuffer sliceBuf = chunk.slice();
+ int newPos = (int)(offset - this.offset);
+ int newLen = chunk.limit() - chunk.position() - (int)(this.end - end);
+ sliceBuf.position(newPos);
+ sliceBuf.limit(newPos + newLen);
+ return new BufferChunk(sliceBuf, offset);
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ return chunk;
}
}
- /**
- * The sections of stripe that we have read.
- * This might not match diskRange - 1 disk range can be multiple buffer chunks, depending on DFS block boundaries.
- */
- static class BufferChunk {
- final ByteBuffer chunk;
- /** the first address we need to read. */
- final long offset;
- /** end of the buffer **/
- final long end;
+ public static class CacheChunk extends DiskRange {
+ public final LlapMemoryBuffer buffer;
- BufferChunk(ByteBuffer chunk, long offset) {
- this.offset = offset;
- this.chunk = chunk;
- end = offset + chunk.remaining();
+ public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
+ super(offset, end);
+ this.buffer = buffer;
}
@Override
- public final String toString() {
- return "range start: " + offset + " size: " + chunk.remaining() + " type: "
- + (chunk.isDirect() ? "direct" : "array-backed");
+ public boolean hasData() {
+ return buffer != null;
+ }
+
+ @Override
+ public ByteBuffer getData() {
+ return buffer.byteBuffer;
}
}
@@ -2860,7 +2878,7 @@ public class RecordReaderImpl implements
* @param compressionSize the compression block size
* @return the list of disk ranges that will be loaded
*/
- static List<DiskRange> planReadPartialDataStreams
+ static LinkedList<DiskRange> planReadPartialDataStreams
(List<OrcProto.Stream> streamList,
OrcProto.RowIndex[] indexes,
boolean[] includedColumns,
@@ -2869,7 +2887,7 @@ public class RecordReaderImpl implements
List<OrcProto.ColumnEncoding> encodings,
List<OrcProto.Type> types,
int compressionSize) {
- List<DiskRange> result = new ArrayList<DiskRange>();
+ LinkedList<DiskRange> result = new LinkedList<DiskRange>();
long offset = 0;
// figure out which columns have a present stream
boolean[] hasNull = new boolean[types.size()];
@@ -2889,6 +2907,7 @@ public class RecordReaderImpl implements
isDictionary(streamKind, encodings.get(column))) {
result.add(new DiskRange(offset, offset + length));
} else {
+ DiskRange lastRange = null;
for(int group=0; group < includedRowGroups.length; ++group) {
if (includedRowGroups[group]) {
int posn = getIndexPosition(encodings.get(column).getKind(),
@@ -2901,7 +2920,6 @@ public class RecordReaderImpl implements
} else {
nextGroupOffset = length;
}
-
// figure out the worst case last location
// if adjacent groups have the same compressed block offset then stretch the slop
@@ -2911,7 +2929,15 @@ public class RecordReaderImpl implements
: WORST_UNCOMPRESSED_SLOP;
long end = (group == includedRowGroups.length - 1) ? length : Math.min(length,
nextGroupOffset + slop);
- result.add(new DiskRange(offset + start, offset + end));
+ start += offset;
+ end += offset;
+ if (lastRange != null && overlap(lastRange.offset, lastRange.end, start, end)) {
+ lastRange.offset = Math.min(lastRange.offset, start);
+ lastRange.end = Math.max(lastRange.end, end);
+ } else {
+ lastRange = new DiskRange(start, end);
+ result.add(lastRange);
+ }
}
}
}
@@ -2951,19 +2977,23 @@ public class RecordReaderImpl implements
* ranges
* @throws IOException
*/
- static List<BufferChunk> readDiskRanges(FSDataInputStream file,
+ static void readDiskRanges(FSDataInputStream file,
ZeroCopyReaderShim zcr,
long base,
- List<DiskRange> ranges) throws IOException {
- ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
- for(DiskRange range: ranges) {
+ LinkedList<DiskRange> ranges) throws IOException {
+ ListIterator<DiskRange> rangeIter = ranges.listIterator();
+ while (rangeIter.hasNext()) {
+ DiskRange range = rangeIter.next();
+ if (range.hasData()) continue;
+ rangeIter.remove();
+ rangeIter.previous(); // TODO: is this valid on single-element list?
int len = (int) (range.end - range.offset);
long off = range.offset;
file.seek(base + off);
if(zcr != null) {
while(len > 0) {
ByteBuffer partial = zcr.readBuffer(len, false);
- result.add(new BufferChunk(partial, off));
+ ranges.add(new BufferChunk(partial, off));
int read = partial.remaining();
len -= read;
off += read;
@@ -2971,10 +3001,9 @@ public class RecordReaderImpl implements
} else {
byte[] buffer = new byte[len];
file.readFully(buffer, 0, buffer.length);
- result.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+ ranges.add(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
}
}
- return result;
}
/**
@@ -3010,78 +3039,79 @@ public class RecordReaderImpl implements
return buffer.toString();
}
- static void createStreams(List<OrcProto.Stream> streamDescriptions,
- List<BufferChunk> ranges,
+ void createStreams(List<OrcProto.Stream> streamDescriptions,
+ List<DiskRange> ranges,
boolean[] includeColumn,
CompressionCodec codec,
int bufferSize,
- Map<StreamName, InStream> streams
- ) throws IOException {
- long offset = 0;
+ Map<StreamName, InStream> streams,
+ LowLevelCache cache) throws IOException {
+ long streamOffset = 0;
for(OrcProto.Stream streamDesc: streamDescriptions) {
int column = streamDesc.getColumn();
- if ((includeColumn == null || includeColumn[column]) &&
- StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
- long length = streamDesc.getLength();
- int first = -1;
- int last = -2;
- for(int i=0; i < ranges.size(); ++i) {
- BufferChunk range = ranges.get(i);
- if (overlap(offset, offset+length, range.offset, range.end)) {
- if (first == -1) {
- first = i;
- }
- last = i;
- }
- }
- ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
- long[] offsets = new long[last - first + 1];
- for(int i=0; i < buffers.length; ++i) {
- BufferChunk range = ranges.get(i + first);
- long start = Math.max(range.offset, offset);
- long end = Math.min(range.end, offset+length);
- buffers[i] = range.chunk.slice();
- assert range.chunk.position() == 0; // otherwise we'll mix up positions
- /*
- * buffers are positioned in-wards if the offset > range.offset
- * offsets[i] == range.offset - offset, except if offset > range.offset
- */
- if(offset > range.offset) {
- buffers[i].position((int)(offset - range.offset));
- buffers[i].limit((int)(end - range.offset));
- offsets[i] = 0;
- } else {
- buffers[i].position(0);
- buffers[i].limit((int)(end - range.offset));
- offsets[i] = (range.offset - offset);
- }
+ if ((includeColumn != null && !includeColumn[column]) ||
+ StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA) {
+ streamOffset += streamDesc.getLength();
+ continue;
+ }
+ long streamEnd = streamOffset + streamDesc.getLength();
+ // TODO: This assumes sorted ranges (as do many other parts of ORC code).
+ ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
+ boolean inRange = false;
+ for (int i = 0; i < ranges.size(); ++i) {
+ DiskRange range = ranges.get(i);
+ boolean isLast = range.end >= streamEnd;
+ if (!inRange) {
+ if (range.end >= streamOffset) continue; // Skip until we are in range. NOTE(review): condition looks inverted — to skip ranges that end before the stream starts this should be range.end <= streamOffset; as written it skips every range that reaches streamOffset, so inRange may never become true.
+ inRange = true;
+ if (range.offset < streamOffset) {
+ // Partial first buffer, add a slice of it.
+ buffers.add(range.slice(Math.max(range.offset, streamOffset),
+ Math.min(streamEnd, range.end)));
+ if (isLast) break; // Partial first buffer is also partial last buffer.
+ continue;
+ }
+ }
+ if (range.end > streamEnd) {
+ // Partial last buffer (may also be the first buffer), add a slice of it.
+ buffers.add(range.slice(range.offset, Math.min(streamEnd, range.end)));
+ break;
}
- StreamName name = new StreamName(column, streamDesc.getKind());
- streams.put(name, InStream.create(name.toString(), buffers, offsets,
- length, codec, bufferSize));
+ buffers.add(range); // Normal buffer.
}
- offset += streamDesc.getLength();
+ StreamName name = new StreamName(column, streamDesc.getKind());
+ streams.put(name, InStream.create(fileName, name.toString(), buffers,
+ streamEnd - streamOffset, codec, bufferSize, cache));
+ streamOffset += streamDesc.getLength();
}
}
+ private LowLevelCache cache = null;
+ public void setCache(LowLevelCache cache) {
+ this.cache = cache;
+ }
+
private void readPartialDataStreams(StripeInformation stripe
) throws IOException {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- // TODO: planning should take cache into account
- List<DiskRange> chunks =
+ LinkedList<DiskRange> rangesToRead =
planReadPartialDataStreams(streamList,
indexes, included, includedRowGroups, codec != null,
stripeFooter.getColumnsList(), types, bufferSize);
if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+ LOG.debug("chunks = " + stringifyDiskRanges(rangesToRead));
}
- mergeDiskRanges(chunks);
+ mergeDiskRanges(rangesToRead);
+ if (this.cache != null) {
+ cache.getFileData(fileName, rangesToRead);
+ }
+ readDiskRanges(file, zcr, stripe.getOffset(), rangesToRead);
+ bufferChunks = rangesToRead;
if (LOG.isDebugEnabled()) {
- LOG.debug("merge = " + stringifyDiskRanges(chunks));
+ LOG.debug("merge = " + stringifyDiskRanges(rangesToRead));
}
- bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
- // TODO: decompressed data from streams should be put in cache
- createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+
+ createStreams(streamList, bufferChunks, included, codec, bufferSize, streams, cache);
}
@Override
@@ -3272,9 +3302,9 @@ public class RecordReaderImpl implements
byte[] buffer = new byte[(int) stream.getLength()];
file.seek(offset);
file.readFully(buffer);
- indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
- stream.getLength(), codec, bufferSize));
+ indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create(null, "index",
+ Lists.<DiskRange>newArrayList(new BufferChunk(ByteBuffer.wrap(buffer), 0)),
+ stream.getLength(), codec, bufferSize, null));
}
}
offset += stream.getLength();
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1655107&r1=1655106&r2=1655107&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Tue Jan 27 19:06:23 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
@@ -769,18 +770,18 @@ public class TestRecordReaderImpl {
assertTrue(!RecordReaderImpl.overlap(0, 10, 11, 12));
}
- private static List<RecordReaderImpl.DiskRange> diskRanges(Integer... points) {
- List<RecordReaderImpl.DiskRange> result =
- new ArrayList<RecordReaderImpl.DiskRange>();
+ private static List<DiskRange> diskRanges(Integer... points) {
+ List<DiskRange> result =
+ new ArrayList<DiskRange>();
for(int i=0; i < points.length; i += 2) {
- result.add(new RecordReaderImpl.DiskRange(points[i], points[i+1]));
+ result.add(new DiskRange(points[i], points[i+1]));
}
return result;
}
@Test
public void testMergeDiskRanges() throws Exception {
- List<RecordReaderImpl.DiskRange> list = diskRanges();
+ List<DiskRange> list = diskRanges();
RecordReaderImpl.mergeDiskRanges(list);
assertThat(list, is(diskRanges()));
list = diskRanges(100, 200, 300, 400, 500, 600);
@@ -860,7 +861,7 @@ public class TestRecordReaderImpl {
@Test
public void testPartialPlan() throws Exception {
- List<RecordReaderImpl.DiskRange> result;
+ List<DiskRange> result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -968,7 +969,7 @@ public class TestRecordReaderImpl {
@Test
public void testPartialPlanCompressed() throws Exception {
- List<RecordReaderImpl.DiskRange> result;
+ List<DiskRange> result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1050,7 +1051,7 @@ public class TestRecordReaderImpl {
@Test
public void testPartialPlanString() throws Exception {
- List<RecordReaderImpl.DiskRange> result;
+ List<DiskRange> result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();