Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/26 19:54:02 UTC
[39/50] [abbrv] hive git commit: HIVE-13241 : LLAP: Incremental Caching marks some small chunks as "incomplete CB" (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
HIVE-13241 : LLAP: Incremental Caching marks some small chunks as "incomplete CB" (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eead54c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eead54c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eead54c9
Branch: refs/heads/llap
Commit: eead54c981984d946ef0d9f932667ea48650a2c3
Parents: 53249a3
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Apr 25 18:20:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Apr 25 18:20:53 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../hive/llap/cache/EvictionDispatcher.java | 5 +
.../hive/llap/io/api/impl/LlapIoImpl.java | 4 +-
.../llap/io/encoded/OrcEncodedDataReader.java | 16 ++-
.../llap/io/metadata/OrcFileEstimateErrors.java | 121 +++++++++++++++++++
.../hive/llap/io/metadata/OrcMetadataCache.java | 58 ++++++++-
.../hive/llap/cache/TestOrcMetadataCache.java | 2 +-
.../ql/io/orc/encoded/EncodedReaderImpl.java | 98 ++++++++-------
8 files changed, 256 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8ccc262..bae3999 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2603,6 +2603,10 @@ public class HiveConf extends Configuration {
"modification time, which is almost certain to identify file uniquely. However, if you\n" +
"use a FS without file IDs and rewrite files a lot (or are paranoid), you might want\n" +
"to avoid this setting."),
+ LLAP_CACHE_ENABLE_ORC_GAP_CACHE("hive.llap.orc.gap.cache", true,
+ "Whether LLAP cache for ORC should remember gaps in ORC RG read estimates, to avoid\n" +
+ "re-reading the data that was read once and discarded because it is unneeded. This is\n" +
+ "only necessary for ORC files written before HIVE-9660 (Hive 2.1?)."),
LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true,
"Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" +
"cases of file overwrites. This is supported on HDFS."),
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
index bae571e..91932e2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/EvictionDispatcher.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.llap.cache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileEstimateErrors;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
@@ -49,4 +50,8 @@ public final class EvictionDispatcher implements EvictionListener {
public void notifyEvicted(OrcStripeMetadata buffer) {
metadataCache.notifyEvicted(buffer);
}
+
+ public void notifyEvicted(OrcFileEstimateErrors buffer) {
+ metadataCache.notifyEvicted(buffer);
+ }
}
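
The new overload slots into the existing double-dispatch scheme: the eviction policy only knows about LlapCacheableBuffer, each concrete buffer type calls back into the dispatcher as itself, and the dispatcher forwards the eviction to the cache that owns that type. A self-contained sketch of the pattern with simplified, hypothetical names (not the real LLAP classes):

    // Double dispatch: the listener recovers the concrete type without instanceof.
    interface Dispatcher { void notifyEvicted(GapEntry e); }
    abstract class CacheEntry { abstract void notifyEvicted(Dispatcher d); }
    class GapEntry extends CacheEntry {
      @Override
      void notifyEvicted(Dispatcher d) { d.notifyEvicted(this); } // "this" is statically a GapEntry
    }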
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
index d43ff15..6a72b4c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
@@ -117,7 +118,8 @@ public class LlapIoImpl implements LlapIo<VectorizedRowBatch> {
EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
this.allocator = allocator;
orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
- metadataCache = new OrcMetadataCache(memManager, cachePolicy);
+ boolean useGapCache = HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ENABLE_ORC_GAP_CACHE);
+ metadataCache = new OrcMetadataCache(memManager, cachePolicy, useGapCache);
// And finally cache policy uses cache to notify it of eviction. The cycle is complete!
cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache, metadataCache));
cachePolicy.setParentDebugDumper(orcCache);
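
The comment above is literal: construction really does close a cycle. A condensed trace of the wiring, using the classes from this patch:

    // 1. BuddyAllocator + LowLevelCacheImpl hold the cached data.
    // 2. OrcMetadataCache(memManager, cachePolicy, useGapCache) holds metadata,
    //    plus the new OrcFileEstimateErrors entries when useGapCache is true.
    // 3. cachePolicy.setEvictionListener(new EvictionDispatcher(orcCache,
    //    metadataCache)) routes evictions back into whichever cache owns the
    //    evicted buffer, including the new gap-cache entries.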
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index eb953c7..406f8f6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -817,15 +817,23 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void>
@Override
public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
- return (lowLevelCache == null) ? range : lowLevelCache.getFileData(
- fileKey, range, baseOffset, factory, counters, gotAllData);
+ DiskRangeList result = (lowLevelCache == null) ? range
+ : lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData);
+ if (gotAllData.value) return result;
+ return (metadataCache == null) ? range
+ : metadataCache.getIncompleteCbs(fileKey, range, baseOffset, factory, gotAllData);
}
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
- return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
- fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+ if (data != null) {
+ return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
+ fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
+ } else if (metadataCache != null) {
+ metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
+ }
+ return null;
}
@Override
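
These two overrides are how the gap cache plugs into the generic DataCache interface without changing it: on read, the low-level data cache is consulted first and the gap cache only gets to answer for whatever is still missing; on write, a null buffer array is the signal that the ranges describe known gaps rather than data to cache. A hedged sketch of the null-data path, with hypothetical locals:

    // Sketch only; "dataCache" stands for this DataCache implementation and
    // the other locals are made up for illustration.
    DiskRange[] gaps = new DiskRange[] { new DiskRange(100, 250) };
    long[] mask = dataCache.putFileData(fileKey, gaps, /* data */ null, 0L);
    // mask is always null on this path: nothing was cached, so no collisions.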
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
new file mode 100644
index 0000000..ad88b98
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileEstimateErrors.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
+import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper;
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator;
+import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator;
+import org.apache.hadoop.hive.llap.cache.EvictionDispatcher;
+import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
+
+public class OrcFileEstimateErrors extends LlapCacheableBuffer {
+ private final Object fileKey;
+ private int estimatedMemUsage;
+ private final ConcurrentHashMap<Long, Integer> cache = new ConcurrentHashMap<>();
+
+ private final static HashMap<Class<?>, ObjectEstimator> SIZE_ESTIMATORS;
+ private final static ObjectEstimator SIZE_ESTIMATOR;
+ static {
+ SIZE_ESTIMATORS = IncrementalObjectSizeEstimator.createEstimators(createDummy(
+ new SyntheticFileId(new Path("/"), 0, 0)));
+ SIZE_ESTIMATOR = SIZE_ESTIMATORS.get(OrcFileEstimateErrors.class);
+ }
+
+ public OrcFileEstimateErrors(Object fileKey) {
+ this.fileKey = fileKey;
+ }
+
+ public void addError(long offset, int length, long baseOffset) {
+ Long key = Long.valueOf(offset + baseOffset);
+ Integer existingLength = cache.get(key);
+ if (existingLength != null && existingLength >= length) return;
+ Integer value = Integer.valueOf(length);
+ while (true) {
+ existingLength = cache.putIfAbsent(key, value);
+ if (existingLength == null || existingLength >= length) return;
+ cache.remove(key, existingLength);
+ }
+ }
+
+ public DiskRangeList getIncompleteCbs(DiskRangeList ranges, long baseOffset,
+ DiskRangeListFactory factory, BooleanRef gotAllData) {
+ DiskRangeList prev = ranges.prev;
+ if (prev == null) {
+ prev = new MutateHelper(ranges);
+ }
+ DiskRangeList current = ranges;
+ while (current != null) {
+ // We assume ranges in "ranges" are non-overlapping; thus, we will save next in advance.
+ DiskRangeList check = current;
+ current = current.next;
+ if (check.hasData()) continue;
+ Integer badLength = cache.get(Long.valueOf(check.getOffset() + baseOffset));
+ if (badLength == null || badLength < check.getLength()) {
+ gotAllData.value = false;
+ continue;
+ }
+ check.removeSelf();
+ }
+ return prev.next;
+ }
+
+ public Object getFileKey() {
+ return fileKey;
+ }
+
+ public long estimateMemoryUsage() {
+ // Since we won't be able to update this as we add, for now, estimate 10x usage.
+ // This shouldn't be much, and this cache should be removed later anyway.
+ estimatedMemUsage = 10 * SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS);
+ return estimatedMemUsage;
+ }
+
+ private static OrcFileEstimateErrors createDummy(Object fileKey) {
+ OrcFileEstimateErrors dummy = new OrcFileEstimateErrors(fileKey);
+ dummy.addError(0L, 0, 0L);
+ return dummy;
+ }
+
+ @Override
+ protected boolean invalidate() {
+ return true;
+ }
+
+ @Override
+ public long getMemoryUsage() {
+ return estimatedMemUsage;
+ }
+
+ @Override
+ public void notifyEvicted(EvictionDispatcher evictionDispatcher) {
+ evictionDispatcher.notifyEvicted(this);
+ }
+
+ @Override
+ protected boolean isLocked() {
+ return false;
+ }
+}
\ No newline at end of file
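
The interesting bit above is addError: a lock-free "keep the largest length seen per offset" update over a ConcurrentHashMap. A standalone equivalent of the same loop, with generic names (the real class keys on offset + baseOffset):

    import java.util.concurrent.ConcurrentHashMap;

    class MaxLengthMap {
      private final ConcurrentHashMap<Long, Integer> map = new ConcurrentHashMap<>();

      void add(long key, int length) {
        Integer existing = map.get(key);
        if (existing != null && existing >= length) return; // fast path: no improvement
        while (true) {
          existing = map.putIfAbsent(key, length);
          if (existing == null || existing >= length) return; // inserted, or a larger value won
          map.remove(key, existing); // evict the smaller value, then retry
        }
      }
    }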
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
index e970137..66713d3 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
@@ -21,22 +21,29 @@ package org.apache.hadoop.hive.llap.io.metadata;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
+import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.MemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
public class OrcMetadataCache {
- private final ConcurrentHashMap<Object, OrcFileMetadata> metadata =
- new ConcurrentHashMap<Object, OrcFileMetadata>();
+ private final ConcurrentHashMap<Object, OrcFileMetadata> metadata = new ConcurrentHashMap<>();
private final ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata> stripeMetadata =
- new ConcurrentHashMap<OrcBatchKey, OrcStripeMetadata>();
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Object, OrcFileEstimateErrors> estimateErrors;
private final MemoryManager memoryManager;
private final LowLevelCachePolicy policy;
- public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy) {
+ public OrcMetadataCache(MemoryManager memoryManager, LowLevelCachePolicy policy,
+ boolean useEstimateCache) {
this.memoryManager = memoryManager;
this.policy = policy;
+ this.estimateErrors = useEstimateCache
+ ? new ConcurrentHashMap<Object, OrcFileEstimateErrors>() : null;
}
public OrcFileMetadata putFileMetadata(OrcFileMetadata metaData) {
@@ -71,6 +78,37 @@ public class OrcMetadataCache {
return val;
}
+ public void putIncompleteCbs(Object fileKey, DiskRange[] ranges, long baseOffset) {
+ if (estimateErrors == null) return;
+ OrcFileEstimateErrors errorData = estimateErrors.get(fileKey);
+ boolean isNew = false;
+ // We should technically update memory usage if updating the old object, but we don't do it
+ // for now; there is no mechanism to properly notify the cache policy/etc. wrt parallel evicts.
+ if (errorData == null) {
+ errorData = new OrcFileEstimateErrors(fileKey);
+ for (DiskRange range : ranges) {
+ errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+ }
+ long memUsage = errorData.estimateMemoryUsage();
+ memoryManager.reserveMemory(memUsage, false);
+ OrcFileEstimateErrors old = estimateErrors.putIfAbsent(fileKey, errorData);
+ if (old != null) {
+ errorData = old;
+ memoryManager.releaseMemory(memUsage);
+ policy.notifyLock(errorData);
+ } else {
+ isNew = true;
+ policy.cache(errorData, Priority.NORMAL);
+ }
+ }
+ if (!isNew) {
+ for (DiskRange range : ranges) {
+ errorData.addError(range.getOffset(), range.getLength(), baseOffset);
+ }
+ }
+ policy.notifyUnlock(errorData);
+ }
+
public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws IOException {
return stripeMetadata.get(stripeKey);
}
@@ -79,6 +117,14 @@ public class OrcMetadataCache {
return metadata.get(fileKey);
}
+ public DiskRangeList getIncompleteCbs(Object fileKey, DiskRangeList ranges, long baseOffset,
+ DiskRangeListFactory factory, BooleanRef gotAllData) {
+ if (estimateErrors == null) return ranges;
+ OrcFileEstimateErrors errors = estimateErrors.get(fileKey);
+ if (errors == null) return ranges;
+ return errors.getIncompleteCbs(ranges, baseOffset, factory, gotAllData);
+ }
+
public void notifyEvicted(OrcFileMetadata buffer) {
metadata.remove(buffer.getFileKey());
// See OrcFileMetadata - we don't clear the object, it will be GCed when released by users.
@@ -88,4 +134,8 @@ public class OrcMetadataCache {
stripeMetadata.remove(buffer.getKey());
// See OrcStripeMetadata - we don't clear the object, it will be GCed when released by users.
}
+
+ public void notifyEvicted(OrcFileEstimateErrors buffer) {
+ estimateErrors.remove(buffer.getFileKey());
+ }
}
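
putIncompleteCbs above follows the standard LLAP insert-under-budget pattern: build the entry, reserve its estimated footprint, then race on putIfAbsent and undo the reservation if another thread won. The shape of it, simplified with hypothetical names (Entry, buildEntry, and addAll are placeholders; the memory-manager and policy calls are the real ones from the diff):

    Entry fresh = buildEntry(fileKey, ranges);
    long mem = fresh.estimateMemoryUsage();
    memoryManager.reserveMemory(mem, false);   // may evict others to make room
    Entry winner = map.putIfAbsent(fileKey, fresh);
    if (winner == null) {
      policy.cache(fresh, Priority.NORMAL);    // won the race: register the new entry
      winner = fresh;
    } else {
      memoryManager.releaseMemory(mem);        // lost: hand the reservation back
      policy.notifyLock(winner);               // pin the existing entry while updating it
      winner.addAll(ranges);                   // merge our errors into the winner
    }
    policy.notifyUnlock(winner);               // entry becomes evictable again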
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
index 901e58a..3f2e750 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestOrcMetadataCache.java
@@ -88,7 +88,7 @@ public class TestOrcMetadataCache {
public void testGetPut() throws Exception {
DummyMemoryManager mm = new DummyMemoryManager();
DummyCachePolicy cp = new DummyCachePolicy();
- OrcMetadataCache cache = new OrcMetadataCache(mm, cp);
+ OrcMetadataCache cache = new OrcMetadataCache(mm, cp, false);
OrcFileMetadata ofm1 = OrcFileMetadata.createDummy(1), ofm2 = OrcFileMetadata.createDummy(2);
assertSame(ofm1, cache.putFileMetadata(ofm1));
assertEquals(1, mm.allocs);
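
The change just pins the new constructor argument to false, so the gap-cache path itself is not exercised by this test. A possible companion check, sketched under the assumption that the DummyMemoryManager/DummyCachePolicy above behave as no-ops (hypothetical, not part of this commit):

    OrcMetadataCache gapCache = new OrcMetadataCache(mm, cp, true);
    gapCache.putIncompleteCbs(1, new DiskRange[] { new DiskRange(0, 1000) }, 0);
    BooleanRef gotAll = new BooleanRef();
    gotAll.value = true;
    // The whole range is a known gap, so it is dropped and nothing remains.
    assertNull(gapCache.getIncompleteCbs(1, new DiskRangeList(0, 1000), 0, null, gotAll));
    assertTrue(gotAll.value);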
http://git-wip-us.apache.org/repos/asf/hive/blob/eead54c9/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 8ccedb7..40cc86f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -101,18 +101,18 @@ class EncodedReaderImpl implements EncodedReader {
private final int bufferSize;
private final List<OrcProto.Type> types;
private final long rowIndexStride;
- private final DataCache cache;
+ private final DataCache cacheWrapper;
private boolean isTracingEnabled;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
- int bufferSize, long strideRate, DataCache cache, DataReader dataReader, PoolFactory pf)
- throws IOException {
+ int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
+ PoolFactory pf) throws IOException {
this.fileKey = fileKey;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
- this.cache = cache;
+ this.cacheWrapper = cacheWrapper;
this.dataReader = dataReader;
if (POOLS != null) return;
if (pf == null) {
@@ -291,7 +291,7 @@ class EncodedReaderImpl implements EncodedReader {
}
BooleanRef isAllInCache = new BooleanRef();
if (hasFileId) {
- cache.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
+ cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
if (isTracingEnabled && LOG.isInfoEnabled()) {
LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
@@ -303,7 +303,7 @@ class EncodedReaderImpl implements EncodedReader {
this.dataReader.open();
isDataReaderOpen = true;
}
- dataReader.readFileData(toRead.next, stripeOffset, cache.getAllocator().isDirectAlloc());
+ dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
}
// 3. For uncompressed case, we need some special processing before read.
@@ -452,7 +452,7 @@ class EncodedReaderImpl implements EncodedReader {
CacheChunk cc = (CacheChunk)toFree;
if (cc.getBuffer() == null) continue;
MemoryBuffer buffer = cc.getBuffer();
- cache.releaseBuffer(buffer);
+ cacheWrapper.releaseBuffer(buffer);
cc.setBuffer(null);
}
}
@@ -501,11 +501,11 @@ class EncodedReaderImpl implements EncodedReader {
}
@Override
- public void handleCacheCollision(DataCache cache,
+ public void handleCacheCollision(DataCache cacheWrapper,
MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
assert cacheBuffers == null;
// This is done at pre-read stage where there's nothing special w/refcounts. Just release.
- cache.getAllocator().deallocate(getBuffer());
+ cacheWrapper.getAllocator().deallocate(getBuffer());
// Replace the buffer in our big range list, as well as in current results.
this.setBuffer(replacementBuffer);
}
@@ -544,14 +544,14 @@ class EncodedReaderImpl implements EncodedReader {
}
@Override
- public void handleCacheCollision(DataCache cache, MemoryBuffer replacementBuffer,
+ public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer,
List<MemoryBuffer> cacheBuffers) {
assert originalCbIndex >= 0;
// Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
// and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
// is not in cache.
- cache.getAllocator().deallocate(getBuffer());
- cache.reuseBuffer(replacementBuffer);
+ cacheWrapper.getAllocator().deallocate(getBuffer());
+ cacheWrapper.reuseBuffer(replacementBuffer);
// Replace the buffer in our big range list, as well as in current results.
this.buffer = replacementBuffer;
cacheBuffers.set(originalCbIndex, replacementBuffer);
@@ -594,9 +594,11 @@ class EncodedReaderImpl implements EncodedReader {
boolean isCompressed = codec != null;
List<ProcCacheChunk> toDecompress = null;
List<ByteBuffer> toRelease = null;
+ List<IncompleteCb> badEstimates = null;
if (isCompressed) {
toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
- toDecompress = new ArrayList<ProcCacheChunk>();
+ toDecompress = new ArrayList<>();
+ badEstimates = new ArrayList<>();
}
// 1. Find our bearings in the stream. Normally, iter will already point either to where we
@@ -612,17 +614,25 @@ class EncodedReaderImpl implements EncodedReader {
// 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
lastUncompressed = isCompressed ?
prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
- unlockUntilCOffset, current, csd, toRelease, toDecompress)
+ unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
: prepareRangesForUncompressedRead(
cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
+ // 2.5. Remember the bad estimates for future reference.
+ if (badEstimates != null && !badEstimates.isEmpty()) {
+ // Relies on the fact that cache does not actually store these.
+ DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
+ long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
+ assert result == null; // We don't expect conflicts from bad estimates.
+ }
+
+ if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
+
// 3. Allocate the buffers, prepare cache keys.
// At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
// data and some unallocated membufs for decompression. toDecompress contains all the work we
// need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
// has also been adjusted to point to these buffers instead of compressed data for the ranges.
- if (toDecompress == null) return lastUncompressed; // Nothing to decompress.
-
MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
int ix = 0;
@@ -631,7 +641,7 @@ class EncodedReaderImpl implements EncodedReader {
targetBuffers[ix] = chunk.getBuffer();
++ix;
}
- cache.getAllocator().allocateMultiple(targetBuffers, bufferSize);
+ cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
// 4. Now decompress (or copy) the data into cache buffers.
for (ProcCacheChunk chunk : toDecompress) {
@@ -646,7 +656,7 @@ class EncodedReaderImpl implements EncodedReader {
if (isTracingEnabled) {
LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
}
- cache.reuseBuffer(chunk.getBuffer());
+ cacheWrapper.reuseBuffer(chunk.getBuffer());
}
// 5. Release original compressed buffers to zero-copy reader if needed.
@@ -658,8 +668,9 @@ class EncodedReaderImpl implements EncodedReader {
}
// 6. Finally, put uncompressed data to cache.
+
if (fileKey != null) {
- long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+ long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
}
@@ -674,7 +685,8 @@ class EncodedReaderImpl implements EncodedReader {
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
- List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress) throws IOException {
+ List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
+ List<IncompleteCb> badEstimates) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
current = current.split(cOffset).next;
@@ -689,7 +701,7 @@ class EncodedReaderImpl implements EncodedReader {
if (isTracingEnabled) {
LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
}
- cache.reuseBuffer(cc.getBuffer());
+ cacheWrapper.reuseBuffer(cc.getBuffer());
columnStreamData.getCacheBuffers().add(cc.getBuffer());
currentOffset = cc.getEnd();
if (isTracingEnabled) {
@@ -710,7 +722,7 @@ class EncodedReaderImpl implements EncodedReader {
// several disk ranges, so we might need to combine them.
BufferChunk bc = (BufferChunk)current;
ProcCacheChunk newCached = addOneCompressionBuffer(
- bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease);
+ bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? newCached.next : null;
currentOffset = (next != null) ? next.getOffset() : -1;
@@ -737,7 +749,7 @@ class EncodedReaderImpl implements EncodedReader {
if (isTracingEnabled) {
LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse");
}
- cache.reuseBuffer(lastUncompressed.getBuffer());
+ cacheWrapper.reuseBuffer(lastUncompressed.getBuffer());
if (isFirst) {
columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
isFirst = false;
@@ -818,7 +830,7 @@ class EncodedReaderImpl implements EncodedReader {
if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
// We are missing a section at the end of the part... copy the start to non-cached.
lastUncompressed = copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hasEntirePartTo, cache, singleAlloc);
+ candidateCached, partOffset, hasEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
}
current = next;
@@ -850,10 +862,10 @@ class EncodedReaderImpl implements EncodedReader {
if (candidateCached != null) {
assert hadEntirePartTo != -1;
copyAndReplaceCandidateToNonCached(
- candidateCached, partOffset, hadEntirePartTo, cache, singleAlloc);
+ candidateCached, partOffset, hadEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
}
- lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cache, singleAlloc);
+ lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cacheWrapper, singleAlloc);
next = lastUncompressed.next; // There may be more data after the gap.
} else {
// So far we have all the data from the beginning of the part.
@@ -885,7 +897,7 @@ class EncodedReaderImpl implements EncodedReader {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
++ix;
}
- cache.getAllocator().allocateMultiple(
+ cacheWrapper.getAllocator().allocateMultiple(
targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
// 4. Now copy the data into cache buffers.
@@ -909,7 +921,7 @@ class EncodedReaderImpl implements EncodedReader {
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
- long[] collisionMask = cache.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
+ long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
}
@@ -922,7 +934,7 @@ class EncodedReaderImpl implements EncodedReader {
// of the prevalent ORC compression buffer (the default), or maximum allocation (since we
// cannot allocate bigger chunks), whichever is less.
long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
- int maxAllocSize = cache.getAllocator().getMaxAllocation();
+ int maxAllocSize = cacheWrapper.getAllocator().getMaxAllocation();
return (int)Math.min(maxAllocSize, orcCbSizeDefault);
}
@@ -942,14 +954,14 @@ class EncodedReaderImpl implements EncodedReader {
private static CacheChunk copyAndReplaceCandidateToNonCached(
UncompressedCacheChunk candidateCached, long partOffset,
- long candidateEnd, DataCache cache, MemoryBuffer[] singleAlloc) {
+ long candidateEnd, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
// We thought we had the entire part to cache, but we don't; convert start to
// non-cached. Since we are at the first gap, the previous stuff must be contiguous.
singleAlloc[0] = null;
- cache.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
+ cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
MemoryBuffer buffer = singleAlloc[0];
- cache.reuseBuffer(buffer);
+ cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, partOffset, candidateEnd);
@@ -958,11 +970,11 @@ class EncodedReaderImpl implements EncodedReader {
}
private static CacheChunk copyAndReplaceUncompressedToNonCached(
- BufferChunk bc, DataCache cache, MemoryBuffer[] singleAlloc) {
+ BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
singleAlloc[0] = null;
- cache.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
+ cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
MemoryBuffer buffer = singleAlloc[0];
- cache.reuseBuffer(buffer);
+ cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, bc.getOffset(), bc.getEnd());
@@ -1056,7 +1068,7 @@ class EncodedReaderImpl implements EncodedReader {
LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ (isBacktracking ? "; backtracking" : ""));
}
- cache.releaseBuffer(cc.getBuffer());
+ cacheWrapper.releaseBuffer(cc.getBuffer());
cc.setBuffer(null);
}
@@ -1081,7 +1093,7 @@ class EncodedReaderImpl implements EncodedReader {
}
assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+ "even though mask is [" + Long.toBinaryString(maskVal) + "]";
- replacedChunk.handleCacheCollision(cache, replacementBuffer, cacheBuffers);
+ replacedChunk.handleCacheCollision(cacheWrapper, replacementBuffer, cacheBuffers);
}
maskVal >>= 1;
}
@@ -1131,11 +1143,12 @@ class EncodedReaderImpl implements EncodedReader {
* @param toDecompress The list of work to decompress - pairs of compressed buffers and the
* target buffers (same as the ones added to cacheBuffers).
* @param toRelease The list of buffers to release to zcr because they are no longer in use.
+ * @param badEstimates The list of bad estimates that cannot be decompressed.
* @return The resulting cache chunk.
*/
private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
- List<ByteBuffer> toRelease) throws IOException {
+ List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
@@ -1166,7 +1179,7 @@ class EncodedReaderImpl implements EncodedReader {
return cc;
}
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
- addIncompleteCompressionBuffer(cbStartOffset, current, 0);
+ badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
return null; // This is impossible to read from this chunk.
}
@@ -1221,13 +1234,13 @@ class EncodedReaderImpl implements EncodedReader {
}
tmp.removeSelf();
} else {
- addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount);
+ badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
return null; // This is impossible to read from this chunk.
}
}
}
- private void addIncompleteCompressionBuffer(
+ private IncompleteCb addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
if (isTracingEnabled) {
@@ -1235,6 +1248,7 @@ class EncodedReaderImpl implements EncodedReader {
+ icb + " in the buffers");
}
target.replaceSelfWith(icb);
+ return icb;
}
/**
@@ -1253,7 +1267,7 @@ class EncodedReaderImpl implements EncodedReader {
boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
// Prepare future cache buffer.
- MemoryBuffer futureAlloc = cache.getAllocator().createUnallocated();
+ MemoryBuffer futureAlloc = cacheWrapper.getAllocator().createUnallocated();
// Add it to result in order we are processing.
cacheBuffers.add(futureAlloc);
// Add it to the list of work to decompress.
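
Tying the EncodedReaderImpl changes together, the end-to-end flow for a bad estimate now looks like this (condensed; step 2 is the new code above):

    // 1. prepareRangesForCompressedRead hits a CB whose estimated range ends
    //    mid-buffer; addIncompleteCompressionBuffer now returns the IncompleteCb,
    //    which is collected into badEstimates instead of being silently dropped.
    // 2. After range preparation, the gaps are pushed to the cache layer:
    //      DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
    //      cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
    //    The null buffer array routes the call to OrcMetadataCache.putIncompleteCbs.
    // 3. On the next read of the same ranges, DataCache.getFileData consults the
    //    low-level cache first, then OrcMetadataCache.getIncompleteCbs, which
    //    removes known gaps so they are not re-read and re-discarded.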