Posted to commits@hive.apache.org by se...@apache.org on 2015/07/27 22:48:45 UTC
[4/4] hive git commit: HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-11259 : LLAP: clean up ORC dependencies part 1 (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e3b59d3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e3b59d3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e3b59d3
Branch: refs/heads/llap
Commit: 1e3b59d3714810bb8a75420607e10ff266b426b4
Parents: 5cd092b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Jul 27 13:48:33 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Jul 27 13:48:33 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/DiskRangeList.java | 9 +-
.../hive/common/io/storage_api/Allocator.java | 51 +
.../hive/common/io/storage_api/DataCache.java | 101 ++
.../hive/common/io/storage_api/DataReader.java | 60 +
.../io/storage_api/EncodedColumnBatch.java | 139 +++
.../common/io/storage_api/MemoryBuffer.java | 28 +
.../org/apache/hadoop/hive/ql/QTestUtil.java | 1 -
.../org/apache/hadoop/hive/llap/Consumer.java | 30 -
.../hadoop/hive/llap/cache/Allocator.java | 34 -
.../llap/counters/LowLevelCacheCounters.java | 26 -
.../hive/llap/io/api/EncodedColumnBatch.java | 96 --
.../hadoop/hive/llap/io/api/LlapIoProxy.java | 2 +-
.../llap/io/api/cache/LlapMemoryBuffer.java | 30 -
.../hive/llap/io/api/cache/LowLevelCache.java | 96 --
.../hive/llap/io/api/orc/OrcBatchKey.java | 60 -
.../hive/llap/io/api/orc/OrcCacheKey.java | 58 -
.../hadoop/hive/llap/cache/BuddyAllocator.java | 32 +-
.../apache/hadoop/hive/llap/cache/Cache.java | 6 +-
.../hive/llap/cache/EvictionAwareAllocator.java | 5 +-
.../hadoop/hive/llap/cache/LlapDataBuffer.java | 4 +-
.../hadoop/hive/llap/cache/LowLevelCache.java | 76 ++
.../hive/llap/cache/LowLevelCacheCounters.java | 26 +
.../hive/llap/cache/LowLevelCacheImpl.java | 73 +-
.../hive/llap/cache/LowLevelCachePolicy.java | 2 +-
.../llap/cache/LowLevelFifoCachePolicy.java | 2 +-
.../llap/cache/LowLevelLrfuCachePolicy.java | 2 +-
.../hadoop/hive/llap/cache/NoopCache.java | 6 +-
.../llap/counters/QueryFragmentCounters.java | 1 +
.../hive/llap/io/api/impl/LlapInputFormat.java | 2 +-
.../hive/llap/io/api/impl/LlapIoImpl.java | 4 +-
.../llap/io/decode/ColumnVectorProducer.java | 2 +-
.../llap/io/decode/EncodedDataConsumer.java | 10 +-
.../llap/io/decode/OrcColumnVectorProducer.java | 6 +-
.../llap/io/decode/OrcEncodedDataConsumer.java | 27 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 137 ++-
.../hive/llap/io/metadata/OrcMetadataCache.java | 7 +-
.../llap/io/metadata/OrcStripeMetadata.java | 2 +-
.../hive/llap/cache/TestBuddyAllocator.java | 22 +-
.../TestIncrementalObjectSizeEstimator.java | 2 +-
.../hive/llap/cache/TestLowLevelCacheImpl.java | 119 +-
.../llap/cache/TestLowLevelLrfuCachePolicy.java | 2 +-
.../hive/llap/cache/TestOrcMetadataCache.java | 2 +-
.../org/apache/hadoop/hive/llap/DebugUtils.java | 4 -
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 61 +
.../hadoop/hive/ql/io/orc/EncodedReader.java | 12 +-
.../hive/ql/io/orc/EncodedReaderImpl.java | 1118 +++++++++++++++---
.../ql/io/orc/EncodedTreeReaderFactory.java | 203 ++--
.../apache/hadoop/hive/ql/io/orc/InStream.java | 947 +--------------
.../hive/ql/io/orc/MetadataReaderImpl.java | 6 +-
.../apache/hadoop/hive/ql/io/orc/Reader.java | 15 +-
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 28 +-
.../hadoop/hive/ql/io/orc/RecordReader.java | 8 -
.../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 71 +-
.../hive/ql/io/orc/RecordReaderUtils.java | 105 +-
.../hadoop/hive/ql/io/orc/StreamUtils.java | 12 +-
.../hive/ql/io/orc/TreeReaderFactory.java | 20 +-
.../hadoop/hive/ql/io/orc/llap/Consumer.java | 30 +
.../hadoop/hive/ql/io/orc/llap/OrcBatchKey.java | 60 +
.../hadoop/hive/ql/io/orc/llap/OrcCacheKey.java | 58 +
59 files changed, 2205 insertions(+), 1953 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java b/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
index 250f901..4fa72a2 100644
--- a/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
+++ b/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
@@ -152,10 +152,8 @@ public class DiskRangeList extends DiskRange {
return result;
}
- public static class DiskRangeListCreateHelper {
+ public static class CreateHelper {
private DiskRangeList tail = null, head;
- public DiskRangeListCreateHelper() {
- }
public DiskRangeList getTail() {
return tail;
@@ -175,7 +173,6 @@ public class DiskRangeList extends DiskRange {
}
}
-
public DiskRangeList get() {
return head;
}
@@ -192,8 +189,8 @@ public class DiskRangeList extends DiskRange {
* and thus remains constant even if head is replaced with some new range via in-place list
* mutation. extract() can be used to obtain the modified list.
*/
- public static class DiskRangeListMutateHelper extends DiskRangeList {
- public DiskRangeListMutateHelper(DiskRangeList head) {
+ public static class MutateHelper extends DiskRangeList {
+ public MutateHelper(DiskRangeList head) {
super(-1, -1);
assert head != null;
assert head.prev == null;
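
For context on the rename above: MutateHelper adds a synthetic node in front of the real head, so a caller keeps a stable handle while the list is mutated in place. A minimal usage sketch (hypothetical caller code, not part of this commit; the get() accessor is assumed from the javadoc's description):

    // Hypothetical fragment; assumes get() returns the current (possibly replaced) head.
    static DiskRangeList mutateSafely(DiskRangeList head) {
      DiskRangeList.MutateHelper helper = new DiskRangeList.MutateHelper(head);
      // ... in-place mutation that may replace `head` with another range ...
      return helper.get(); // still valid even if the original head node was removed
    }
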
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
new file mode 100644
index 0000000..0814fe7
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/Allocator.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common.io.storage_api;
+
+/** An allocator provided externally to storage classes to allocate MemoryBuffer-s. */
+public interface Allocator {
+ public static class AllocatorOutOfMemoryException extends RuntimeException {
+ public AllocatorOutOfMemoryException(String msg) {
+ super(msg);
+ }
+
+ private static final long serialVersionUID = 268124648177151761L;
+ }
+
+ /**
+ * Allocates multiple buffers of a given size.
+ * @param dest Array where buffers are placed. Objects are reused if already there
+ * (see createUnallocated), created otherwise.
+ * @param size Allocation size.
+ * @throws AllocatorOutOfMemoryException Cannot allocate.
+ */
+ void allocateMultiple(MemoryBuffer[] dest, int size) throws AllocatorOutOfMemoryException;
+
+ /**
+ * Creates an unallocated memory buffer object. This object can be passed to allocateMultiple
+ * to allocate; this is useful if data structures are created for separate buffers that can
+ * later be allocated together.
+ */
+ MemoryBuffer createUnallocated();
+ /** Deallocates a memory buffer. */
+ void deallocate(MemoryBuffer buffer);
+ /** Whether the allocator uses direct buffers. */
+ boolean isDirectAlloc();
+ /** Maximum allocation size supported by this allocator. */
+ int getMaxAllocation();
+}
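
To illustrate the contract above, a hypothetical caller could batch-allocate like this (a sketch only, not part of this commit; allocateBatch and its parameters are made up):

    import org.apache.hadoop.hive.common.io.storage_api.Allocator;
    import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

    // Hypothetical caller fragment; `size` must not exceed allocator.getMaxAllocation().
    static MemoryBuffer[] allocateBatch(Allocator allocator, int count, int size)
        throws Allocator.AllocatorOutOfMemoryException {
      MemoryBuffer[] dest = new MemoryBuffer[count];
      dest[0] = allocator.createUnallocated(); // pre-created entries are reused, not replaced
      allocator.allocateMultiple(dest, size);  // fills every null slot with an allocated buffer
      return dest; // each buffer is eventually cached or returned via allocator.deallocate(buf)
    }
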
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
new file mode 100644
index 0000000..0ec67ea
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataCache.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+
+/** An abstract data cache that IO formats can use to retrieve and cache data. */
+public interface DataCache {
+ public static final class BooleanRef {
+ public boolean value;
+ }
+
+ /** Disk range factory used during cache retrieval. */
+ public interface DiskRangeListFactory {
+ DiskRangeList createCacheChunk(MemoryBuffer buffer, long startOffset, long endOffset);
+ }
+
+ /**
+ * Gets file data for particular offsets. The range list is modified in place; it is then
+ * returned (since the list head could have changed). Ranges are replaced with cached ranges.
+ *
+ * Any such buffer is locked in cache to prevent eviction, and must therefore be released
+ * back to cache via a corresponding call (releaseBuffer) when the caller is done with it.
+ *
+ * In case of partial overlap with cached data, full cache blocks are always returned;
+ * there's no capacity for partial matches in the return type. The rules are as follows:
+ * 1) If the requested range starts in the middle of a cached range, that cached range will not
+ * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
+ * [150,300) will only return [200,300) from cache). This may be configurable in impls.
+ * This is because we assume well-known range start offsets are used (rg/stripe offsets), so
+ * a request starting in the middle of a range doesn't make sense.
+ * 2) If the requested range ends in the middle of a cached range, that entire cached range will
+ * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
+ * return both ranges). It should really be the same as #1; however, ORC currently uses estimated
+ * end offsets; if we don't return the end block, the caller may read it from disk needlessly.
+ *
+ * @param fileId Unique ID of the target file on the file system.
+ * @param range A set of DiskRange-s (linked list) that is to be retrieved. May be modified.
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
+ * @param factory A factory to produce DiskRangeList-s out of cached MemoryBuffer-s.
+ * @param gotAllData An out param - whether all the requested data was found in cache.
+ * @return The new or modified list of DiskRange-s, where some ranges may contain cached data.
+ */
+ DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+ DiskRangeListFactory factory, BooleanRef gotAllData);
+
+ /**
+ * Puts file data into cache, or gets older data in case of collisions.
+ *
+ * The memory buffers provided MUST be allocated via an allocator returned by getAllocator
+ * method, to allow cache implementations that evict and then de-allocate the buffer.
+ *
+ * It is assumed that the caller will use the data immediately, therefore any buffers provided
+ * to putFileData (or returned due to cache collision) are locked in cache to prevent eviction,
+ * and must therefore be released back to cache via a corresponding call (releaseBuffer) when the
+ * caller is done with it. Buffers rejected due to conflict will neither be locked, nor
+ * automatically deallocated. The caller must take care to discard these buffers.
+ *
+ * @param fileId Unique ID of the target file on the file system.
+ * @param ranges The ranges for which the data is being cached. These objects will not be stored.
+ * @param data The data for the corresponding ranges.
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
+ * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
+ * the replacement chunks from cache are updated directly in the array.
+ */
+ long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] data, long baseOffset);
+
+ /**
+ * Releases the buffer returned by getFileData/provided to putFileData back to cache.
+ * See respective javadocs for details.
+ */
+ void releaseBuffer(MemoryBuffer buffer);
+
+ /**
+ * Notifies the cache that the buffer returned from getFileData/provided to putFileData will
+ * be used by another consumer and therefore released multiple times (one more time per call).
+ */
+ void reuseBuffer(MemoryBuffer buffer);
+
+ /**
+ * Gets the allocator associated with this DataCache.
+ */
+ Allocator getAllocator();
+}
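
To make the lock/release contract concrete, a sketch of a read-path helper against the new interface (hypothetical code, not part of this commit; readThroughCache and its names are illustrative):

    import org.apache.hadoop.hive.common.DiskRangeList;
    import org.apache.hadoop.hive.common.io.storage_api.DataCache;

    // Hypothetical read-path helper; the caller fills any uncached gaps afterwards.
    static DiskRangeList readThroughCache(DataCache cache, long fileId, DiskRangeList ranges,
        long baseOffset, DataCache.DiskRangeListFactory factory) {
      DataCache.BooleanRef gotAll = new DataCache.BooleanRef();
      // Cached portions replace their ranges in place; the (possibly new) head is returned,
      // and every returned cache buffer is locked against eviction.
      DiskRangeList head = cache.getFileData(fileId, ranges, baseOffset, factory, gotAll);
      if (!gotAll.value) {
        // The caller reads the remaining gaps from disk (e.g. via a DataReader) and then
        // offers the new buffers back with putFileData; buffers rejected due to collision
        // are replaced in the array with the cached copies and must be discarded.
      }
      // The decoder must call cache.releaseBuffer(...) on every cached buffer when done.
      return head;
    }
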
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
new file mode 100644
index 0000000..0e11e4e
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/DataReader.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+
+/** An abstract data reader that IO formats can use to read bytes from underlying storage. */
+public interface DataReader {
+
+ /** Opens the DataReader, making it ready to use. */
+ void open() throws IOException;
+
+ /** Closes the DataReader. */
+ void close() throws IOException;
+
+ /** Reads the data.
+ *
+ * Note that for cases such as zero-copy reads, the caller must release the disk ranges
+ * produced after being done with them. Call isTrackingDiskRanges to find out if this is needed.
+ * @param range List of disk ranges to read. Ranges with data will be ignored.
+ * @param baseOffset Base offset from the start of the file of the ranges in disk range list.
+ * @param doForceDirect Whether the data should be read into direct buffers.
+ * @return New or modified list of DiskRange-s, where all the ranges are filled with data.
+ */
+ DiskRangeList readFileData(
+ DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException;
+
+
+ /**
+ * Whether the user should release buffers created by readFileData. See readFileData javadoc.
+ */
+ boolean isTrackingDiskRanges();
+
+ /**
+ * Releases buffers created by readFileData. See readFileData javadoc.
+ * @param toRelease The buffer to release.
+ */
+ void releaseBuffer(ByteBuffer toRelease);
+}
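
A hypothetical lifecycle sketch for a DataReader implementation follows (not part of this commit; readRanges is an illustrative name):

    import java.io.IOException;
    import org.apache.hadoop.hive.common.DiskRangeList;
    import org.apache.hadoop.hive.common.io.storage_api.DataReader;

    // Hypothetical lifecycle sketch: open, read, check buffer tracking, close.
    static DiskRangeList readRanges(DataReader reader, DiskRangeList ranges, long baseOffset)
        throws IOException {
      reader.open();
      try {
        // false: don't force direct buffers; ranges that already carry data are ignored.
        DiskRangeList data = reader.readFileData(ranges, baseOffset, false);
        if (reader.isTrackingDiskRanges()) {
          // Zero-copy case: each produced ByteBuffer must later be returned via
          // reader.releaseBuffer(bb) once decoding is finished.
        }
        return data;
      } finally {
        reader.close();
      }
    }
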
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
new file mode 100644
index 0000000..d51e3b4
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/EncodedColumnBatch.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A block of data for a given section of a file, similar to VRB but in encoded form.
+ * Stores a set of buffers for each encoded stream that is a part of each column.
+ */
+public class EncodedColumnBatch<BatchKey> {
+ /**
+ * Slice of the data for a stream of some column, stored inside MemoryBuffer-s.
+ * ColumnStreamData can be reused for many EncodedColumnBatch-es (e.g. dictionary stream), so
+ * it tracks the number of such users via a refcount.
+ */
+ public static class ColumnStreamData {
+ private List<MemoryBuffer> cacheBuffers;
+ /** Base offset from the beginning of the indexable unit; for example, for ORC, the
+ * offset from the CB in a compressed file, or from the stream in an uncompressed file. */
+ private int indexBaseOffset;
+ /** Stream type; format-specific. */
+ private int streamKind;
+
+ /** Reference count. */
+ private AtomicInteger refCount = new AtomicInteger(0);
+
+ public void init(int kind) {
+ streamKind = kind;
+ indexBaseOffset = 0;
+ }
+
+ public void reset() {
+ cacheBuffers.clear();
+ refCount.set(0);
+ }
+
+ public void incRef() {
+ refCount.incrementAndGet();
+ }
+
+ public int decRef() {
+ int i = refCount.decrementAndGet();
+ assert i >= 0;
+ return i;
+ }
+
+ public List<MemoryBuffer> getCacheBuffers() {
+ return cacheBuffers;
+ }
+
+ public void setCacheBuffers(List<MemoryBuffer> cacheBuffers) {
+ this.cacheBuffers = cacheBuffers;
+ }
+
+ public int getIndexBaseOffset() {
+ return indexBaseOffset;
+ }
+
+ public void setIndexBaseOffset(int indexBaseOffset) {
+ this.indexBaseOffset = indexBaseOffset;
+ }
+
+ public int getStreamKind() {
+ return streamKind;
+ }
+ }
+
+ /** The key that is used to map this batch to source location. */
+ protected BatchKey batchKey;
+ /** Stream data for each stream, for each included column. */
+ protected ColumnStreamData[][] columnData;
+ /** Column indexes included in the batch. Correspond to columnData elements. */
+ protected int[] columnIxs;
+ // TODO: Maybe remove when solving the pooling issue.
+ /** Generation version necessary to sync pooling reuse with the fact that two separate threads
+ * operate on batches - the one that decodes them, and potential separate thread w/a "stop" call
+ * that cleans them up. We don't want the decode thread to use the ECB that was thrown out and
+ * reused, so it remembers the version and checks it after making sure no cleanup thread can ever
+ * get to this ECB anymore. All this sync is ONLY needed because of high level cache code. */
+ public int version = Integer.MIN_VALUE;
+
+ public void reset() {
+ if (columnData != null) {
+ for (int i = 0; i < columnData.length; ++i) {
+ columnData[i] = null;
+ }
+ }
+ }
+
+ public void initColumn(int colIxMod, int colIx, int streamCount) {
+ columnIxs[colIxMod] = colIx;
+ columnData[colIxMod] = new ColumnStreamData[streamCount];
+ }
+
+ public void setStreamData(int colIxMod, int streamIx, ColumnStreamData sb) {
+ columnData[colIxMod][streamIx] = sb;
+ }
+
+ public void setAllStreamsData(int colIxMod, int colIx, ColumnStreamData[] sbs) {
+ columnIxs[colIxMod] = colIx;
+ columnData[colIxMod] = sbs;
+ }
+
+ public BatchKey getBatchKey() {
+ return batchKey;
+ }
+
+ public ColumnStreamData[][] getColumnData() {
+ return columnData;
+ }
+
+ public int[] getColumnIxs() {
+ return columnIxs;
+ }
+
+ protected void resetColumnArrays(int columnCount) {
+ if (columnIxs != null && columnCount == columnIxs.length) return;
+ columnIxs = new int[columnCount];
+ columnData = new ColumnStreamData[columnCount][];
+ }
+}
\ No newline at end of file
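
Since ColumnStreamData can be shared across batches, the refcount protocol matters in practice; a minimal sketch (hypothetical stream kind, not part of this commit):

    import java.util.ArrayList;
    import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
    import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

    // Hypothetical sketch: one shared dictionary stream consumed by two decode tasks.
    static void shareDictionaryStream() {
      final int dictionaryKind = 3; // format-specific stream kind; illustrative only
      ColumnStreamData dict = new ColumnStreamData();
      dict.init(dictionaryKind);
      dict.setCacheBuffers(new ArrayList<MemoryBuffer>());
      dict.incRef(); // one reference per batch that shares this stream
      dict.incRef();
      // ... later, each consumer that finishes decoding:
      if (dict.decRef() == 0) {
        dict.reset(); // last user clears the buffer list so the object can be pooled
      }
    }
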
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
new file mode 100644
index 0000000..4dd2f09
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/io/storage_api/MemoryBuffer.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common.io.storage_api;
+
+import java.nio.ByteBuffer;
+
+/** Abstract interface for any class wrapping a ByteBuffer. */
+public interface MemoryBuffer {
+ /** Note - raw buffer should not be modified. */
+ public ByteBuffer getByteBufferRaw();
+ public ByteBuffer getByteBufferDup();
+}
\ No newline at end of file
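
The raw/dup split exists so several readers can share one buffer; a sketch of the intended access pattern, based on the javadoc's note (hypothetical offsets, not part of this commit):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;

    // Hypothetical access pattern: read a slice without disturbing the shared raw buffer.
    static ByteBuffer sliceForStream(MemoryBuffer buffer, int streamOffset, int streamLength) {
      ByteBuffer view = buffer.getByteBufferDup(); // private position/limit, shared contents
      view.position(streamOffset);
      view.limit(streamOffset + streamLength);
      return view; // decode from the view; getByteBufferRaw() stays untouched
    }
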
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index a0e0c94..c9ed0d4 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
-import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.CuratorFrameworkSingleton;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
deleted file mode 100644
index 9db9f75c..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/Consumer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-/**
- * Data consumer; an equivalent of a data queue for an asynchronous data producer.
- */
-public interface Consumer<T> {
- /** Some data has been produced. */
- public void consumeData(T data);
- /** No more data will be produced; done */
- public void setDone();
- /** No more data will be produced; error during production */
- public void setError(Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
deleted file mode 100644
index 4e990ef..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/cache/Allocator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.cache;
-
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-
-public interface Allocator {
- public static class LlapCacheOutOfMemoryException extends RuntimeException {
- public LlapCacheOutOfMemoryException(String msg) {
- super(msg);
- }
-
- private static final long serialVersionUID = 268124648177151761L;
- }
- void allocateMultiple(LlapMemoryBuffer[] dest, int size) throws LlapCacheOutOfMemoryException;
- void deallocate(LlapMemoryBuffer buffer);
- boolean isDirectAlloc();
- int getMaxAllocation();
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java b/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
deleted file mode 100644
index d862a83..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/counters/LowLevelCacheCounters.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap.counters;
-
-public interface LowLevelCacheCounters {
- void recordCacheHit(long bytesHit);
- void recordCacheMiss(long bytesMissed);
- void recordAllocBytes(long bytesWasted, long bytesAllocated);
- void recordHdfsTime(long timeUs);
- long startTimeCounter();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
deleted file mode 100644
index 9734fc0..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumnBatch.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-
-public class EncodedColumnBatch<BatchKey> {
- // TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
- // generality, and ability to not copy data from underlying low-level cached buffers.
- public static class StreamBuffer {
- // Decoder knows which stream this belongs to, and each buffer is a compression block,
- // so he can figure out the offsets from metadata.
- public List<LlapMemoryBuffer> cacheBuffers;
- public int streamKind;
- /** Base offset from the beginning of the indexable unit;
- * CB in compressed, stream in uncompressed file. */
- public int indexBaseOffset;
-
- // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
- // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
- public AtomicInteger refCount = new AtomicInteger(0);
-
- public void init(int kind) {
- streamKind = kind;
- indexBaseOffset = 0;
- }
-
- public void reset() {
- cacheBuffers.clear();
- refCount.set(0);
- }
-
- public void incRef() {
- refCount.incrementAndGet();
- }
-
- public int decRef() {
- int i = refCount.decrementAndGet();
- assert i >= 0;
- return i;
- }
- }
-
- public BatchKey batchKey;
- public StreamBuffer[][] columnData;
- public int[] columnIxs;
- /** Generation version necessary to sync pooling reuse with the fact that two separate threads
- * operate on batches - the one that decodes them, and potential separate thread w/a "stop" call
- * that cleans them up. We don't want the decode thread to use the ECB that was thrown out and
- * reused, so it remembers the version and checks it after making sure no cleanup thread can ever
- * get to this ECB anymore. All this sync is ONLY needed because of high level cache code (sync
- * in decode thread is for the map that combines columns coming from cache and from file), so
- * if we throw this presently-unused code out, we'd be able to get rid of this. */
- public int version = Integer.MIN_VALUE;
-
- public void reset() {
- if (columnData != null) {
- for (int i = 0; i < columnData.length; ++i) {
- columnData[i] = null;
- }
- }
- }
-
- public void initColumn(int colIxMod, int colIx, int streamCount) {
- columnIxs[colIxMod] = colIx;
- columnData[colIxMod] = new StreamBuffer[streamCount];
- }
-
- public void setStreamData(int colIxMod, int streamIx, StreamBuffer sb) {
- columnData[colIxMod][streamIx] = sb;
- }
-
- public void setAllStreams(int colIxMod, int colIx, StreamBuffer[] sbs) {
- columnIxs[colIxMod] = colIx;
- columnData[colIxMod] = sbs;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
index c48af7b..4c31e32 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/LlapIoProxy.java
@@ -28,7 +28,7 @@ public class LlapIoProxy {
private final static String IMPL_CLASS = "org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl";
// Llap server depends on Hive execution, so the reverse cannot be true. We create the I/O
- // singleton once (on daemon startup); the said singleton server as the IO interface.
+ // singleton once (on daemon startup); the said singleton serves as the IO interface.
private static LlapIo io = null;
private static boolean isDaemon = false;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
deleted file mode 100644
index 5ec4d63..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LlapMemoryBuffer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.cache;
-
-import java.nio.ByteBuffer;
-import org.apache.hadoop.metrics2.MetricsSource;
-
-
-public interface LlapMemoryBuffer {
- /** Note - position of the raw buffer should NOT be modified ever;
- * limit should not be modified after it's in cache. */
- public ByteBuffer getByteBufferRaw();
- public ByteBuffer getByteBufferDup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
deleted file mode 100644
index fcc7eed..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.cache;
-
-import java.util.List;
-
-import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
-import org.apache.hadoop.hive.llap.cache.Allocator;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-
-public interface LowLevelCache {
- public enum Priority {
- NORMAL,
- HIGH
- }
-
- public class CacheListHelper extends DiskRangeListMutateHelper {
- public CacheListHelper(DiskRangeList head) {
- super(head);
- }
-
- /** Workaround for Java's limitations, used to return stuff from getFileData. */
- public boolean didGetAllData;
- }
-
- /**
- * Gets file data for particular offsets. The range list is modified in place; it is then
- * returned (since the list head could have changed). Ranges are replaced with cached ranges.
- * In case of partial overlap with cached data, full cache blocks are always returned;
- * there's no capacity for partial matches in return type. The rules are as follows:
- * 1) If the requested range starts in the middle of a cached range, that cached range will not
- * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
- * [150,300) will only return [200,300) from cache). This may be configurable in impls.
- * This is because we assume well-known range start offsets are used (rg/stripe offsets), so
- * a request from the middle of the start doesn't make sense.
- * 2) If the requested range ends in the middle of a cached range, that entire cached range will
- * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
- * return both ranges). It should really be same as #1, however currently ORC uses estimated
- * end offsets; we do in fact know in such cases that partially-matched cached block (rg)
- * can be thrown away, the reader will never touch it; but we need code in the reader to
- * handle such cases to avoid disk reads for these "tails" vs real unmatched ranges.
- * Some sort of InvalidCacheChunk could be placed to avoid them. TODO
- * @param base base offset for the ranges (stripe/stream offset in case of ORC).
- */
- DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
- CacheChunkFactory factory, LowLevelCacheCounters qfCounters);
-
- DiskRangeList getFileData(
- long fileId, DiskRangeList range, long baseOffset, CacheChunkFactory factory);
-
- /**
- * Puts file data into cache.
- * @return null if all data was put; bitmask indicating which chunks were not put otherwise;
- * the replacement chunks from cache are updated directly in the array.
- */
- long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks,
- long base, Priority priority, LowLevelCacheCounters qfCounters);
-
- long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] chunks,
- long base, Priority priority);
-
- Allocator getAllocator();
-
- /**
- * Releases the buffer returned by getFileData or allocateMultiple.
- */
- void releaseBuffer(LlapMemoryBuffer buffer);
-
- void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers);
-
- LlapMemoryBuffer createUnallocated();
-
- boolean notifyReused(LlapMemoryBuffer buffer);
-
- public interface CacheChunkFactory {
- DiskRangeList createCacheChunk(LlapMemoryBuffer buffer, long startOffset, long endOffset);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
deleted file mode 100644
index 4dae98f..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcBatchKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.orc;
-
-public class OrcBatchKey {
- public long file;
- public int stripeIx, rgIx;
-
- public OrcBatchKey(long file, int stripeIx, int rgIx) {
- set(file, stripeIx, rgIx);
- }
-
- public void set(long file, int stripeIx, int rgIx) {
- this.file = file;
- this.stripeIx = stripeIx;
- this.rgIx = rgIx;
- }
-
- @Override
- public String toString() {
- return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = prime + (int)(file ^ (file >>> 32));
- return (prime * result + rgIx) * prime + stripeIx;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof OrcBatchKey)) return false;
- OrcBatchKey other = (OrcBatchKey)obj;
- // Strings are interned and can thus be compared like this.
- return stripeIx == other.stripeIx && rgIx == other.rgIx && file == other.file;
- }
-
- @Override
- public OrcBatchKey clone() throws CloneNotSupportedException {
- return new OrcBatchKey(file, stripeIx, rgIx);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java b/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
deleted file mode 100644
index 95efeba..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/orc/OrcCacheKey.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.io.api.orc;
-
-public class OrcCacheKey extends OrcBatchKey {
- public int colIx;
-
- public OrcCacheKey(long file, int stripeIx, int rgIx, int colIx) {
- super(file, stripeIx, rgIx);
- this.colIx = colIx;
- }
-
- public OrcCacheKey(OrcBatchKey batchKey, int colIx) {
- super(batchKey.file, batchKey.stripeIx, batchKey.rgIx);
- this.colIx = colIx;
- }
-
- public OrcBatchKey copyToPureBatchKey() {
- return new OrcBatchKey(file, stripeIx, rgIx);
- }
-
- @Override
- public String toString() {
- return "[" + file + ", stripe " + stripeIx + ", rgIx " + rgIx + ", rgIx " + colIx + "]";
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- return super.hashCode() * prime + colIx;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof OrcCacheKey)) return false;
- OrcCacheKey other = (OrcCacheKey)obj;
- // Strings are interned and can thus be compared like this.
- return stripeIx == other.stripeIx && rgIx == other.rgIx
- && file == other.file && other.colIx == colIx;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
index fca6249..65854fc 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/BuddyAllocator.java
@@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
@@ -97,8 +97,8 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// TODO: would it make sense to return buffers asynchronously?
@Override
- public void allocateMultiple(LlapMemoryBuffer[] dest, int size)
- throws LlapCacheOutOfMemoryException {
+ public void allocateMultiple(MemoryBuffer[] dest, int size)
+ throws AllocatorOutOfMemoryException {
assert size > 0 : "size is " + size;
if (size > maxAllocation) {
throw new RuntimeException("Trying to allocate " + size + "; max is " + maxAllocation);
@@ -114,7 +114,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
int ix = 0;
for (int i = 0; i < dest.length; ++i) {
if (dest[i] != null) continue;
- dest[i] = new LlapDataBuffer(); // TODO: pool of objects?
+ dest[i] = createUnallocated(); // TODO: pool of objects?
}
// First try to quickly lock some of the correct-sized free lists and allocate from them.
int arenaCount = allocatedArenas.get();
@@ -173,20 +173,20 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
String msg = "Failed to allocate " + size + "; at " + ix + " out of " + dest.length;
LlapIoImpl.LOG.error(msg + "\nALLOCATOR STATE:\n" + debugDump()
+ "\nPARENT STATE:\n" + memoryManager.debugDumpForOom());
- throw new LlapCacheOutOfMemoryException(msg);
+ throw new AllocatorOutOfMemoryException(msg);
}
@Override
- public void deallocate(LlapMemoryBuffer buffer) {
+ public void deallocate(MemoryBuffer buffer) {
deallocateInternal(buffer, true);
}
@Override
- public void deallocateEvicted(LlapMemoryBuffer buffer) {
+ public void deallocateEvicted(MemoryBuffer buffer) {
deallocateInternal(buffer, false);
}
- private void deallocateInternal(LlapMemoryBuffer buffer, boolean doReleaseMemory) {
+ private void deallocateInternal(MemoryBuffer buffer, boolean doReleaseMemory) {
LlapDataBuffer buf = (LlapDataBuffer)buffer;
long memUsage = buf.getMemoryUsage();
metrics.decrCacheCapacityUsed(buf.byteBuffer.capacity());
@@ -303,7 +303,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateFast(
- int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
if (!freeList.lock.tryLock()) return ix;
@@ -315,7 +315,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateWithSplit(int arenaIx, int freeListIx,
- LlapMemoryBuffer[] dest, int ix, int allocationSize) {
+ MemoryBuffer[] dest, int ix, int allocationSize) {
if (data == null) return -1; // not allocated yet
FreeList freeList = freeLists[freeListIx];
int remaining = -1;
@@ -404,7 +404,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
private int allocateWithExpand(
- int arenaIx, int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int arenaIx, int freeListIx, MemoryBuffer[] dest, int ix, int size) {
while (true) {
int arenaCount = allocatedArenas.get(), allocArenaCount = arenaCount;
if (arenaCount < 0) {
@@ -449,7 +449,7 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
public int allocateFromFreeListUnderLock(int arenaIx, FreeList freeList,
- int freeListIx, LlapMemoryBuffer[] dest, int ix, int size) {
+ int freeListIx, MemoryBuffer[] dest, int ix, int size) {
int current = freeList.listHead;
while (current >= 0 && ix < dest.length) {
int offset = offsetFromHeaderIndex(current);
@@ -532,9 +532,6 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
}
}
- private static class Request {
-
- }
private static class FreeList {
ReentrantLock lock = new ReentrantLock(false);
int listHead = -1; // Index of where the buffer is; in minAllocation units
@@ -542,4 +539,9 @@ public final class BuddyAllocator implements EvictionAwareAllocator, BuddyAlloca
// blocks requested, to be able to wait for pending splits and reduce fragmentation.
// However, we are trying to increase fragmentation now, since we cater to single-size.
}
+
+ @Override
+ public MemoryBuffer createUnallocated() {
+ return new LlapDataBuffer();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
index 1fe339c..4d294b9 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
/** Dummy interface for now, might be different. */
public interface Cache<CacheKey> {
- public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value);
- public StreamBuffer[] get(CacheKey key);
+ public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value);
+ public ColumnStreamData[] get(CacheKey key);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
index db08130..3baacfd 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionAwareAllocator.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
/**
* An allocator that has additional, internal-only call to deallocate evicted buffer.
@@ -25,5 +26,5 @@ import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
* ourselves, so we set the value atomically to account for both eviction and the new demand.
*/
public interface EvictionAwareAllocator extends Allocator {
- void deallocateEvicted(LlapMemoryBuffer buffer);
+ void deallocateEvicted(MemoryBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
index c8ea475..83eb0af 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LlapDataBuffer.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.hive.llap.cache;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import com.google.common.annotations.VisibleForTesting;
-public final class LlapDataBuffer extends LlapCacheableBuffer implements LlapMemoryBuffer {
+public final class LlapDataBuffer extends LlapCacheableBuffer implements MemoryBuffer {
// For now, we don't track refcount for metadata blocks, don't clear them, don't reuse them and
// basically rely on GC to remove them. So, refcount only applies to data blocks. If that
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
new file mode 100644
index 0000000..13944ff
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCache.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cache;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
+
+public interface LowLevelCache {
+ public enum Priority {
+ NORMAL,
+ HIGH
+ }
+
+ /**
+ * Gets file data for particular offsets. The range list is modified in place; it is then
+ * returned (since the list head could have changed). Ranges are replaced with cached ranges.
+ * In case of partial overlap with cached data, full cache blocks are always returned;
+ * the return type has no way to express partial matches. The rules are as follows:
+ * 1) If the requested range starts in the middle of a cached range, that cached range will not
+ * be returned by default (e.g. if [100,200) and [200,300) are cached, the request for
+ * [150,300) will only return [200,300) from cache). This may be configurable in impls.
+ * This is because we assume well-known range start offsets (rg/stripe offsets) are used,
+ * so a request starting in the middle of a range doesn't make sense.
+ * 2) If the requested range ends in the middle of a cached range, that entire cached range will
+ * be returned (e.g. if [100,200) and [200,300) are cached, the request for [100,250) will
+ * return both ranges). This should really be the same as #1; however, ORC currently uses
+ * estimated end offsets. In such cases we do in fact know that the partially-matched cached
+ * block (rg) can be thrown away, since the reader will never touch it; but the reader needs
+ * code to distinguish these "tails" from real unmatched ranges, to avoid disk reads for them.
+ * Some sort of InvalidCacheChunk could be placed to avoid them. TODO
+ * @param baseOffset base offset for the ranges (stripe/stream offset in case of ORC).
+ */
+ DiskRangeList getFileData(long fileId, DiskRangeList range, long baseOffset,
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData);
+
+ /**
+ * Puts file data into the cache.
+ * @return null if all data was put; otherwise, a bitmask indicating which chunks were not put;
+ * the replacement chunks from the cache are updated directly in the array.
+ */
+ long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] chunks,
+ long baseOffset, Priority priority, LowLevelCacheCounters qfCounters);
+
+ Allocator getAllocator();
+
+ /**
+ * Releases the buffer returned by getFileData.
+ */
+ void releaseBuffer(MemoryBuffer buffer);
+
+ void releaseBuffers(List<MemoryBuffer> cacheBuffers);
+
+ boolean reuseBuffer(MemoryBuffer buffer);
+}
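
To make the contract above concrete, here is a hypothetical read-path sketch (not code from this commit; collectMissing and readFromDisk are assumed helpers, and BooleanRef is assumed to default-construct):

    // Hypothetical usage of the interface above: consult the cache first, then backfill.
    DiskRangeList readWithCache(LowLevelCache cache, long fileId, DiskRangeList ranges,
        long baseOffset, DiskRangeListFactory factory, LowLevelCacheCounters counters) {
      BooleanRef gotAllData = new BooleanRef();
      ranges = cache.getFileData(fileId, ranges, baseOffset, factory, counters, gotAllData);
      if (!gotAllData.value) {
        DiskRange[] missing = collectMissing(ranges);    // assumed helper: ranges without data
        MemoryBuffer[] buffers = readFromDisk(missing);  // assumed helper: the actual disk I/O
        long[] notCached = cache.putFileData(
            fileId, missing, buffers, baseOffset, LowLevelCache.Priority.NORMAL, counters);
        // null means every chunk was accepted; otherwise the bitmask marks chunks that
        // lost a race with a concurrent put, and the winners are swapped into buffers.
      }
      return ranges; // the head may have changed, per the getFileData contract
    }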
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
new file mode 100644
index 0000000..a0810f0
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheCounters.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.cache;
+
+public interface LowLevelCacheCounters {
+ void recordCacheHit(long bytesHit);
+ void recordCacheMiss(long bytesMissed);
+ void recordAllocBytes(long bytesWasted, long bytesAllocated);
+ void recordHdfsTime(long timeUs);
+ long startTimeCounter();
+}
\ No newline at end of file
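
Because call sites null-check the counters (as LowLevelCacheImpl below shows), passing null is one option for tests; a no-op implementation is the other. A minimal sketch, not part of this commit:

    // Hypothetical no-op counters, e.g. for tests that don't assert on cache metrics.
    class NoopCacheCounters implements LowLevelCacheCounters {
      @Override public void recordCacheHit(long bytesHit) {}
      @Override public void recordCacheMiss(long bytesMissed) {}
      @Override public void recordAllocBytes(long bytesWasted, long bytesAllocated) {}
      @Override public void recordHdfsTime(long timeUs) {}
      @Override public long startTimeCounter() { return 0; }
    }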
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
index 3798aaa..6a54623 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
@@ -27,11 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.common.DiskRangeList;
-import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.storage_api.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.counters.LowLevelCacheCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
@@ -74,14 +75,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public DiskRangeList getFileData(
- long fileId, DiskRangeList ranges, long baseOffset, CacheChunkFactory factory) {
- return getFileData(fileId, ranges, baseOffset, factory, null);
- }
-
- @Override
public DiskRangeList getFileData(long fileId, DiskRangeList ranges, long baseOffset,
- CacheChunkFactory factory, LowLevelCacheCounters qfCounters) {
+ DiskRangeListFactory factory, LowLevelCacheCounters qfCounters, BooleanRef gotAllData) {
if (ranges == null) return null;
DiskRangeList prev = ranges.prev;
FileCache subCache = cache.get(fileId);
@@ -91,25 +86,24 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
if (qfCounters != null) {
qfCounters.recordCacheMiss(totalMissed);
}
- if (prev != null && prev instanceof CacheListHelper) {
- ((CacheListHelper)prev).didGetAllData = false;
+ if (prev != null && gotAllData != null) {
+ gotAllData.value = false;
}
return ranges;
}
- CacheListHelper resultObj = null;
try {
if (prev == null) {
- prev = new DiskRangeListMutateHelper(ranges);
- } else if (prev instanceof CacheListHelper) {
- resultObj = (CacheListHelper)prev;
- resultObj.didGetAllData = true;
+ prev = new MutateHelper(ranges);
+ }
+ if (gotAllData != null) {
+ gotAllData.value = true;
}
DiskRangeList current = ranges;
while (current != null) {
metrics.incrCacheRequestedBytes(current.getLength());
// We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
DiskRangeList next = current.next;
- getOverlappingRanges(baseOffset, current, subCache.cache, factory, resultObj);
+ getOverlappingRanges(baseOffset, current, subCache.cache, factory, gotAllData);
current = next;
}
} finally {
@@ -134,8 +128,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
private void getOverlappingRanges(long baseOffset, DiskRangeList currentNotCached,
- ConcurrentSkipListMap<Long, LlapDataBuffer> cache, CacheChunkFactory factory,
- CacheListHelper resultObj) {
+ ConcurrentSkipListMap<Long, LlapDataBuffer> cache, DiskRangeListFactory factory,
+ BooleanRef gotAllData) {
long absOffset = currentNotCached.getOffset() + baseOffset;
if (!doAssumeGranularBlocks) {
// This currently only happens in tests. See getFileData comment on the interface.
@@ -161,8 +155,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
if (!lockBuffer(buffer, true)) {
// If we cannot lock, remove this from cache and continue.
matches.remove();
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
continue;
}
@@ -174,13 +168,13 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
cacheEnd = cacheOffset + buffer.declaredCachedLength;
DiskRangeList currentCached = factory.createCacheChunk(buffer,
cacheOffset - baseOffset, cacheEnd - baseOffset);
- currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, resultObj);
+ currentNotCached = addCachedBufferToIter(currentNotCached, currentCached, gotAllData);
metrics.incrCacheHitBytes(Math.min(requestedLength, currentCached.getLength()));
}
if (currentNotCached != null) {
assert !currentNotCached.hasData();
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
}
}
@@ -193,7 +187,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
* @return The new currentNotCached pointer, following the cached buffer insertion.
*/
private DiskRangeList addCachedBufferToIter(
- DiskRangeList currentNotCached, DiskRangeList currentCached, CacheListHelper resultObj) {
+ DiskRangeList currentNotCached, DiskRangeList currentCached, BooleanRef gotAllData) {
if (currentNotCached.getOffset() >= currentCached.getOffset()) {
if (currentNotCached.getEnd() <= currentCached.getEnd()) { // we assume it's always "==" now
// Replace the entire current DiskRange with new cached range.
@@ -206,8 +200,8 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
} else {
// There's some part of current buffer that is not cached.
- if (resultObj != null) {
- resultObj.didGetAllData = false;
+ if (gotAllData != null) {
+ gotAllData.value = false;
}
assert currentNotCached.getOffset() < currentCached.getOffset()
|| currentNotCached.prev == null
@@ -238,13 +232,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public long[] putFileData(long fileId, DiskRange[] ranges,
- LlapMemoryBuffer[] buffers, long baseOffset, Priority priority) {
- return putFileData(fileId, ranges, buffers, baseOffset, priority, null);
- }
-
- @Override
- public long[] putFileData(long fileId, DiskRange[] ranges, LlapMemoryBuffer[] buffers,
+ public long[] putFileData(long fileId, DiskRange[] ranges, MemoryBuffer[] buffers,
long baseOffset, Priority priority, LowLevelCacheCounters qfCounters) {
long[] result = null;
assert buffers.length == ranges.length;
@@ -346,13 +334,13 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public void releaseBuffer(LlapMemoryBuffer buffer) {
+ public void releaseBuffer(MemoryBuffer buffer) {
unlockBuffer((LlapDataBuffer)buffer, true);
}
@Override
- public void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers) {
- for (LlapMemoryBuffer b : cacheBuffers) {
+ public void releaseBuffers(List<MemoryBuffer> cacheBuffers) {
+ for (MemoryBuffer b : cacheBuffers) {
unlockBuffer((LlapDataBuffer)b, true);
}
}
@@ -514,12 +502,7 @@ public class LowLevelCacheImpl implements LowLevelCache, LlapOomDebugDump {
}
@Override
- public LlapMemoryBuffer createUnallocated() {
- return new LlapDataBuffer();
- }
-
- @Override
- public boolean notifyReused(LlapMemoryBuffer buffer) {
+ public boolean reuseBuffer(MemoryBuffer buffer) {
// reuseBuffer implies that the buffer is already locked; it's also called once for new
// buffers that are not cached yet. Don't notify the cache policy.
return lockBuffer(((LlapDataBuffer)buffer), false);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
index 0b50749..acbaf85 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCachePolicy.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
public interface LowLevelCachePolicy extends LlapOomDebugDump {
void cache(LlapCacheableBuffer buffer, Priority priority);
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
index a1ed7ea..9de159c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelFifoCachePolicy.java
@@ -28,7 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
public class LowLevelFifoCachePolicy implements LowLevelCachePolicy {
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
index b43b31d..f551edb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DebugUtils;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache.Priority;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
index f422911..6cd0c4a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
public class NoopCache<CacheKey> implements Cache<CacheKey> {
@Override
- public StreamBuffer[] cacheOrGet(CacheKey key, StreamBuffer[] value) {
+ public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value) {
return value;
}
@Override
- public StreamBuffer[] get(CacheKey key) {
+ public ColumnStreamData[] get(CacheKey key) {
return null; // TODO: ensure real implementation increases refcount
}
}
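
The cacheOrGet contract (return the existing entry if present, otherwise install and return the argument) maps directly onto ConcurrentHashMap.putIfAbsent; a hypothetical non-noop variant, ignoring the refcount TODO above:

    // Hypothetical map-backed Cache (not in this commit).
    class MapCache<CacheKey> implements Cache<CacheKey> {
      private final java.util.concurrent.ConcurrentHashMap<CacheKey, ColumnStreamData[]> map =
          new java.util.concurrent.ConcurrentHashMap<>();
      @Override
      public ColumnStreamData[] cacheOrGet(CacheKey key, ColumnStreamData[] value) {
        ColumnStreamData[] prev = map.putIfAbsent(key, value);
        return prev == null ? value : prev; // first writer wins; all callers share one array
      }
      @Override
      public ColumnStreamData[] get(CacheKey key) {
        return map.get(key); // a real implementation must also bump buffer refcounts here
      }
    }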
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
index 5d2915f..5d16f72 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/counters/QueryFragmentCounters.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheCounters;
/**
* Per query counters.
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index e90dbbc..152230c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index 40daeec..063eb08 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -26,6 +26,7 @@ import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.io.storage_api.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LogLevels;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.llap.cache.LowLevelFifoCachePolicy;
import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
import org.apache.hadoop.hive.llap.cache.NoopCache;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
-import org.apache.hadoop.hive.llap.cache.Allocator;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.decode.OrcColumnVectorProducer;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.metrics2.util.MBeans;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index f561270..79d3b32 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hive.llap.io.decode;
import java.util.List;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.InputSplit;
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
index 4c61463..b1d34ec 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java
@@ -22,11 +22,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
-import org.apache.hadoop.hive.llap.Consumer;
+import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.hive.common.util.FixedSizedObjectPool.PoolObjectHelper;
@@ -80,10 +80,10 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
synchronized (pendingData) {
localIsStopped = isStopped;
if (!localIsStopped) {
- targetBatch = pendingData.get(data.batchKey);
+ targetBatch = pendingData.get(data.getBatchKey());
if (targetBatch == null) {
targetBatch = data;
- pendingData.put(data.batchKey, data);
+ pendingData.put(data.getBatchKey(), data);
}
// We have the map locked; the code that throws things away from the map only bumps the version
// under the same map lock; code that throws things away here only bumps the version when
@@ -102,7 +102,7 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol
throw new UnsupportedOperationException("Merging is not supported");
}
synchronized (pendingData) {
- targetBatch = isStopped ? null : pendingData.remove(data.batchKey);
+ targetBatch = isStopped ? null : pendingData.remove(data.getBatchKey());
// Check if someone already threw this away and changed the version.
localIsStopped = (targetBatchVersion != targetBatch.version);
}
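
The version check above is the usual invalidate-by-version idiom: whoever discards an entry bumps its version under the shared lock, so a consumer that captured the old version can detect the discard. A generic sketch (illustrative only, not Hive code):

    // Discarders bump version under the same lock the consumers use, so a consumer
    // re-checking under the lock can tell that its earlier snapshot went stale.
    class VersionedPending<K> {
      static final class Entry { Object value; long version; }
      private final java.util.HashMap<K, Entry> pending = new java.util.HashMap<>();

      synchronized void discard(K key) {
        Entry e = pending.remove(key);
        if (e != null) e.version++; // anyone holding the old version now fails the check
      }

      synchronized Object takeIfCurrent(K key, long expectedVersion) {
        Entry e = pending.remove(key);
        return (e != null && e.version == expectedVersion) ? e.value : null;
      }
    }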
http://git-wip-us.apache.org/repos/asf/hive/blob/1e3b59d3/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
index 7db60e0..765ade3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
@@ -22,18 +22,18 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.cache.Cache;
+import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
-import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
-import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataReader;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
+import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.InputSplit;