You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/01/10 03:38:18 UTC
svn commit: r1650717 - in /hive/branches/llap:
common/src/java/org/apache/hadoop/hive/conf/
llap-client/src/java/org/apache/hadoop/hive/llap/io/api/
llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/
llap-server/src/java/org/apache/hadoop/h...
Author: sershe
Date: Sat Jan 10 02:38:17 2015
New Revision: 1650717
URL: http://svn.apache.org/r1650717
Log:
Preliminary patch for low-level cache, needs few more touches and LRFU policy would not be thread-safe
Added:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
Removed:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/Allocator.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/JavaAllocator.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/FifoCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/LrfuCachePolicy.java
Modified:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Jan 10 02:38:17 2015
@@ -1969,8 +1969,10 @@ public class HiveConf extends Configurat
"Updates tez job execution progress in-place in the terminal."),
LLAP_ENABLED("hive.llap.enabled", true, ""),
- LLAP_CACHE_SIZE("hive.llap.cache.size", 1024L * 1024 * 1024, ""),
- LLAP_BUFFER_SIZE("hive.llap.buffer.size", 16 * 1024 * 1024, ""),
+ LLAP_ORC_CACHE_MIN_ALLOC("hive.llap.cache.orc.minalloc", 128 * 1024, ""),
+ LLAP_ORC_CACHE_MAX_ALLOC("hive.llap.cache.orc.minalloc", 16 * 1024 * 1024, ""),
+ LLAP_ORC_CACHE_ARENA_SIZE("hive.llap.cache.orc.minalloc", 128L * 1024 * 1024, ""),
+ LLAP_ORC_CACHE_MAX_SIZE("hive.llap.cache.orc.minalloc", 1024L * 1024 * 1024, ""),
LLAP_REQUEST_THREAD_COUNT("hive.llap.request.thread.count", 16, ""),
LLAP_USE_LRFU("hive.llap.use.lrfu", true, ""),
LLAP_LRFU_LAMBDA("hive.llap.lrfu.lambda", 0.01f, "")
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Sat Jan 10 02:38:17 2015
@@ -18,10 +18,11 @@
package org.apache.hadoop.hive.llap.io.api;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
-
public class EncodedColumn<BatchKey> {
- public EncodedColumn(BatchKey batchKey, int columnIndex, LlapBuffer columnData) {
+ // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
+ // generality, and ability to not copy data from underlying low-level cached buffers.
+ public static class ColumnBuffer {}
+ public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
this.batchKey = batchKey;
this.columnIndex = columnIndex;
this.columnData = columnData;
@@ -29,5 +30,5 @@ public class EncodedColumn<BatchKey> {
public BatchKey batchKey;
public int columnIndex;
- public LlapBuffer columnData;
+ public ColumnBuffer columnData;
}
\ No newline at end of file
Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java (added)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.cache;
+
+import java.nio.ByteBuffer;
+
+public abstract class LlapMemoryBuffer {
+ protected LlapMemoryBuffer(ByteBuffer byteBuffer, int offset, int length) {
+ initialize(byteBuffer, offset, length);
+ }
+ public void initialize(ByteBuffer byteBuffer, int offset, int length) {
+ this.byteBuffer = byteBuffer;
+ this.offset = offset;
+ this.length = length;
+ }
+ public ByteBuffer byteBuffer;
+ public int offset;
+ public int length;
+
+}
\ No newline at end of file
Added: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (added)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.api.cache;
+
+
+public interface LowLevelCache {
+
+ /**
+ * Gets file data for particular offsets. Null entries mean no data.
+ */
+ LlapMemoryBuffer[] getFileData(String fileName, long[] offsets);
+
+ /**
+ * Puts file data into cache.
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+ * the replacement chunks from cache are updated directly in the array.
+ */
+ long[] putFileData(String file, long[] offsets, LlapMemoryBuffer[] chunks);
+
+ /**
+ * Releases the buffer returned by getFileData or allocateMultiple.
+ */
+ void releaseBuffer(LlapMemoryBuffer buffer);
+
+ /**
+ * Allocate dest.length new blocks of size into dest.
+ */
+ void allocateMultiple(LlapMemoryBuffer[] dest, int size);
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Sat Jan 10 02:38:17 2015
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
/** Dummy interface for now, might be different. */
public interface Cache<CacheKey> {
- public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value);
- public LlapBuffer get(CacheKey key);
+ public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value);
+ public ColumnBuffer get(CacheKey key);
}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionListener.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+interface EvictionListener {
+ void notifyEvicted(LlapCacheableBuffer buffer);
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapCacheableBuffer.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public final class LlapCacheableBuffer extends LlapMemoryBuffer {
+ public LlapCacheableBuffer(ByteBuffer byteBuffer, int offset, int length) {
+ super(byteBuffer, offset, length);
+ }
+
+ public String toStringForCache() {
+ return "[" + Integer.toHexString(hashCode()) + " " + String.format("%1$.2f", priority) + " "
+ + lastUpdate + " " + (isLocked() ? "!" : ".") + "]";
+ }
+
+ private static final int EVICTED_REFCOUNT = -1;
+ private final AtomicInteger refCount = new AtomicInteger(0);
+
+ // TODO: Fields pertaining to cache policy. Perhaps they should live in separate object.
+ public double priority;
+ public long lastUpdate = -1;
+ public int indexInHeap = -1;
+ public boolean isLockedInHeap; // TODO#: this flag is invalid and not thread safe
+
+ @Override
+ public int hashCode() {
+ if (this.byteBuffer == null) return 0;
+ return (System.identityHashCode(this.byteBuffer) * 37 + offset) * 37 + length;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof LlapCacheableBuffer)) return false;
+ LlapCacheableBuffer other = (LlapCacheableBuffer)obj;
+ // We only compare objects, and not contents of the ByteBuffer.
+ return byteBuffer == other.byteBuffer
+ && this.offset == other.offset && this.length == other.length;
+ }
+
+ int lock() {
+ int oldRefCount = -1;
+ while (true) {
+ oldRefCount = refCount.get();
+ if (oldRefCount == EVICTED_REFCOUNT) return -1;
+ assert oldRefCount >= 0;
+ if (refCount.compareAndSet(oldRefCount, oldRefCount + 1)) break;
+ }
+ return oldRefCount;
+ }
+
+ public boolean isLocked() {
+ // Best-effort check. We cannot do a good check against caller thread, since
+ // refCount could still be > 0 if someone else locked. This is used for asserts.
+ return refCount.get() > 0;
+ }
+
+ public boolean isInvalid() {
+ return refCount.get() == EVICTED_REFCOUNT;
+ }
+
+ int unlock() {
+ int newRefCount = refCount.decrementAndGet();
+ if (newRefCount < 0) {
+ throw new AssertionError("Unexpected refCount " + newRefCount);
+ }
+ return newRefCount;
+ }
+
+ @Override
+ public String toString() {
+ return "0x" + Integer.toHexString(hashCode());
+ }
+
+ /**
+ * @return Whether the we can invalidate; false if locked or already evicted.
+ */
+ boolean invalidate() {
+ while (true) {
+ int value = refCount.get();
+ if (value != 0) return false;
+ if (refCount.compareAndSet(value, EVICTED_REFCOUNT)) break;
+ }
+ if (DebugUtils.isTraceLockingEnabled()) {
+ LlapIoImpl.LOG.info("Invalidated " + this + " due to eviction");
+ }
+ return true;
+ }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelBuddyCache.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+public class LowLevelBuddyCache implements LowLevelCache, EvictionListener {
+ private final ArrayList<arena> arenas;
+ private AtomicInteger newEvictions = new AtomicInteger(0);
+ private final Thread cleanupThread;
+ private final ConcurrentHashMap<String, FileCache> cache =
+ new ConcurrentHashMap<String, FileCache>();
+ private final LowLevelCachePolicy cachePolicy;
+
+ // Config settings
+ private final int minAllocLog2, maxAllocLog2, arenaSizeLog2, maxArenas;
+
+ private final int minAllocation, maxAllocation;
+ private final long maxSize, arenaSize;
+
+ public LowLevelBuddyCache(Configuration conf) {
+ minAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
+ maxAllocation = HiveConf.getIntVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_ALLOC);
+ arenaSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_ARENA_SIZE);
+ maxSize = HiveConf.getLongVar(conf, ConfVars.LLAP_ORC_CACHE_MAX_SIZE);
+ if (maxSize < arenaSize || arenaSize > maxAllocation || maxAllocation < minAllocation) {
+ throw new AssertionError("Inconsistent sizes of cache, arena and allocations: "
+ + minAllocation + ", " + maxAllocation + ", " + arenaSize + ", " + maxSize);
+ }
+ if ((Integer.bitCount(minAllocation) != 1) || (Integer.bitCount(maxAllocation) != 1)
+ || (Long.bitCount(arenaSize) != 1) || (minAllocation == 1)) {
+ // TODO: technically, arena size is not required to be so; needs to be divisible by maxAlloc
+ throw new AssertionError("Allocation and arena sizes must be powers of two > 1: "
+ + minAllocation + ", " + maxAllocation + ", " + arenaSize);
+ }
+ if ((maxSize % arenaSize) > 0 || (maxSize / arenaSize) > Integer.MAX_VALUE) {
+ throw new AssertionError(
+ "Cache size not consistent with arena size: " + arenaSize + "," + maxSize);
+ }
+ minAllocLog2 = 31 - Integer.numberOfLeadingZeros(minAllocation);
+ maxAllocLog2 = 31 - Integer.numberOfLeadingZeros(maxAllocation);
+ arenaSizeLog2 = 31 - Long.numberOfLeadingZeros(arenaSize);
+ maxArenas = (int)(maxSize / arenaSize);
+ arenas = new ArrayList<arena>(maxArenas);
+ for (int i = 0; i < maxArenas; ++i) {
+ arenas.add(new arena());
+ }
+ arenas.get(0).init();
+ cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
+ ? new LowLevelLrfuCachePolicy(conf, minAllocation, maxSize, this)
+ : new LowLevelFifoCachePolicy(minAllocation, maxSize, this);
+ cleanupThread = new CleanupThread();
+ cleanupThread.start();
+ }
+
+ // TODO: would it make sense to return buffers asynchronously?
+ @Override
+ public void allocateMultiple(LlapMemoryBuffer[] dest, int size) {
+ assert size > 0;
+ int freeListIndex = 31 - Integer.numberOfLeadingZeros(size);
+ if (size != (1 << freeListIndex)) ++freeListIndex; // not a power of two, add one more
+ freeListIndex = Math.max(freeListIndex - minAllocLog2, 0);
+ int allocationSize = 1 << (freeListIndex + minAllocLog2);
+ int total = dest.length * allocationSize;
+ cachePolicy.reserveMemory(total);
+
+ int ix = 0;
+ for (int i = 0; i < dest.length; ++i) {
+ if (dest[i] != null) continue;
+ dest[i] = new LlapCacheableBuffer(null, -1, -1); // TODO: pool of objects?
+ }
+ // TODO: instead of waiting, loop only ones we haven't tried w/tryLock?
+ for (arena block : arenas) {
+ int newIx = allocateFast(block, freeListIndex, dest, ix, allocationSize);
+ if (newIx == -1) break;
+ if (newIx == dest.length) return;
+ ix = newIx;
+ }
+ // Then try to split bigger blocks.
+ for (arena block : arenas) {
+ int newIx = allocateWithSplit(block, freeListIndex, dest, ix, allocationSize);
+ if (newIx == -1) break;
+ if (newIx == dest.length) return;
+ ix = newIx;
+ }
+ // Then try to allocate memory if we haven't allocated all the way to maxSize yet; very rare.
+ for (arena block : arenas) {
+ ix = allocateWithExpand(block, freeListIndex, dest, ix, allocationSize);
+ if (ix == dest.length) return;
+ }
+ }
+
+ private int allocateFast(arena block,
+ int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+ if (block.data == null) return -1; // not allocated yet
+ FreeList freeList = block.freeLists[freeListIndex];
+ freeList.lock.lock();
+ try {
+ ix = allocateFromFreeListUnderLock(block, freeList, freeListIndex, dest, ix, size);
+ } finally {
+ freeList.lock.unlock();
+ }
+ return ix;
+ }
+
+ private int allocateWithSplit(
+ arena arena, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+ if (arena.data == null) return -1; // not allocated yet
+ FreeList freeList = arena.freeLists[freeListIndex];
+ int remaining = -1;
+ freeList.lock.lock();
+ try {
+ ix = allocateFromFreeListUnderLock(arena, freeList, freeListIndex, dest, ix, allocationSize);
+ remaining = dest.length - ix;
+ if (remaining == 0) return ix;
+ } finally {
+ freeList.lock.unlock();
+ }
+ int splitListIndex = freeListIndex;
+ byte headerData = (byte)((freeListIndex << 1) | 1);
+ while (remaining > 0) {
+ ++splitListIndex;
+ int splitWays = 1 << (splitListIndex - freeListIndex);
+ int headerStep = 1 << splitListIndex;
+ int lastSplitBlocksRemaining = -1, lastSplitNextHeader = -1;
+ FreeList splitList = arena.freeLists[splitListIndex];
+ splitList.lock.lock();
+ try {
+ int headerIx = splitList.listHead;
+ while (headerIx >= 0 && remaining > 0) {
+ int origOffset = offsetFromHeaderIndex(headerIx), offset = origOffset;
+ int toTake = Math.min(splitWays, remaining);
+ remaining -= toTake;
+ lastSplitBlocksRemaining = splitWays - toTake;
+ for (; toTake > 0; ++ix, --toTake, headerIx += headerStep, offset += allocationSize) {
+ arena.headers[headerIx] = headerData;
+ dest[ix].initialize(arena.data, offset, allocationSize);
+ }
+ lastSplitNextHeader = headerIx;
+ headerIx = arena.data.getInt(origOffset + 4);
+ arena.data.putLong(origOffset, -1); // overwrite list pointers for safety
+ }
+ splitList.listHead = headerIx;
+ } finally {
+ splitList.lock.unlock();
+ }
+ if (remaining == 0) {
+ // We have just obtained all we needed by splitting at lastSplitBlockOffset; now
+ // we need to put the space remaining from that block into lower free lists.
+ // TODO: if we could return blocks asynchronously, we could do this
+ int newListIndex = freeListIndex;
+ while (lastSplitBlocksRemaining > 0) {
+ if ((lastSplitBlocksRemaining & 1) == 1) {
+ arena.headers[lastSplitNextHeader] = (byte)((newListIndex << 1) | 1);
+ int offset = offsetFromHeaderIndex(lastSplitNextHeader);
+ FreeList newFreeList = arena.freeLists[newListIndex];
+ newFreeList.lock.lock();
+ try {
+ arena.data.putInt(offset, -1);
+ arena.data.putInt(offset, newFreeList.listHead);
+ newFreeList.listHead = lastSplitNextHeader;
+ } finally {
+ newFreeList.lock.unlock();
+ }
+ lastSplitNextHeader += (1 << newListIndex);
+ }
+ lastSplitBlocksRemaining >>>= 1;
+ ++newListIndex;
+ continue;
+ }
+ }
+ }
+ return ix;
+ }
+
+ public int offsetFromHeaderIndex(int lastSplitNextHeader) {
+ return lastSplitNextHeader << minAllocLog2;
+ }
+
+ public int allocateFromFreeListUnderLock(arena block, FreeList freeList,
+ int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+ int current = freeList.listHead;
+ while (current >= 0 && ix < dest.length) {
+ int offset = offsetFromHeaderIndex(current);
+ block.headers[current] = (byte)((freeListIndex << 1) | 1);
+ current = block.data.getInt(offset + 4);
+ dest[ix].initialize(block.data, offset, size);
+ block.data.putLong(offset, -1); // overwrite list pointers for safety
+ ++ix;
+ }
+ freeList.listHead = current;
+ return ix;
+ }
+
+ private int allocateWithExpand(
+ arena block, int freeListIndex, LlapMemoryBuffer[] dest, int ix, int size) {
+ if (block.data != null) return ix; // already allocated
+ synchronized (block) {
+ // Never goes from non-null to null, so this is the only place we need sync.
+ if (block.data == null) {
+ block.init();
+ }
+ }
+ return allocateWithSplit(block, freeListIndex, dest, ix, size);
+ }
+
+ @Override
+ public LlapMemoryBuffer[] getFileData(String fileName, long[] offsets) {
+ LlapMemoryBuffer[] result = null;
+ // TODO: string must be internalized
+ FileCache subCache = cache.get(fileName);
+ if (subCache == null || !subCache.incRef()) return result;
+ try {
+ for (int i = 0; i < offsets.length; ++i) {
+ while (true) { // Overwhelmingly only runs once.
+ long offset = offsets[i];
+ LlapCacheableBuffer buffer = subCache.cache.get(offset);
+ if (buffer == null) break;
+ if (lockBuffer(buffer)) {
+ if (result == null) {
+ result = new LlapCacheableBuffer[offsets.length];
+ }
+ result[i] = buffer;
+ break;
+ }
+ if (subCache.cache.remove(offset, buffer)) break;
+ }
+ }
+ } finally {
+ subCache.decRef();
+ }
+ return result;
+ }
+
+ private boolean lockBuffer(LlapCacheableBuffer buffer) {
+ int rc = buffer.lock();
+ if (rc == 0) {
+ cachePolicy.notifyLock(buffer);
+ }
+ return rc >= 0;
+ }
+
+ @Override
+ public long[] putFileData(String fileName, long[] offsets, LlapMemoryBuffer[] buffers) {
+ long[] result = null;
+ assert buffers.length == offsets.length;
+ // TODO: string must be internalized
+ FileCache subCache = getOrAddFileSubCache(fileName);
+ try {
+ for (int i = 0; i < offsets.length; ++i) {
+ LlapCacheableBuffer buffer = (LlapCacheableBuffer)buffers[i];
+ long offset = offsets[i];
+ assert buffer.isLocked();
+ while (true) { // Overwhelmingly executes once, or maybe twice (replacing stale value).
+ LlapCacheableBuffer oldVal = subCache.cache.putIfAbsent(offset, buffer);
+ if (oldVal == null) break; // Cached successfully.
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Trying to cache when the chunk is already cached for "
+ + fileName + "@" + offset + "; old " + oldVal + ", new " + buffer);
+ }
+ if (lockBuffer(oldVal)) {
+ // We found an old, valid block for this key in the cache.
+ releaseBufferInternal(buffer);
+ buffers[i] = oldVal;
+ if (result == null) {
+ result = new long[align64(buffers.length) >>> 6];
+ }
+ result[i >>> 6] |= (1 << (i & 63)); // indicate that we've replaced the value
+ break;
+ }
+ // We found some old value but couldn't lock it; remove it.
+ subCache.cache.remove(offset, oldVal);
+ }
+ }
+ } finally {
+ subCache.decRef();
+ }
+ return result;
+ }
+
+ /**
+ * All this mess is necessary because we want to be able to remove sub-caches for fully
+ * evicted files. It may actually be better to have non-nested map with object keys?
+ */
+ public FileCache getOrAddFileSubCache(String fileName) {
+ FileCache newSubCache = null;
+ while (true) { // Overwhelmingly executes once.
+ FileCache subCache = cache.get(fileName);
+ if (subCache != null) {
+ if (subCache.incRef()) return subCache; // Main path - found it, incRef-ed it.
+ if (newSubCache == null) {
+ newSubCache = new FileCache();
+ newSubCache.incRef();
+ }
+ // Found a stale value we cannot incRef; try to replace it with new value.
+ if (cache.replace(fileName, subCache, newSubCache)) return newSubCache;
+ continue; // Someone else replaced/removed a stale value, try again.
+ }
+ // No value found.
+ if (newSubCache == null) {
+ newSubCache = new FileCache();
+ newSubCache.incRef();
+ }
+ FileCache oldSubCache = cache.putIfAbsent(fileName, newSubCache);
+ if (oldSubCache == null) return newSubCache; // Main path 2 - created a new file cache.
+ if (oldSubCache.incRef()) return oldSubCache; // Someone created one in parallel.
+ // Someone created one in parallel and then it went stale.
+ if (cache.replace(fileName, oldSubCache, newSubCache)) return newSubCache;
+ // Someone else replaced/removed a parallel-added stale value, try again. Max confusion.
+ }
+ }
+
+ private static int align64(int number) {
+ return ((number + 63) & ~63);
+ }
+
+
+ @Override
+ public void releaseBuffer(LlapMemoryBuffer buffer) {
+ releaseBufferInternal((LlapCacheableBuffer)buffer);
+ }
+
+ public void releaseBufferInternal(LlapCacheableBuffer buffer) {
+ if (buffer.unlock() == 0) {
+ cachePolicy.notifyUnlock(buffer);
+ unblockEviction();
+ }
+ }
+
+ public static LlapCacheableBuffer allocateFake() {
+ return new LlapCacheableBuffer(null, -1, -1);
+ }
+
+ public void unblockEviction() {
+ newEvictions.incrementAndGet();
+ }
+
+ @Override
+ public void notifyEvicted(LlapCacheableBuffer buffer) {
+
+ }
+
+ private final class CleanupThread extends Thread {
+ private int APPROX_CLEANUP_INTERVAL_SEC = 600;
+
+ public CleanupThread() {
+ super("Llap ChunkPool cleanup thread");
+ setDaemon(true);
+ setPriority(1);
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ doOneCleanupRound();
+ } catch (InterruptedException ex) {
+ LlapIoImpl.LOG.warn("Cleanup thread has been interrupted");
+ Thread.currentThread().interrupt();
+ break;
+ } catch (Throwable t) {
+ LlapIoImpl.LOG.error("Cleanup has failed; the thread will now exit", t);
+ break;
+ }
+ }
+ }
+
+ private void doOneCleanupRound() throws InterruptedException {
+ while (true) {
+ int evictionsSinceLast = newEvictions.getAndSet(0);
+ if (evictionsSinceLast > 0) break;
+ synchronized (newEvictions) {
+ newEvictions.wait(10000);
+ }
+ }
+ // Duration is an estimate; if the size of the map changes, it can be very different.
+ long endTime = System.nanoTime() + APPROX_CLEANUP_INTERVAL_SEC * 1000000000L;
+ int leftToCheck = 0; // approximate
+ for (FileCache fc : cache.values()) {
+ leftToCheck += fc.cache.size();
+ }
+ // TODO: if these super-long-lived iterator affects the map in some bad way,
+ // we'd need to sleep once per round instead.
+ // Iterate thru all the filecaches. This is best-effort.
+ Iterator<Map.Entry<String, FileCache>> iter = cache.entrySet().iterator();
+ while (iter.hasNext()) {
+ FileCache fc = iter.next().getValue();
+ if (!fc.incRef()) {
+ throw new AssertionError("Something other than cleanup is removing elements from map");
+ }
+ // Iterate thru the file cache. This is best-effort.
+ Iterator<Map.Entry<Long, LlapCacheableBuffer>> subIter = fc.cache.entrySet().iterator();
+ boolean isEmpty = true;
+ while (subIter.hasNext()) {
+ Thread.sleep((leftToCheck <= 0)
+ ? 1 : (endTime - System.nanoTime()) / (1000000L * leftToCheck));
+ if (subIter.next().getValue().isInvalid()) {
+ subIter.remove();
+ } else {
+ isEmpty = false;
+ }
+ --leftToCheck;
+ }
+ if (!isEmpty) {
+ fc.decRef();
+ continue;
+ }
+ // FileCache might be empty; see if we can remove it. "tryWriteLock"
+ if (!fc.startEvicting()) continue;
+ if (fc.cache.isEmpty()) {
+ fc.commitEvicting();
+ iter.remove();
+ } else {
+ fc.abortEvicting();
+ }
+ }
+ }
+ }
+
+ private class arena {
+ void init() {
+ data = ByteBuffer.allocateDirect(maxAllocation);
+ int maxMinAllocs = 1 << (arenaSizeLog2 - minAllocLog2);
+ headers = new byte[maxMinAllocs];
+ int allocLog2Diff = maxAllocLog2 - minAllocLog2;
+ freeLists = new FreeList[allocLog2Diff];
+ for (int i = 0; i < maxAllocLog2; ++i) {
+ freeLists[i] = new FreeList();
+ }
+ int maxMaxAllocs = 1 << (arenaSizeLog2 - maxAllocLog2),
+ headerIndex = 0, headerIncrement = 1 << allocLog2Diff;
+ freeLists[maxAllocLog2 - 1].listHead = 0;
+ for (int i = 0, offset = 0; i < maxMaxAllocs; ++i, offset += maxAllocation) {
+ // TODO: will this cause bugs on large numbers due to some Java sign bit stupidity?
+ headers[headerIndex] = (byte)(allocLog2Diff << 1); // Maximum allocation size
+ data.putInt(offset, (i == 0) ? -1 : (headerIndex - headerIncrement));
+ data.putInt(offset + 4, (i == maxMaxAllocs - 1) ? -1 : (headerIndex + headerIncrement));
+ headerIndex += headerIncrement;
+ }
+ }
+ ByteBuffer data;
+ // Avoid storing headers with data since we expect binary size allocations.
+ // Each headers[i] is a "virtual" byte at i * minAllocation.
+ byte[] headers;
+ FreeList[] freeLists;
+ }
+
+ private static class FreeList {
+ ReentrantLock lock = new ReentrantLock(false);
+ int listHead = -1; // Index of where the buffer is; in minAllocation units
+ // TODO: One possible improvement - store blocks arriving left over from splits, and
+ // blocks requested, to be able to wait for pending splits and reduce fragmentation.
+ // However, we are trying to increase fragmentation now, since we cater to single-size.
+ }
+
+ // TODO##: separate the classes?
+ private static class FileCache {
+ private static final int EVICTED_REFCOUNT = -1, EVICTING_REFCOUNT = -2;
+ // TODO: given the specific data, perhaps the nested thing should not be CHM
+ private ConcurrentHashMap<Long, LlapCacheableBuffer> cache
+ = new ConcurrentHashMap<Long, LlapCacheableBuffer>();
+ private AtomicInteger refCount = new AtomicInteger(0);
+
+ boolean incRef() {
+ while (true) {
+ int value = refCount.get();
+ if (value == EVICTED_REFCOUNT) return false;
+ if (value == EVICTING_REFCOUNT) continue; // spin until it resolves
+ assert value >= 0;
+ if (refCount.compareAndSet(value, value + 1)) return true;
+ }
+ }
+
+ void decRef() {
+ int value = refCount.decrementAndGet();
+ if (value < 0) {
+ throw new AssertionError("Unexpected refCount " + value);
+ }
+ }
+
+ boolean startEvicting() {
+ while (true) {
+ int value = refCount.get();
+ if (value != 1) return false;
+ if (refCount.compareAndSet(value, EVICTING_REFCOUNT)) return true;
+ }
+ }
+
+ void commitEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, EVICTED_REFCOUNT);
+ assert result;
+ }
+
+ void abortEvicting() {
+ boolean result = refCount.compareAndSet(EVICTING_REFCOUNT, 0);
+ assert result;
+ }
+ }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LowLevelCachePolicy {
+ void cache(LlapCacheableBuffer buffer);
+ void notifyLock(LlapCacheableBuffer buffer);
+ void notifyUnlock(LlapCacheableBuffer buffer);
+ void reserveMemory(long total);
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicyBase.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class LowLevelCachePolicyBase implements LowLevelCachePolicy {
+ private final AtomicLong usedMemory;
+ private final long maxSize;
+ private EvictionListener evictionListener;
+
+ public LowLevelCachePolicyBase(long maxSize, EvictionListener listener) {
+ this.maxSize = maxSize;
+ this.usedMemory = new AtomicLong(0);
+ this.evictionListener = listener;
+ }
+
+ @Override
+ public void reserveMemory(long memoryToReserve) {
+ // TODO: if this cannot evict enough, it will spin infinitely. Terminate at some point?
+ while (memoryToReserve > 0) {
+ long usedMem = usedMemory.get(), newUsedMem = usedMem + memoryToReserve;
+ if (newUsedMem <= maxSize) {
+ if (usedMemory.compareAndSet(usedMem, newUsedMem)) break;
+ continue;
+ }
+ // TODO: for one-block case, we could move notification for the last block out of the loop.
+ long evicted = evictSomeBlocks(memoryToReserve, evictionListener);
+ // Adjust the memory - we have to account for what we have just evicted.
+ while (true) {
+ long reserveWithEviction = Math.min(memoryToReserve, maxSize - usedMem + evicted);
+ if (usedMemory.compareAndSet(usedMem, usedMem + reserveWithEviction)) {
+ memoryToReserve -= reserveWithEviction;
+ break;
+ }
+ usedMem = usedMemory.get();
+ }
+ }
+ }
+
+ protected abstract long evictSomeBlocks(long memoryToReserve, EvictionListener listener);
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LowLevelFifoCachePolicy extends LowLevelCachePolicyBase {
+ private final Lock lock = new ReentrantLock();
+ private final LinkedHashSet<LlapCacheableBuffer> buffers;
+
+ public LowLevelFifoCachePolicy(
+ int expectedBufferSize, long maxCacheSize, EvictionListener listener) {
+ super(maxCacheSize, listener);
+ int expectedBuffers = (int)Math.ceil((maxCacheSize * 1.0) / expectedBufferSize);
+ buffers = new LinkedHashSet<LlapCacheableBuffer>((int)(expectedBuffers / 0.75f));
+ }
+
+ @Override
+ public void cache(LlapCacheableBuffer buffer) {
+ lock.lock();
+ try {
+ buffers.add(buffer);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void notifyLock(LlapCacheableBuffer buffer) {
+ // FIFO policy doesn't care.
+ }
+
+ @Override
+ public void notifyUnlock(LlapCacheableBuffer buffer) {
+ // FIFO policy doesn't care.
+ }
+
+ @Override
+ protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ long evicted = 0;
+ lock.lock();
+ try {
+ Iterator<LlapCacheableBuffer> iter = buffers.iterator();
+ while (evicted < memoryToReserve && iter.hasNext()) {
+ LlapCacheableBuffer candidate = iter.next();
+ if (candidate.invalidate()) {
+ iter.remove();
+ evicted += candidate.length;
+ listener.notifyEvicted(candidate);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return evicted;
+ }
+}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1650717&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the "simple" algorithm from "On the Existence of a Spectrum of Policies
+ * that Subsumes the Least Recently Used (LRU) and Least Frequently Used (LFU) Policies".
+ * TODO: fix this, no longer true; with ORC as is, 4k buffers per gig of cache
+ * We expect the number of buffers to be relatively small (1000s), so we just use one heap.
+ **/
+public class LowLevelLrfuCachePolicy extends LowLevelCachePolicyBase {
+ private final double lambda;
+ private final double f(long x) {
+ return Math.pow(0.5, lambda * x);
+ }
+ private static final double F0 = 1; // f(0) is always 1
+ private final double touchPriority(long time, long lastAccess, double previous) {
+ return F0 + f(time - lastAccess) * previous;
+ }
+ private final double expirePriority(long time, long lastAccess, double previous) {
+ return f(time - lastAccess) * previous;
+ }
+
+ private final AtomicLong timer = new AtomicLong(0);
+ /**
+ * The heap. Currently synchronized on itself; there is a number of papers out there
+ * with various lock-free/efficient priority queues which we can use if needed.
+ */
+ private final LlapCacheableBuffer[] heap;
+ /** Number of elements. */
+ private int heapSize = 0;
+
+ public LowLevelLrfuCachePolicy(Configuration conf,
+ long minBufferSize, long maxCacheSize, EvictionListener listener) {
+ super(maxCacheSize, listener);
+ heap = new LlapCacheableBuffer[(int)Math.ceil((maxCacheSize * 1.0) / minBufferSize)];
+ lambda = HiveConf.getFloatVar(conf, HiveConf.ConfVars.LLAP_LRFU_LAMBDA);
+ }
+
+ @Override
+ public void cache(LlapCacheableBuffer buffer) {
+ buffer.lastUpdate = timer.incrementAndGet();
+ buffer.priority = F0;
+ assert buffer.isLocked();
+ buffer.isLockedInHeap = true;
+ synchronized (heap) {
+ // Ensured by reserveMemory.
+ assert heapSize < heap.length : heap.length + " >= " + heapSize;
+ buffer.indexInHeap = heapSize;
+ heapifyUpUnderLock(buffer, buffer.lastUpdate);
+ if (DebugUtils.isTraceEnabled()) {
+ LlapIoImpl.LOG.info(buffer + " inserted at " + buffer.lastUpdate);
+ }
+ ++heapSize;
+ }
+ }
+
+ @Override
+ public void notifyLock(LlapCacheableBuffer buffer) {
+ long time = timer.get();
+ synchronized (heap) {
+ buffer.isLockedInHeap = true;
+ heapifyDownUnderLock(buffer, time);
+ }
+ }
+
+ @Override
+ public void notifyUnlock(LlapCacheableBuffer buffer) {
+ long time = timer.incrementAndGet();
+ synchronized (heap) {
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Touching " + buffer + " at " + time);
+ }
+ buffer.priority = touchPriority(time, buffer.lastUpdate, buffer.priority);
+ buffer.lastUpdate = time;
+ buffer.isLockedInHeap = false;
+ // Buffer's priority just decreased from boosted lock priority, so move up.
+ heapifyUpUnderLock(buffer, time);
+ }
+ }
+
+ private LlapCacheableBuffer evictFromHeapUnderLock(long time) {
+ if (heapSize == 0) return null;
+ LlapCacheableBuffer result = heap[0];
+ if (!result.invalidate()) {
+ // We boost the priority of locked buffers to a very large value;
+ // this means entire heap is locked. TODO: need to work around that for small pools?
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Failed to invalidate head " + result.toString() + "; size = " + heapSize);
+ }
+ return null;
+ }
+ if (DebugUtils.isTraceCachingEnabled()) {
+ LlapIoImpl.LOG.info("Evicting " + result + " at " + time);
+ }
+ result.indexInHeap = -1;
+ --heapSize;
+ LlapCacheableBuffer newRoot = heap[heapSize];
+ newRoot.indexInHeap = 0;
+ if (newRoot.lastUpdate != time && !newRoot.isLockedInHeap) {
+ newRoot.priority = expirePriority(time, newRoot.lastUpdate, newRoot.priority);
+ newRoot.lastUpdate = time;
+ }
+ heapifyDownUnderLock(newRoot, time);
+ return result;
+ }
+
+ private void heapifyDownUnderLock(LlapCacheableBuffer buffer, long time) {
+ // Relative positions of the blocks don't change over time; priorities we expire can only
+ // decrease; we only have one block that could have broken heap rule and we always move it
+ // down; therefore, we can update priorities of other blocks as we go for part of the heap -
+ // we correct any discrepancy w/the parent after expiring priority, and any block we expire
+ // the priority for already has lower priority than that of its children.
+ // TODO: avoid expiring priorities if times are close? might be needlessly expensive.
+ int ix = buffer.indexInHeap;
+ double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+ while (true) {
+ int leftIx = (ix << 1) + 1, rightIx = leftIx + 1;
+ if (leftIx >= heapSize) break; // Buffer is at the leaf node.
+ LlapCacheableBuffer left = heap[leftIx], right = null;
+ if (rightIx < heapSize) {
+ right = heap[rightIx];
+ }
+ double leftPri = getHeapifyPriority(left, time), rightPri = getHeapifyPriority(right, time);
+ if (priority <= leftPri && priority <= rightPri) break;
+ if (leftPri <= rightPri) { // prefer left, cause right might be missing
+ heap[ix] = left;
+ left.indexInHeap = ix;
+ ix = leftIx;
+ } else {
+ heap[ix] = right;
+ right.indexInHeap = ix;
+ ix = rightIx;
+ }
+ }
+ buffer.indexInHeap = ix;
+ heap[ix] = buffer;
+ }
+
+ private void heapifyUpUnderLock(LlapCacheableBuffer buffer, long time) {
+ // See heapifyDown comment.
+ int ix = buffer.indexInHeap;
+ double priority = buffer.isLockedInHeap ? Double.MAX_VALUE : buffer.priority;
+ while (true) {
+ if (ix == 0) break; // Buffer is at the top of the heap.
+ int parentIx = (ix - 1) >>> 1;
+ LlapCacheableBuffer parent = heap[parentIx];
+ double parentPri = getHeapifyPriority(parent, time);
+ if (priority >= parentPri) break;
+ heap[ix] = parent;
+ parent.indexInHeap = ix;
+ ix = parentIx;
+ }
+ buffer.indexInHeap = ix;
+ heap[ix] = buffer;
+ }
+
+ private double getHeapifyPriority(LlapCacheableBuffer buf, long time) {
+ if (buf == null || buf.isLockedInHeap) return Double.MAX_VALUE;
+ if (buf.lastUpdate != time) {
+ buf.priority = expirePriority(time, buf.lastUpdate, buf.priority);
+ buf.lastUpdate = time;
+ }
+ return buf.priority;
+ }
+
+ public String debugDumpHeap() {
+ if (heapSize == 0) return "<empty>";
+ int levels = 32 - Integer.numberOfLeadingZeros(heapSize);
+ StringBuilder result = new StringBuilder();
+ int ix = 0;
+ int spacesCount = heap[0].toStringForCache().length() + 3;
+ String full = StringUtils.repeat(" ", spacesCount),
+ half = StringUtils.repeat(" ", spacesCount / 2);
+ int maxWidth = 1 << (levels - 1);
+ for (int i = 0; i < levels; ++i) {
+ int width = 1 << i;
+ int middleGap = (maxWidth - width) / width;
+ for (int j = 0; j < (middleGap >>> 1); ++j) {
+ result.append(full);
+ }
+ if ((middleGap & 1) == 1) {
+ result.append(half);
+ }
+ for (int j = 0; j < width && ix < heapSize; ++j, ++ix) {
+ if (j != 0) {
+ for (int k = 0; k < middleGap; ++k) {
+ result.append(full);
+ }
+ if (middleGap == 0) {
+ result.append(" ");
+ }
+ }
+ if ((j & 1) == 0) {
+ result.append("(");
+ }
+ result.append(heap[ix].toStringForCache());
+ if ((j & 1) == 1) {
+ result.append(")");
+ }
+ }
+ result.append("\n");
+ }
+ return result.toString();
+ }
+
+ @VisibleForTesting
+ public LlapCacheableBuffer evictOneMoreBlock() {
+ synchronized (heap) {
+ return evictFromHeapUnderLock(timer.get());
+ }
+ }
+
+ @Override
+ protected long evictSomeBlocks(long memoryToReserve, EvictionListener listener) {
+ long evicted = 0;
+ while (evicted < memoryToReserve) {
+ LlapCacheableBuffer buffer = evictOneMoreBlock();
+ if (buffer == null) return evicted;
+ evicted += buffer.length;
+ listener.notifyEvicted(buffer);
+ }
+ return evicted;
+ }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Sat Jan 10 02:38:17 2015
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
public class NoopCache<CacheKey> implements Cache<CacheKey> {
@Override
- public LlapBuffer cacheOrGet(CacheKey key, LlapBuffer value) {
+ public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) {
return value;
}
@Override
- public LlapBuffer get(CacheKey key) {
+ public ColumnBuffer get(CacheKey key) {
return null; // TODO: ensure real implementation increases refcount
}
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/VectorReader.java Sat Jan 10 02:38:17 2015
@@ -22,7 +22,7 @@ package org.apache.hadoop.hive.llap.io.a
import java.util.List;
import java.io.IOException;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
public interface VectorReader {
@@ -33,7 +33,7 @@ public interface VectorReader {
public static class ColumnVectorBatch {
public ColumnVector[] cols;
public int size;
- public List<LlapBuffer> lockedBuffers;
+ public List<ColumnBuffer> lockedBuffers;
}
public ColumnVectorBatch next() throws InterruptedException, IOException;
public void close() throws IOException;
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Sat Jan 10 02:38:17 2015
@@ -26,11 +26,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.cache.Cache;
-import org.apache.hadoop.hive.llap.cache.JavaAllocator;
+import org.apache.hadoop.hive.llap.cache.LowLevelBuddyCache;
import org.apache.hadoop.hive.llap.cache.NoopCache;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.api.VectorReader;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
@@ -53,11 +52,8 @@ public class LlapIoImpl implements LlapI
private LlapIoImpl(Configuration conf) throws IOException {
this.conf = conf;
- // ChunkPool<OrcLoader.ChunkKey> chunkPool = new ChunkPool<OrcLoader.ChunkKey>();
- // new BufferPool(conf, chunkPool)
- Allocator allocator = new JavaAllocator();
- Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>();
- this.edp = new OrcEncodedDataProducer(allocator, cache, conf);
+ Cache<OrcCacheKey> cache = new NoopCache<OrcCacheKey>(); // High-level cache not supported yet.
+ this.edp = new OrcEncodedDataProducer(new LowLevelBuddyCache(conf), cache, conf);
this.cvp = new OrcColumnVectorProducer(edp, conf);
}
@@ -66,7 +62,7 @@ public class LlapIoImpl implements LlapI
getOrCreateInstance(conf);
}
- // TODO#: Add "create" method in a well-defined place when server is started
+ // TODO: Add "create" method in a well-defined place when server is started
public static LlapIo getOrCreateInstance(Configuration conf) {
if (ioImpl != null) return ioImpl;
synchronized (instanceLock) {
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Sat Jan 10 02:38:17 2015
@@ -26,9 +26,8 @@ import java.util.List;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
import org.apache.hadoop.hive.llap.io.api.VectorReader.ColumnVectorBatch;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -38,10 +37,10 @@ import org.apache.hadoop.mapred.InputSpl
public abstract class ColumnVectorProducer<BatchKey> {
static class EncodedColumnBatch {
public EncodedColumnBatch(int colCount) {
- columnDatas = new LlapBuffer[colCount];
+ columnDatas = new ColumnBuffer[colCount];
columnsRemaining = colCount;
}
- public LlapBuffer[] columnDatas;
+ public ColumnBuffer[] columnDatas;
public int columnsRemaining;
}
@@ -51,7 +50,7 @@ public abstract class ColumnVectorProduc
// TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
private final HashMap<BatchKey, EncodedColumnBatch> pendingData =
new HashMap<BatchKey, EncodedColumnBatch>();
- private ConsumerFeedback<LlapBuffer> upstreamFeedback;
+ private ConsumerFeedback<ColumnBuffer> upstreamFeedback;
private final Consumer<ColumnVectorBatch> downstreamConsumer;
private final int colCount;
@@ -60,7 +59,7 @@ public abstract class ColumnVectorProduc
this.colCount = colCount;
}
- public void init(ConsumerFeedback<LlapBuffer> upstreamFeedback) {
+ public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) {
this.upstreamFeedback = upstreamFeedback;
}
@@ -134,13 +133,14 @@ public abstract class ColumnVectorProduc
@Override
public void returnData(ColumnVectorBatch data) {
- for (LlapBuffer lockedBuffer : data.lockedBuffers) {
+ // TODO#: this should happen earlier, when data is decoded buffers are not needed
+ for (ColumnBuffer lockedBuffer : data.lockedBuffers) {
upstreamFeedback.returnData(lockedBuffer);
}
}
private void dicardPendingData(boolean isStopped) {
- List<LlapBuffer> dataToDiscard = new ArrayList<LlapBuffer>(pendingData.size() * colCount);
+ List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount);
List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
synchronized (pendingData) {
if (isStopped) {
@@ -151,13 +151,13 @@ public abstract class ColumnVectorProduc
}
for (EncodedColumnBatch batch : batches) {
synchronized (batch) {
- for (LlapBuffer b : batch.columnDatas) {
+ for (ColumnBuffer b : batch.columnDatas) {
dataToDiscard.add(b);
}
batch.columnDatas = null;
}
}
- for (LlapBuffer data : dataToDiscard) {
+ for (ColumnBuffer data : dataToDiscard) {
upstreamFeedback.returnData(data);
}
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Sat Jan 10 02:38:17 2015
@@ -21,8 +21,8 @@ package org.apache.hadoop.hive.llap.io.e
import java.io.IOException;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
-public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<LlapBuffer> {
+public interface EncodedDataReader<BatchKey> extends ConsumerFeedback<ColumnBuffer> {
public void start() throws IOException;
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Sat Jan 10 02:38:17 2015
@@ -30,8 +30,8 @@ import org.apache.hadoop.hive.llap.Consu
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator.LlapBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
@@ -48,9 +48,9 @@ import org.apache.hadoop.mapred.InputSpl
public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
private FileSystem cachedFs = null;
+ private final LowLevelCache lowLevelCache;
private Configuration conf;
private OrcMetadataCache metadataCache;
- private final Allocator allocator;
private final Cache<OrcCacheKey> cache;
private class OrcEncodedDataReader implements EncodedDataReader<OrcBatchKey>,
@@ -140,7 +140,7 @@ public class OrcEncodedDataProducer impl
}
RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
// We pass in the already-filtered RGs, as well as sarg. ORC can apply additional filtering.
- stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, allocator);
+ stripeReader.readEncodedColumns(colRgs, rgCount, sarg, this, lowLevelCache);
stripeReader.close();
}
@@ -151,7 +151,7 @@ public class OrcEncodedDataProducer impl
}
@Override
- public void returnData(LlapBuffer data) {
+ public void returnData(ColumnBuffer data) {
// TODO#: return the data to cache (unlock)
}
@@ -231,7 +231,7 @@ public class OrcEncodedDataProducer impl
boolean areAllRgsInCache = true;
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
key.rgIx = rgIx;
- LlapBuffer cached = cache.get(key);
+ ColumnBuffer cached = cache.get(key);
if (cached == null) {
areAllRgsInCache = false;
continue;
@@ -274,9 +274,9 @@ public class OrcEncodedDataProducer impl
public void consumeData(EncodedColumn<OrcBatchKey> data) {
// Store object in cache; create new key object - cannot be reused.
OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
- LlapBuffer cached = cache.cacheOrGet(key, data.columnData);
+ ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
if (data.columnData != cached) {
- allocator.deallocate(data.columnData);
+ // TODO: deallocate columnData
data.columnData = cached;
}
consumer.consumeData(data);
@@ -301,15 +301,15 @@ public class OrcEncodedDataProducer impl
}
private static int align64(int number) {
- int rem = number & 63;
- return number - rem + (rem == 0 ? 0 : 64);
+ return ((number + 63) & ~63);
}
- public OrcEncodedDataProducer(Allocator allocator, Cache<OrcCacheKey> cache, Configuration conf) throws IOException {
+ public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
+ Configuration conf) throws IOException {
// We assume all splits will come from the same FS.
this.cachedFs = FileSystem.get(conf);
this.cache = cache;
- this.allocator = allocator;
+ this.lowLevelCache = lowLevelCache;
this.conf = conf;
this.metadataCache = null;
}
@@ -319,5 +319,4 @@ public class OrcEncodedDataProducer impl
SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
return new OrcEncodedDataReader(split, columnIds, sarg, consumer);
}
-
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Sat Jan 10 02:38:17 2015
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.*;
@@ -99,7 +99,7 @@ public class LLAPRecordReaderImpl extend
@Override
public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
- Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) {
+ Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
}
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/BufferPool.java Sat Jan 10 02:38:17 2015
@@ -39,11 +39,9 @@ public class BufferPool {
public BufferPool(Configuration conf) {
- this.maxCacheSize = HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE);
- this.bufferSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE);
- this.cachePolicy = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_USE_LRFU)
- ? new LrfuCachePolicy(conf, bufferSize, maxCacheSize)
- : new FifoCachePolicy(bufferSize, maxCacheSize);
+ this.maxCacheSize = 0;// HiveConf.getLongVar(conf, HiveConf.ConfVars.LLAP_CACHE_SIZE);
+ this.bufferSize = 0; // HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_BUFFER_SIZE);
+ this.cachePolicy = null;
}
/**
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/old/ChunkPool.java Sat Jan 10 02:38:17 2015
@@ -54,11 +54,9 @@ public class ChunkPool<K> /*implements E
* @return Chunk corresponding to k.
*/
public Chunk getChunk(K key, HashSet<WeakBuffer> lockedBuffers) {
- Chunk result = chunkCache.get(key);
- if (result == null) {
- return null;
- }
while (true) {
+ Chunk result = chunkCache.get(key);
+ if (result == null) return null;
if (lockChunk(result, lockedBuffers)) return result;
if (chunkCache.remove(key, result)) return null;
}
Modified: hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/old/TestLrfuCachePolicy.java Sat Jan 10 02:38:17 2015
@@ -29,7 +29,7 @@ import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.*;
-public class TestLrfuCachePolicy {
+public class TestLrfuCachePolicy {/* TODO: switch to LowLevel one
private static final Log LOG = LogFactory.getLog(TestLrfuCachePolicy.class);
@Test
@@ -221,5 +221,5 @@ public class TestLrfuCachePolicy {
debugStr += inserted.get(i);
}
return debugStr;
- }
+ }*/
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Sat Jan 10 02:38:17 2015
@@ -152,6 +152,7 @@ abstract class InStream extends InputStr
currentRange = 0;
}
+ // TODO: this should allocate from cache
private ByteBuffer allocateBuffer(int size) {
// TODO: use the same pool as the ORC readers
if(isDirect == true) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Sat Jan 10 02:38:17 2015
@@ -23,7 +23,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
@@ -97,5 +97,5 @@ public interface RecordReader {
* @param allocator Allocator to allocate memory.
*/
void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
- Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator);
+ Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1650717&r1=1650716&r2=1650717&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Sat Jan 10 02:38:17 2015
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.common.typ
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.cache.Allocator;
+import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -2676,11 +2676,13 @@ public class RecordReaderImpl implements
) throws IOException {
long start = stripe.getIndexLength();
long end = start + stripe.getDataLength();
+ // TODO: planning should be added here too, to take cache into account
// explicitly trigger 1 big read
DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
+ // TODO: decompressed data from streams should be put in cache
}
/**
@@ -3050,6 +3052,7 @@ public class RecordReaderImpl implements
private void readPartialDataStreams(StripeInformation stripe
) throws IOException {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ // TODO: planning should take cache into account
List<DiskRange> chunks =
planReadPartialDataStreams(streamList,
indexes, included, includedRowGroups, codec != null,
@@ -3062,8 +3065,8 @@ public class RecordReaderImpl implements
LOG.debug("merge = " + stringifyDiskRanges(chunks));
}
bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
- createStreams(streamList, bufferChunks, included, codec, bufferSize,
- streams);
+ // TODO: decompressed data from streams should be put in cache
+ createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
}
@Override
@@ -3300,7 +3303,7 @@ public class RecordReaderImpl implements
@Override
public void readEncodedColumns(long[][] colRgs, int rgCount, SearchArgument sarg,
- Consumer<EncodedColumn<OrcBatchKey>> consumer, Allocator allocator) {
+ Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache allocator) {
// TODO: HERE read encoded data
}