Posted to commits@accumulo.apache.org by kt...@apache.org on 2017/12/01 17:53:59 UTC
[accumulo] branch master updated: ACCUMULO-4641 Added loading to cache API (#321)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new d1d9344 ACCUMULO-4641 Added loading to cache API (#321)
d1d9344 is described below
commit d1d9344082702206278b3eee442351479171e644
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Fri Dec 1 12:53:56 2017 -0500
ACCUMULO-4641 Added loading to cache API (#321)
---
.../accumulo/core/client/rfile/RFileScanner.java | 44 ++-
.../core/client/rfile/RFileSummariesRetriever.java | 5 +-
.../accumulo/core/file/blockfile/ABlockReader.java | 7 +-
.../core/file/blockfile/cache/BlockCache.java | 46 ++-
.../core/file/blockfile/cache/CacheEntry.java | 22 +-
.../core/file/blockfile/cache/lru/CachedBlock.java | 66 +++-
.../file/blockfile/cache/lru/LruBlockCache.java | 76 +++-
.../cache/lru/SynchronousLoadingBlockCache.java | 139 +++++++
.../blockfile/cache/tinylfu/TinyLfuBlockCache.java | 160 ++++++--
.../file/blockfile/impl/CachableBlockFile.java | 431 +++++++++++----------
.../accumulo/core/file/rfile/BlockIndex.java | 31 +-
.../accumulo/core/file/rfile/bcfile/BCFile.java | 180 +++------
.../accumulo/core/summary/SummaryReader.java | 37 +-
.../file/blockfile/cache/TestLruBlockCache.java | 16 +-
.../accumulo/core/file/rfile/BlockIndexTest.java | 17 +-
.../core/file/rfile/MultiLevelIndexTest.java | 2 +-
.../apache/accumulo/core/file/rfile/RFileTest.java | 3 +-
17 files changed, 847 insertions(+), 435 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 301dfc5..15c89ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -23,10 +23,12 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
@@ -95,10 +97,6 @@ class RFileScanner extends ScannerOptions implements Scanner {
// block is accessed the entire thing is read into memory immediately allocating and deallocating
// a decompressor. If the user does not read all data, no decompressors are left allocated.
private static class NoopCache implements BlockCache {
- @Override
- public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
- return null;
- }
@Override
public CacheEntry cacheBlock(String blockName, byte[] buf) {
@@ -134,6 +132,43 @@ class RFileScanner extends ScannerOptions implements Scanner {
}
};
}
+
+ @Override
+ public CacheEntry getBlock(String blockName, Loader loader) {
+ Map<String,Loader> depLoaders = loader.getDependencies();
+ Map<String,byte[]> depData;
+
+ switch (depLoaders.size()) {
+ case 0:
+ depData = Collections.emptyMap();
+ break;
+ case 1:
+ Entry<String,Loader> entry = depLoaders.entrySet().iterator().next();
+ depData = Collections.singletonMap(entry.getKey(), getBlock(entry.getKey(), entry.getValue()).getBuffer());
+ break;
+ default:
+ depData = new HashMap<>();
+ depLoaders.forEach((k, v) -> depData.put(k, getBlock(k, v).getBuffer()));
+ }
+
+ byte[] data = loader.load(Integer.MAX_VALUE, depData);
+
+ return new CacheEntry() {
+
+ @Override
+ public byte[] getBuffer() {
+ return data;
+ }
+
+ @Override
+ public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+ return null;
+ }
+
+ @Override
+ public void indexWeightChanged() {}
+ };
+ }
}
RFileScanner(Opts opts) {
@@ -305,6 +340,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
RFileSource[] sources = opts.in.getSources();
List<SortedKeyValueIterator<Key,Value>> readers = new ArrayList<>(sources.length);
for (int i = 0; i < sources.length; i++) {
+      // TODO may have been a bug with multiple files and caching in older versions...
FSDataInputStream inputStream = (FSDataInputStream) sources[i].getInputStream();
readers.add(new RFile.Reader(new CachableBlockFile.Reader("source-" + i, inputStream, sources[i].getLength(), opts.in.getConf(), dataCache, indexCache,
DefaultConfiguration.getInstance())));
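
The NoopCache above is the smallest possible implementation of the new loading contract: resolve every dependency first, hand the resulting buffers to the loader, and retain nothing. A minimal standalone sketch of that pass-through shape (the Loader and PassThroughCache names here are redeclared for self-containment and are illustrative, not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    interface Loader {
      Map<String,Loader> getDependencies();
      byte[] load(int maxSize, Map<String,byte[]> dependencies);
    }

    class PassThroughCache {
      byte[] getBlock(String name, Loader loader) {
        // Recursively resolve dependencies, then load; nothing is retained.
        Map<String,byte[]> deps = new HashMap<>();
        loader.getDependencies().forEach((k, v) -> deps.put(k, getBlock(k, v)));
        return loader.load(Integer.MAX_VALUE, deps);
      }
    }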
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
index d3a83b0..e4b3e05 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileSummariesRetriever.java
@@ -86,9 +86,8 @@ class RFileSummariesRetriever implements SummaryInputArguments, SummaryFSOptions
RFileSource[] sources = in.getSources();
try {
SummaryCollection all = new SummaryCollection();
- for (int i = 0; i < sources.length; i++) {
- RFileSource source = sources[i];
- SummaryReader fileSummary = SummaryReader.load("source-"+i, conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
+ for (RFileSource source : sources) {
+ SummaryReader fileSummary = SummaryReader.load(conf, acuconf, source.getInputStream(), source.getLength(), summarySelector, factory);
SummaryCollection sc = fileSummary.getSummaries(Collections.singletonList(new Gatherer.RowRange(startRow, endRow)));
all.merge(sc, factory);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
index 9f485d8..a0a7da8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/ABlockReader.java
@@ -19,6 +19,9 @@ package org.apache.accumulo.core.file.blockfile;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
/*
* Minimal interface to read a block from a
@@ -48,7 +51,9 @@ public interface ABlockReader extends DataInput {
*/
int getPosition();
- <T> T getIndex(Class<T> clazz);
+ <T extends Weighbable> T getIndex(Supplier<T> supplier);
+
+ void indexWeightChanged();
byte[] getBuffer();
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index b27c918..d5f8215 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -17,6 +17,10 @@
*/
package org.apache.accumulo.core.file.blockfile.cache;
+import java.util.Map;
+
+import org.apache.accumulo.core.file.blockfile.cache.lru.SynchronousLoadingBlockCache;
+
/**
* Block cache interface.
*/
@@ -29,29 +33,47 @@ public interface BlockCache {
* Zero-based file block number.
* @param buf
* The block contents wrapped in a ByteBuffer.
- * @param inMemory
- * Whether block should be treated as in-memory
*/
- CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory);
+ CacheEntry cacheBlock(String blockName, byte buf[]);
/**
- * Add block to cache (defaults to not in-memory).
+ * Fetch block from cache.
*
* @param blockName
- * Zero-based file block number.
- * @param buf
- * The block contents wrapped in a ByteBuffer.
+ * Block name to fetch.
+ * @return Block or null if block is not in the cache.
*/
- CacheEntry cacheBlock(String blockName, byte buf[]);
+ CacheEntry getBlock(String blockName);
+
+ public static interface Loader {
+ /**
+ * The cache blocks that this loader depends on. If a loader has no dependencies, then it should return an empty map. All dependencies must be loaded before
+ * calling {@link #load(int, Map)}.
+ */
+ Map<String,Loader> getDependencies();
+
+ /**
+     * Loads a block. Everything returned by {@link #getDependencies()} must be loaded and passed in via the dependencies map.
+ *
+ * @param maxSize
+ * This is the maximum block size that will be cached.
+ * @return The loaded block or null if loading the block would exceed maxSize.
+ */
+ byte[] load(int maxSize, Map<String,byte[]> dependencies);
+ }
/**
- * Fetch block from cache.
+   * This method allows a cache to prevent concurrent loads of the same block. However, a cache implementation is not required to prevent concurrent loads.
+   * {@link SynchronousLoadingBlockCache} is an abstract class that a cache can extend, which does prevent concurrent loading of the same block.
   *
* @param blockName
- * Block number to fetch.
- * @return Block or null if block is not in the cache.
+ * Block name to fetch
+ * @param loader
+ * If the block is not present in the cache, the loader can be called to load it.
+   * @return Block, or null if the block is not in the cache and could not be loaded.
*/
- CacheEntry getBlock(String blockName);
+ CacheEntry getBlock(String blockName, Loader loader);
/**
* Get the maximum amount of on heap memory this cache will use.
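
For context, a hedged sketch of what implementing the Loader contract above might look like for a block that depends on a root metadata block, in the spirit of the BCFile loaders later in this commit (DataBlockLoader and its decode step are hypothetical; only BlockCache.Loader itself comes from this commit):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader;

    class DataBlockLoader implements Loader {
      private final String rootName;
      private final Loader rootLoader;

      DataBlockLoader(String rootName, Loader rootLoader) {
        this.rootName = rootName;
        this.rootLoader = rootLoader;
      }

      @Override
      public Map<String,Loader> getDependencies() {
        // The cache guarantees this is loaded before load() is called.
        return Collections.singletonMap(rootName, rootLoader);
      }

      @Override
      public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
        byte[] root = dependencies.get(rootName);
        byte[] block = decode(root); // hypothetical deserialization step
        return (block == null || block.length > maxSize) ? null : block;
      }

      private byte[] decode(byte[] root) {
        return root; // placeholder
      }
    }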
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
index 2faf696..b1aeced 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheEntry.java
@@ -16,11 +16,27 @@
*/
package org.apache.accumulo.core.file.blockfile.cache;
+import java.util.function.Supplier;
+
public interface CacheEntry {
- byte[] getBuffer();
- Object getIndex();
+ public static interface Weighbable {
+ int weight();
+ }
+
+ byte[] getBuffer();
- void setIndex(Object idx);
+ /**
+   * Optionally cache what is returned by the supplier along with this cache entry. If caching the supplier's result is not supported, it is OK to
+   * return null.
+ *
+ * <p>
+ * This method exists to support building indexes of frequently accessed cached data.
+ */
+ <T extends Weighbable> T getIndex(Supplier<T> supplier);
+ /**
+ * The object optionally stored by {@link #getIndex(Supplier)} is a mutable object. Accumulo will call this method whenever the weight of that object changes.
+ */
+ void indexWeightChanged();
}
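
A short sketch of how a caller might use the two new CacheEntry methods together; OffsetsIndex is a hypothetical Weighbable, while getIndex and indexWeightChanged are the methods defined above:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
    import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;

    class OffsetsIndex implements Weighbable {
      final List<Integer> offsets = new ArrayList<>();

      @Override
      public int weight() {
        return 16 + 4 * offsets.size(); // rough per-element estimate
      }
    }

    class IndexingExample {
      static void recordOffset(CacheEntry entry, int offset) {
        OffsetsIndex idx = entry.getIndex(OffsetsIndex::new);
        if (idx != null) { // null when the cache does not support indexes
          idx.offsets.add(offset);
          entry.indexWeightChanged(); // tell the cache the weight grew
        }
      }
    }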
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
index c1124f4..ec72283 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/CachedBlock.java
@@ -18,8 +18,10 @@
package org.apache.accumulo.core.file.blockfile.cache.lru;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
-import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
@@ -30,10 +32,10 @@ import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
* Makes the block memory-aware with {@link HeapSize} and Comparable to sort by access time for the LRU. It also takes care of priority by either instantiating
* as in-memory or handling the transition from single to multiple access.
*/
-public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntry {
+public class CachedBlock implements HeapSize, Comparable<CachedBlock> {
public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG)
- + ClassSize.STRING + ClassSize.BYTE_BUFFER);
+ + ClassSize.STRING + ClassSize.BYTE_BUFFER + ClassSize.REFERENCE);
public static enum BlockPriority {
/**
@@ -50,18 +52,17 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
MEMORY
}
+ private byte[] buffer;
private final String blockName;
- private final byte buf[];
private volatile long accessTime;
- private long size;
+ private volatile long recordedSize;
private BlockPriority priority;
- private Object index;
+ private Weighbable index;
public CachedBlock(String blockName, byte buf[], long accessTime, boolean inMemory) {
+ this.buffer = buf;
this.blockName = blockName;
- this.buf = buf;
this.accessTime = accessTime;
- this.size = ClassSize.align(blockName.length()) + ClassSize.align(buf.length) + PER_BLOCK_OVERHEAD;
if (inMemory) {
this.priority = BlockPriority.MEMORY;
} else {
@@ -81,7 +82,10 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
@Override
public long heapSize() {
- return size;
+ if (recordedSize < 0) {
+ throw new IllegalStateException("Block was evicted");
+ }
+ return recordedSize;
}
@Override
@@ -101,11 +105,6 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
return this.accessTime < that.accessTime ? 1 : -1;
}
- @Override
- public byte[] getBuffer() {
- return this.buf;
- }
-
public String getName() {
return this.blockName;
}
@@ -114,13 +113,40 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr
return this.priority;
}
- @Override
- public Object getIndex() {
- return index;
+ public byte[] getBuffer() {
+ return buffer;
}
- @Override
- public void setIndex(Object idx) {
- this.index = idx;
+ @SuppressWarnings("unchecked")
+ public synchronized <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+ if (index == null && recordedSize >= 0) {
+ index = supplier.get();
+ }
+
+ return (T) index;
+ }
+
+ public synchronized long recordSize(AtomicLong totalSize) {
+ if (recordedSize >= 0) {
+ long indexSize = (index == null) ? 0 : index.weight();
+ long newSize = ClassSize.align(blockName.length()) + ClassSize.align(buffer.length) + PER_BLOCK_OVERHEAD + indexSize;
+ long delta = newSize - recordedSize;
+ recordedSize = newSize;
+ return totalSize.addAndGet(delta);
+ }
+
+ throw new IllegalStateException("Block was evicted");
+ }
+
+ public synchronized long evicted(AtomicLong totalSize) {
+ if (recordedSize >= 0) {
+ totalSize.addAndGet(recordedSize * -1);
+ long tmp = recordedSize;
+ recordedSize = -1;
+ index = null;
+ return tmp;
+ }
+
+ throw new IllegalStateException("already evicted");
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
index f27fd43..7211f5d 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
@@ -65,7 +66,7 @@ import org.slf4j.LoggerFactory;
* then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then
* uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
*/
-public class LruBlockCache implements BlockCache, HeapSize {
+public class LruBlockCache extends SynchronousLoadingBlockCache implements BlockCache, HeapSize {
private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class);
@@ -114,6 +115,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* block cache configuration
*/
public LruBlockCache(final LruBlockCacheConfiguration conf) {
+ super();
this.conf = conf;
int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize());
@@ -145,6 +147,43 @@ public class LruBlockCache implements BlockCache, HeapSize {
return overhead;
}
+ /*
+ * This class exists so that every cache entry does not have a reference to the cache.
+ */
+ private class LruCacheEntry implements CacheEntry {
+ private final CachedBlock block;
+
+ LruCacheEntry(CachedBlock block) {
+ this.block = block;
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return block.getBuffer();
+ }
+
+ @Override
+ public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+ return block.getIndex(supplier);
+ }
+
+ @Override
+ public void indexWeightChanged() {
+ long newSize = block.recordSize(size);
+ if (newSize > acceptableSize() && !evictionInProgress) {
+ runEviction();
+ }
+ }
+ }
+
+ private CacheEntry wrap(CachedBlock cb) {
+ if (cb == null) {
+ return null;
+ }
+
+ return new LruCacheEntry(cb);
+ }
+
// BlockCache implementation
/**
@@ -160,7 +199,6 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @param inMemory
* if block is in-memory
*/
- @Override
public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) {
CachedBlock cb = map.get(blockName);
if (cb != null) {
@@ -175,7 +213,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
cb.access(count.incrementAndGet());
} else {
// Actually added block to cache
- long newSize = size.addAndGet(cb.heapSize());
+ long newSize = cb.recordSize(size);
elements.incrementAndGet();
if (newSize > acceptableSize() && !evictionInProgress) {
runEviction();
@@ -183,7 +221,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
}
- return cb;
+ return wrap(cb);
}
/**
@@ -210,7 +248,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* @return buffer of specified block name, or null if not in cache
*/
@Override
- public CachedBlock getBlock(String blockName) {
+ public CacheEntry getBlock(String blockName) {
CachedBlock cb = map.get(blockName);
if (cb == null) {
stats.miss();
@@ -218,15 +256,25 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
stats.hit();
cb.access(count.incrementAndGet());
- return cb;
+ return wrap(cb);
+ }
+
+ @Override
+ protected CacheEntry getBlockNoStats(String blockName) {
+ CachedBlock cb = map.get(blockName);
+ if (cb != null) {
+ cb.access(count.incrementAndGet());
+ }
+ return wrap(cb);
}
protected long evictBlock(CachedBlock block) {
- map.remove(block.getName());
- size.addAndGet(-1 * block.heapSize());
- elements.decrementAndGet();
- stats.evicted();
- return block.heapSize();
+ if (map.remove(block.getName()) != null) {
+ elements.decrementAndGet();
+ stats.evicted();
+ return block.evicted(size);
+ }
+ return 0;
}
/**
@@ -386,6 +434,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
return this.conf.getMaxSize();
}
+ @Override
+ public int getMaxEntrySize() {
+ return (int) Math.min(Integer.MAX_VALUE, getMaxSize());
+ }
+
/**
* Get the current size of this cache.
*
@@ -507,6 +560,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
* <p>
* Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
*/
+ @Override
public CacheStats getStats() {
return this.stats;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java
new file mode 100644
index 0000000..82c5ef4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/SynchronousLoadingBlockCache.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+
+/**
+ * This class implements loading in such a way that load operations for the same block will not run concurrently.
+ */
+public abstract class SynchronousLoadingBlockCache implements BlockCache {
+
+ private final Lock[] loadLocks;
+
+ /**
+ * @param numLocks
+ * this controls how many load operations can run concurrently
+ */
+ SynchronousLoadingBlockCache(int numLocks) {
+ loadLocks = new Lock[numLocks];
+ for (int i = 0; i < loadLocks.length; i++) {
+ loadLocks[i] = new ReentrantLock();
+ }
+ }
+
+ public SynchronousLoadingBlockCache() {
+ this(2017);
+ }
+
+ private Map<String,byte[]> resolveDependencies(Map<String,Loader> loaderDeps) {
+ Map<String,byte[]> depData;
+
+ switch (loaderDeps.size()) {
+ case 0:
+ depData = Collections.emptyMap();
+ break;
+ case 1: {
+ Entry<String,Loader> entry = loaderDeps.entrySet().iterator().next();
+ CacheEntry dce = getBlock(entry.getKey(), entry.getValue());
+ if (dce == null) {
+ depData = null;
+ } else {
+ depData = Collections.singletonMap(entry.getKey(), dce.getBuffer());
+ }
+ break;
+ }
+ default: {
+ depData = new HashMap<>();
+ Set<Entry<String,Loader>> es = loaderDeps.entrySet();
+ for (Entry<String,Loader> entry : es) {
+ CacheEntry dce = getBlock(entry.getKey(), entry.getValue());
+ if (dce == null) {
+ depData = null;
+ break;
+ }
+
+ depData.put(entry.getKey(), dce.getBuffer());
+ }
+ break;
+ }
+ }
+
+ return depData;
+ }
+
+ /**
+ * Get the maximum size of an individual cache entry.
+ */
+ protected abstract int getMaxEntrySize();
+
+ /**
+ * Get a block from the cache without changing any stats the cache is keeping.
+ */
+ protected abstract CacheEntry getBlockNoStats(String blockName);
+
+ @Override
+ public CacheEntry getBlock(String blockName, Loader loader) {
+
+ CacheEntry ce = getBlock(blockName);
+ if (ce != null) {
+ return ce;
+ }
+
+ // intentionally done before getting lock
+ Map<String,byte[]> depData = resolveDependencies(loader.getDependencies());
+ if (depData == null) {
+ return null;
+ }
+
+ int lockIndex = (blockName.hashCode() & 0x7fffffff) % loadLocks.length;
+ Lock loadLock = loadLocks[lockIndex];
+
+ try {
+ loadLock.lock();
+
+ // check again after getting lock, could have loaded while waiting on lock
+ ce = getBlockNoStats(blockName);
+ if (ce != null) {
+ return ce;
+ }
+
+ // not in cache so load data
+ byte[] data = loader.load(getMaxEntrySize(), depData);
+ if (data == null) {
+ return null;
+ }
+
+ // attempt to add data to cache
+ return cacheBlock(blockName, data);
+ } finally {
+ loadLock.unlock();
+ }
+ }
+
+}
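
The core of SynchronousLoadingBlockCache is lock striping: a block name hashes to one of 2017 locks, and the cache is re-checked after the lock is acquired. A condensed standalone sketch of the same pattern (StripedLoader is illustrative, not part of the commit):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Function;

    class StripedLoader<V> {
      private final Lock[] locks = new Lock[2017]; // same default stripe count
      private final ConcurrentHashMap<String,V> cache = new ConcurrentHashMap<>();

      StripedLoader() {
        for (int i = 0; i < locks.length; i++)
          locks[i] = new ReentrantLock();
      }

      V get(String key, Function<String,V> loader) {
        V v = cache.get(key);
        if (v != null)
          return v;
        Lock lock = locks[(key.hashCode() & 0x7fffffff) % locks.length];
        lock.lock();
        try {
          v = cache.get(key); // re-check: may have loaded while waiting on lock
          if (v == null) {
            v = loader.apply(key); // at most one load per key at a time
            if (v != null)
              cache.put(key, v);
          }
          return v;
        } finally {
          lock.unlock();
        }
      }
    }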
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
index 6edfb01..6339e8e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -17,15 +17,19 @@
*/
package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
-import static java.util.Objects.requireNonNull;
-
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager.Configuration;
import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
import org.apache.accumulo.core.file.blockfile.cache.CacheType;
import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
@@ -62,10 +66,9 @@ public final class TinyLfuBlockCache implements BlockCache {
return keyWeight + block.weight();
}).maximumWeight(conf.getMaxSize(type)).recordStats().build();
policy = cache.policy().eviction().get();
- statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
- .build());
+ statsExecutor = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
-
}
@Override
@@ -80,23 +83,14 @@ public final class TinyLfuBlockCache implements BlockCache {
@Override
public CacheEntry getBlock(String blockName) {
- return cache.getIfPresent(blockName);
+ return wrap(blockName, cache.getIfPresent(blockName));
}
@Override
public CacheEntry cacheBlock(String blockName, byte[] buffer) {
- return cache.asMap().compute(blockName, (key, block) -> {
- if (block == null) {
- return new Block(buffer);
- }
- block.buffer = buffer;
- return block;
- });
- }
-
- @Override
- public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
- return cacheBlock(blockName, buffer);
+ return wrap(blockName, cache.asMap().compute(blockName, (key, block) -> {
+ return new Block(buffer);
+ }));
}
@Override
@@ -123,31 +117,139 @@ public final class TinyLfuBlockCache implements BlockCache {
log.debug(cache.stats().toString());
}
- private static final class Block implements CacheEntry {
- private volatile byte[] buffer;
- private volatile Object index;
+ private static final class Block {
+
+ private final byte[] buffer;
+ private Weighbable index;
+ private volatile int lastIndexWeight;
Block(byte[] buffer) {
- this.buffer = requireNonNull(buffer);
+ this.buffer = buffer;
+ this.lastIndexWeight = buffer.length / 100;
+ }
+
+ int weight() {
+ int indexWeight = lastIndexWeight + SizeConstants.SIZEOF_INT + ClassSize.REFERENCE;
+ return indexWeight + ClassSize.align(getBuffer().length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
}
- @Override
public byte[] getBuffer() {
return buffer;
}
+ @SuppressWarnings("unchecked")
+ public synchronized <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+ if (index == null) {
+ index = supplier.get();
+ }
+
+ return (T) index;
+ }
+
+ public synchronized boolean indexWeightChanged() {
+ if (index != null) {
+ int indexWeight = index.weight();
+ if (indexWeight > lastIndexWeight) {
+ lastIndexWeight = indexWeight;
+ return true;
+ }
+ }
+
+ return false;
+ }
+ }
+
+ private CacheEntry wrap(String cacheKey, Block block) {
+ if (block != null) {
+ return new TlfuCacheEntry(cacheKey, block);
+ }
+
+ return null;
+ }
+
+ private class TlfuCacheEntry implements CacheEntry {
+
+ private final String cacheKey;
+ private final Block block;
+
+ TlfuCacheEntry(String k, Block b) {
+ this.cacheKey = k;
+ this.block = b;
+ }
+
@Override
- public Object getIndex() {
- return index;
+ public byte[] getBuffer() {
+ return block.getBuffer();
}
@Override
- public void setIndex(Object index) {
- this.index = index;
+ public <T extends Weighbable> T getIndex(Supplier<T> supplier) {
+ return block.getIndex(supplier);
}
- int weight() {
- return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+ @Override
+ public void indexWeightChanged() {
+ if (block.indexWeightChanged()) {
+ // update weight
+ cache.put(cacheKey, block);
+ }
+ }
+ }
+
+ private Block load(String key, Loader loader, Map<String,byte[]> resolvedDeps) {
+ byte[] data = loader.load((int) Math.min(Integer.MAX_VALUE, policy.getMaximum()), resolvedDeps);
+ if (data == null) {
+ return null;
}
+
+ return new Block(data);
+ }
+
+ private Map<String,byte[]> resolveDependencies(Map<String,Loader> deps) {
+ if (deps.size() == 1) {
+ Entry<String,Loader> entry = deps.entrySet().iterator().next();
+ CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
+ if (ce == null) {
+ return null;
+ }
+ return Collections.singletonMap(entry.getKey(), ce.getBuffer());
+ } else {
+ HashMap<String,byte[]> resolvedDeps = new HashMap<>();
+ for (Entry<String,Loader> entry : deps.entrySet()) {
+ CacheEntry ce = getBlock(entry.getKey(), entry.getValue());
+ if (ce == null) {
+ return null;
+ }
+ resolvedDeps.put(entry.getKey(), ce.getBuffer());
+ }
+ return resolvedDeps;
+ }
+ }
+
+ @Override
+ public CacheEntry getBlock(String blockName, Loader loader) {
+ Map<String,Loader> deps = loader.getDependencies();
+ Block block;
+ if (deps.size() == 0) {
+ block = cache.get(blockName, k -> load(blockName, loader, Collections.emptyMap()));
+ } else {
+      // This code path exists to handle the case where dependencies may need to be loaded. Loading dependencies will access the cache. Cache load functions
+      // should not access the cache.
+ block = cache.getIfPresent(blockName);
+
+ if (block == null) {
+ // Load dependencies outside of cache load function.
+ Map<String,byte[]> resolvedDeps = resolveDependencies(deps);
+ if (resolvedDeps == null) {
+ return null;
+ }
+
+        // Use asMap because it will not increment stats; getIfPresent already recorded a miss above. Use computeIfAbsent because another thread may have loaded
+        // the data since this thread called getIfPresent.
+ block = cache.asMap().computeIfAbsent(blockName, k -> load(blockName, loader, resolvedDeps));
+ }
+ }
+
+ return wrap(blockName, block);
}
}
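
The two paths in TinyLfuBlockCache.getBlock above exist because Caffeine compute functions must not touch the cache they run inside. A hedged sketch of the shape of that logic against a plain Caffeine cache (the key names and loader functions are illustrative):

    import java.util.function.BiFunction;
    import java.util.function.Function;
    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    class TwoPathLoad {
      private final Cache<String,byte[]> cache = Caffeine.newBuilder().maximumSize(10_000).build();

      byte[] get(String key, String depKey, Function<String,byte[]> depLoader,
          BiFunction<String,byte[],byte[]> loader) {
        byte[] v = cache.getIfPresent(key);
        if (v == null) {
          // Resolve the dependency outside any compute; re-entering the
          // cache from inside a compute function is not allowed.
          byte[] dep = cache.get(depKey, depLoader);
          if (dep == null)
            return null;
          // asMap() avoids recording a second miss, and computeIfAbsent
          // tolerates another thread having loaded the block meanwhile.
          v = cache.asMap().computeIfAbsent(key, k -> loader.apply(k, dep));
        }
        return v;
      }
    }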
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 3ecb5ca..d5ab62c 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -16,13 +16,16 @@
*/
package org.apache.accumulo.core.file.blockfile.impl;
-import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.lang.ref.SoftReference;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -30,7 +33,9 @@ import org.apache.accumulo.core.file.blockfile.ABlockWriter;
import org.apache.accumulo.core.file.blockfile.BlockFileReader;
import org.apache.accumulo.core.file.blockfile.BlockFileWriter;
import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache.Loader;
import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
@@ -45,8 +50,9 @@ import org.apache.hadoop.fs.Seekable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
- *
 * This is a wrapper class for BCFile that maintains independent caches for data blocks and metadata blocks
*/
@@ -138,267 +144,279 @@ public class CachableBlockFile {
}
+ private static interface IoeSupplier<T> {
+ T get() throws IOException;
+ }
+
/**
- *
- *
* Class wraps the BCFile reader.
- *
*/
public static class Reader implements BlockFileReader {
private final RateLimiter readLimiter;
- private BCFile.Reader _bc;
- private final String fileName;
- private BlockCache _dCache = null;
- private BlockCache _iCache = null;
- private InputStream fin = null;
- private FileSystem fs;
- private Configuration conf;
+ // private BCFile.Reader _bc;
+ private final String cacheId;
+ private final BlockCache _dCache;
+ private final BlockCache _iCache;
+ private volatile InputStream fin = null;
private boolean closed = false;
- private AccumuloConfiguration accumuloConfiguration = null;
+ private final Configuration conf;
+ private final AccumuloConfiguration accumuloConfiguration;
+
+ private final IoeSupplier<InputStream> inputSupplier;
+ private final IoeSupplier<Long> lengthSupplier;
+ private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>();
+
+ private static final String ROOT_BLOCK_NAME = "!RootData";
// ACCUMULO-4716 - Define MAX_ARRAY_SIZE smaller than Integer.MAX_VALUE to prevent possible OutOfMemory
// errors when allocating arrays - described in stackoverflow post: https://stackoverflow.com/a/8381338
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
- private interface BlockLoader {
- BlockReader get() throws IOException;
+ private BCFile.Reader getBCFile(byte[] serializedMetadata) throws IOException {
- String getInfo();
- }
+ BCFile.Reader reader = bcfr.get();
+ if (reader == null) {
+ RateLimitedInputStream fsIn = new RateLimitedInputStream((InputStream & Seekable) inputSupplier.get(), readLimiter);
+ BCFile.Reader tmpReader;
+ if (serializedMetadata == null) {
+ tmpReader = new BCFile.Reader(fsIn, lengthSupplier.get(), conf, accumuloConfiguration);
+ } else {
+ tmpReader = new BCFile.Reader(serializedMetadata, fsIn, conf, accumuloConfiguration);
+ }
- private class OffsetBlockLoader implements BlockLoader {
+ if (!bcfr.compareAndSet(null, tmpReader)) {
+ fsIn.close();
+ tmpReader.close();
+ return bcfr.get();
+ } else {
+ fin = fsIn;
+ return tmpReader;
+ }
+ }
- private int blockIndex;
+ return reader;
+ }
- OffsetBlockLoader(int blockIndex) {
- this.blockIndex = blockIndex;
+ private BCFile.Reader getBCFile() throws IOException {
+ if (_iCache != null) {
+ CacheEntry mce = _iCache.getBlock(cacheId + ROOT_BLOCK_NAME, new BCFileLoader());
+ if (mce != null) {
+ return getBCFile(mce.getBuffer());
+ }
}
+ return getBCFile(null);
+ }
+
+ private class BCFileLoader implements Loader {
+
@Override
- public BlockReader get() throws IOException {
- return getBCFile(accumuloConfiguration).getDataBlock(blockIndex);
+ public Map<String,Loader> getDependencies() {
+ return Collections.emptyMap();
}
@Override
- public String getInfo() {
- return "" + blockIndex;
+ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+ try {
+ return getBCFile(null).serializeMetadata(maxSize);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
}
-
}
- private class RawBlockLoader implements BlockLoader {
-
+ private class RawBlockLoader extends BaseBlockLoader {
private long offset;
private long compressedSize;
private long rawSize;
- RawBlockLoader(long offset, long compressedSize, long rawSize) {
+ private RawBlockLoader(long offset, long compressedSize, long rawSize, boolean loadingMeta) {
+ super(loadingMeta);
this.offset = offset;
this.compressedSize = compressedSize;
this.rawSize = rawSize;
}
@Override
- public BlockReader get() throws IOException {
- return getBCFile(accumuloConfiguration).getDataBlock(offset, compressedSize, rawSize);
+ BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+ if (rawSize > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+ return null;
+ }
+ return bcfr.getDataBlock(offset, compressedSize, rawSize);
}
@Override
- public String getInfo() {
- return "" + offset + "," + compressedSize + "," + rawSize;
+ String getBlockId() {
+ return "raw-(" + offset + "," + compressedSize + "," + rawSize + ")";
}
}
- private class MetaBlockLoader implements BlockLoader {
-
- private String name;
- private AccumuloConfiguration accumuloConfiguration;
+ private class OffsetBlockLoader extends BaseBlockLoader {
+ private int blockIndex;
- MetaBlockLoader(String name, AccumuloConfiguration accumuloConfiguration) {
- this.name = name;
- this.accumuloConfiguration = accumuloConfiguration;
+ private OffsetBlockLoader(int blockIndex, boolean loadingMeta) {
+ super(loadingMeta);
+ this.blockIndex = blockIndex;
}
@Override
- public BlockReader get() throws IOException {
- return getBCFile(accumuloConfiguration).getMetaBlock(name);
+ BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+ if (bcfr.getDataBlockRawSize(blockIndex) > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+ return null;
+ }
+ return bcfr.getDataBlock(blockIndex);
}
@Override
- public String getInfo() {
- return name;
+ String getBlockId() {
+ return "bi-" + blockIndex;
}
}
- public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
- }
-
- public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
- AccumuloConfiguration accumuloConfiguration) throws IOException {
-
- /*
- * Grab path create input stream grab len create file
- */
-
- fileName = dataFile.toString();
- this._dCache = data;
- this._iCache = index;
- this.fs = fs;
- this.conf = conf;
- this.accumuloConfiguration = accumuloConfiguration;
- this.readLimiter = readLimiter;
- }
-
- public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
- BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
- this.fileName = cacheId;
- this._dCache = data;
- this._iCache = index;
- this.readLimiter = null;
- init(fsin, len, conf, accumuloConfiguration);
- }
+ private class MetaBlockLoader extends BaseBlockLoader {
+ String blockName;
- public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf,
- AccumuloConfiguration accumuloConfiguration) throws IOException {
- this.fileName = cacheId;
- this.readLimiter = null;
- init(fsin, len, conf, accumuloConfiguration);
- }
-
- private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration)
- throws IOException {
- this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
- }
-
- private synchronized BCFile.Reader getBCFile(AccumuloConfiguration accumuloConfiguration) throws IOException {
- if (closed)
- throw new IllegalStateException("File " + fileName + " is closed");
-
- if (_bc == null) {
- // lazily open file if needed
- Path path = new Path(fileName);
- RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
- fin = fsIn;
- init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+ MetaBlockLoader(String blockName) {
+ super(true);
+ this.blockName = blockName;
}
- return _bc;
- }
-
- public BlockRead getCachedMetaBlock(String blockName) throws IOException {
- String _lookup = fileName + "M" + blockName;
-
- if (_iCache != null) {
- CacheEntry cacheEntry = _iCache.getBlock(_lookup);
-
- if (cacheEntry != null) {
- return new CachedBlockRead(cacheEntry, cacheEntry.getBuffer());
+ @Override
+ BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException {
+ if (bcfr.getMetaBlockRawSize(blockName) > Math.min(maxSize, MAX_ARRAY_SIZE)) {
+ return null;
}
-
+ return bcfr.getMetaBlock(blockName);
}
- return null;
- }
-
- public BlockRead cacheMetaBlock(String blockName, BlockReader _currBlock) throws IOException {
- String _lookup = fileName + "M" + blockName;
- return cacheBlock(_lookup, _iCache, _currBlock, blockName);
+ @Override
+ String getBlockId() {
+ return "meta-" + blockName;
+ }
}
- public void cacheMetaBlock(String blockName, byte[] b) {
+ private abstract class BaseBlockLoader implements Loader {
- if (_iCache == null)
- return;
+ abstract BlockReader getBlockReader(int maxSize, BCFile.Reader bcfr) throws IOException;
- String _lookup = fileName + "M" + blockName;
- try {
- _iCache.cacheBlock(_lookup, b);
- } catch (Exception e) {
- log.warn("Already cached block: " + _lookup, e);
- }
- }
+ abstract String getBlockId();
- private BlockRead getBlock(String _lookup, BlockCache cache, BlockLoader loader) throws IOException {
+ private boolean loadingMetaBlock;
- BlockReader _currBlock;
-
- if (cache != null) {
- CacheEntry cb = null;
- cb = cache.getBlock(_lookup);
+ public BaseBlockLoader(boolean loadingMetaBlock) {
+ super();
+ this.loadingMetaBlock = loadingMetaBlock;
+ }
- if (cb != null) {
- return new CachedBlockRead(cb, cb.getBuffer());
+ @Override
+ public Map<String,Loader> getDependencies() {
+ if (bcfr.get() == null && loadingMetaBlock) {
+ String _lookup = cacheId + ROOT_BLOCK_NAME;
+ return Collections.singletonMap(_lookup, new BCFileLoader());
}
-
+ return Collections.emptyMap();
}
- /**
- * grab the currBlock at this point the block is still in the data stream
- *
- */
- _currBlock = loader.get();
- /**
- * If the block is bigger than the cache just return the stream
- */
- return cacheBlock(_lookup, cache, _currBlock, loader.getInfo());
+ @Override
+ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
- }
+ try {
+ BCFile.Reader reader = bcfr.get();
+ if (reader == null) {
+ if (loadingMetaBlock) {
+ byte[] serializedMetadata = dependencies.get(cacheId + ROOT_BLOCK_NAME);
+ reader = getBCFile(serializedMetadata);
+ } else {
+ reader = getBCFile();
+ }
+ }
- private BlockRead cacheBlock(String _lookup, BlockCache cache, BlockReader _currBlock, String block) throws IOException {
+ BlockReader _currBlock = getBlockReader(maxSize, reader);
+ if (_currBlock == null) {
+ return null;
+ }
- if ((cache == null) || (_currBlock.getRawSize() > Math.min(cache.getMaxSize(), MAX_ARRAY_SIZE))) {
- return new BlockRead(_currBlock, _currBlock.getRawSize());
- } else {
+ byte b[] = null;
+ try {
+ b = new byte[(int) _currBlock.getRawSize()];
+ _currBlock.readFully(b);
+ } catch (IOException e) {
+ log.debug("Error full blockRead for file " + cacheId + " for block " + getBlockId(), e);
+ throw new UncheckedIOException(e);
+ } finally {
+ _currBlock.close();
+ }
- /**
- * Try to fully read block for meta data if error try to close file
- *
- */
- byte b[] = null;
- try {
- b = new byte[(int) _currBlock.getRawSize()];
- _currBlock.readFully(b);
+ return b;
} catch (IOException e) {
- log.debug("Error full blockRead for file " + fileName + " for block " + block, e);
- throw e;
- } finally {
- _currBlock.close();
+ throw new UncheckedIOException(e);
}
+ }
+ }
- CacheEntry ce = null;
- try {
- ce = cache.cacheBlock(_lookup, b);
- } catch (Exception e) {
- log.warn("Already cached block: " + _lookup, e);
- }
+    private Reader(String cacheId, IoeSupplier<InputStream> inputSupplier, IoeSupplier<Long> lengthSupplier, BlockCache data, BlockCache index,
+ RateLimiter readLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration) {
+ Preconditions.checkArgument(cacheId != null || (data == null && index == null));
+ this.cacheId = cacheId;
+ this.inputSupplier = inputSupplier;
+      this.lengthSupplier = lengthSupplier;
+ this._dCache = data;
+ this._iCache = index;
+ this.readLimiter = readLimiter;
+ this.conf = conf;
+ this.accumuloConfiguration = accumuloConfiguration;
+ }
- if (ce == null)
- return new BlockRead(new DataInputStream(new ByteArrayInputStream(b)), b.length);
- else
- return new CachedBlockRead(ce, ce.getBuffer());
+ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
+ throws IOException {
+ this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+ }
- }
+ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this(dataFile.toString(), () -> fs.open(dataFile), () -> fs.getFileStatus(dataFile).getLen(), data, index, readLimiter, conf, accumuloConfiguration);
+ }
+
+ public <InputStreamType extends InputStream & Seekable> Reader(String cacheId, InputStreamType fsin, long len, Configuration conf, BlockCache data,
+ BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this(cacheId, () -> fsin, () -> len, data, index, null, conf, accumuloConfiguration);
+ }
+
+ public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+ AccumuloConfiguration accumuloConfiguration) throws IOException {
+ this(null, () -> fsin, () -> len, null, null, null, conf, accumuloConfiguration);
}
/**
* It is intended that once the BlockRead object is returned to the caller, that the caller will read the entire block and then call close on the BlockRead
* class.
- *
- * NOTE: In the case of multi-read threads: This method can do redundant work where an entry is read from disk and other threads check the cache before it
- * has been inserted.
*/
@Override
public BlockRead getMetaBlock(String blockName) throws IOException {
- String _lookup = this.fileName + "M" + blockName;
- return getBlock(_lookup, _iCache, new MetaBlockLoader(blockName, accumuloConfiguration));
+ if (_iCache != null) {
+ String _lookup = this.cacheId + "M" + blockName;
+ CacheEntry ce = _iCache.getBlock(_lookup, new MetaBlockLoader(blockName));
+ if (ce != null) {
+ return new CachedBlockRead(ce, ce.getBuffer());
+ }
+ }
+
+ BlockReader _currBlock = getBCFile(null).getMetaBlock(blockName);
+ return new BlockRead(_currBlock, _currBlock.getRawSize());
}
@Override
public ABlockReader getMetaBlock(long offset, long compressedSize, long rawSize) throws IOException {
- String _lookup = this.fileName + "R" + offset;
- return getBlock(_lookup, _iCache, new RawBlockLoader(offset, compressedSize, rawSize));
+ if (_iCache != null) {
+ String _lookup = this.cacheId + "R" + offset;
+ CacheEntry ce = _iCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, true));
+ if (ce != null) {
+ return new CachedBlockRead(ce, ce.getBuffer());
+ }
+ }
+
+ BlockReader _currBlock = getBCFile(null).getDataBlock(offset, compressedSize, rawSize);
+ return new BlockRead(_currBlock, _currBlock.getRawSize());
}
/**
@@ -411,15 +429,30 @@ public class CachableBlockFile {
@Override
public BlockRead getDataBlock(int blockIndex) throws IOException {
- String _lookup = this.fileName + "O" + blockIndex;
- return getBlock(_lookup, _dCache, new OffsetBlockLoader(blockIndex));
+ if (_dCache != null) {
+ String _lookup = this.cacheId + "O" + blockIndex;
+ CacheEntry ce = _dCache.getBlock(_lookup, new OffsetBlockLoader(blockIndex, false));
+ if (ce != null) {
+ return new CachedBlockRead(ce, ce.getBuffer());
+ }
+ }
+ BlockReader _currBlock = getBCFile().getDataBlock(blockIndex);
+ return new BlockRead(_currBlock, _currBlock.getRawSize());
}
@Override
public ABlockReader getDataBlock(long offset, long compressedSize, long rawSize) throws IOException {
- String _lookup = this.fileName + "R" + offset;
- return getBlock(_lookup, _dCache, new RawBlockLoader(offset, compressedSize, rawSize));
+ if (_dCache != null) {
+ String _lookup = this.cacheId + "R" + offset;
+ CacheEntry ce = _dCache.getBlock(_lookup, new RawBlockLoader(offset, compressedSize, rawSize, false));
+ if (ce != null) {
+ return new CachedBlockRead(ce, ce.getBuffer());
+ }
+ }
+
+ BlockReader _currBlock = getBCFile().getDataBlock(offset, compressedSize, rawSize);
+ return new BlockRead(_currBlock, _currBlock.getRawSize());
}
@Override
@@ -429,8 +462,9 @@ public class CachableBlockFile {
closed = true;
- if (_bc != null)
- _bc.close();
+ BCFile.Reader reader = bcfr.get();
+ if (reader != null)
+ reader.close();
if (fin != null) {
// synchronize on the FSDataInputStream to ensure thread safety with the BoundedRangeFileInputStream
@@ -477,25 +511,13 @@ public class CachableBlockFile {
}
@Override
- @SuppressWarnings("unchecked")
- public <T> T getIndex(Class<T> clazz) {
- T bi = null;
- synchronized (cb) {
- SoftReference<T> softRef = (SoftReference<T>) cb.getIndex();
- if (softRef != null)
- bi = softRef.get();
-
- if (bi == null) {
- try {
- bi = clazz.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- cb.setIndex(new SoftReference<>(bi));
- }
- }
+ public <T extends Weighbable> T getIndex(Supplier<T> indexSupplier) {
+ return cb.getIndex(indexSupplier);
+ }
- return bi;
+ @Override
+ public void indexWeightChanged() {
+ cb.indexWeightChanged();
}
}
@@ -537,7 +559,7 @@ public class CachableBlockFile {
}
@Override
- public <T> T getIndex(Class<T> clazz) {
+ public <T extends Weighbable> T getIndex(Supplier<T> clazz) {
throw new UnsupportedOperationException();
}
@@ -549,5 +571,10 @@ public class CachableBlockFile {
throw new UnsupportedOperationException();
}
+ @Override
+ public void indexWeightChanged() {
+ throw new UnsupportedOperationException();
+ }
+
}
}
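
The rewritten Reader opens the underlying BCFile lazily and publishes it through an AtomicReference, closing the losing copy when two threads race in getBCFile. A minimal sketch of that publish-or-discard pattern (LazyResource is illustrative; IoeSupplier mirrors the private interface added above):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    class LazyResource<T extends Closeable> {
      interface IoeSupplier<R> {
        R get() throws IOException;
      }

      private final AtomicReference<T> ref = new AtomicReference<>();
      private final IoeSupplier<T> opener;

      LazyResource(IoeSupplier<T> opener) {
        this.opener = opener;
      }

      T get() throws IOException {
        T existing = ref.get();
        if (existing != null)
          return existing;
        T fresh = opener.get();
        if (ref.compareAndSet(null, fresh))
          return fresh; // this thread won the race; its copy is published
        fresh.close();  // another thread won; discard ours and use theirs
        return ref.get();
      }
    }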
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
index 652515e..3401396 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/BlockIndex.java
@@ -24,22 +24,28 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.blockfile.ABlockReader;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry.Weighbable;
+import org.apache.accumulo.core.file.blockfile.cache.impl.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.impl.SizeConstants;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
/**
*
*/
-public class BlockIndex {
+public class BlockIndex implements Weighbable {
public static BlockIndex getIndex(ABlockReader cacheBlock, IndexEntry indexEntry) throws IOException {
- BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex.class);
+ BlockIndex blockIndex = cacheBlock.getIndex(BlockIndex::new);
+ if (blockIndex == null)
+ return null;
int accessCount = blockIndex.accessCount.incrementAndGet();
// 1 is a power of two, but do not care about it
if (accessCount >= 2 && isPowerOfTwo(accessCount)) {
blockIndex.buildIndex(accessCount, cacheBlock, indexEntry);
+ cacheBlock.indexWeightChanged();
}
if (blockIndex.blockIndex != null)
@@ -98,8 +104,12 @@ public class BlockIndex {
@Override
public int hashCode() {
- assert false : "hashCode not designed";
- return 42; // any arbitrary constant will do
+ throw new UnsupportedOperationException("hashCode not designed");
+ }
+
+ int weight() {
+ int keyWeight = ClassSize.align(prevKey.getSize()) + ClassSize.OBJECT + SizeConstants.SIZEOF_LONG + 4 * (ClassSize.ARRAY + ClassSize.REFERENCE);
+ return 2 * SizeConstants.SIZEOF_INT + ClassSize.REFERENCE + ClassSize.OBJECT + keyWeight;
}
}
@@ -186,4 +196,17 @@ public class BlockIndex {
BlockIndexEntry[] getIndexEntries() {
return blockIndex;
}
+
+ @Override
+ public synchronized int weight() {
+ int weight = 0;
+ if (blockIndex != null) {
+ for (BlockIndexEntry blockIndexEntry : blockIndex) {
+ weight += blockIndexEntry.weight();
+ }
+ }
+
+ weight += ClassSize.ATOMIC_INTEGER + ClassSize.OBJECT + 2 * ClassSize.REFERENCE + ClassSize.ARRAY;
+ return weight;
+ }
}
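
BlockIndex.getIndex above only rebuilds its index when the access count crosses a power of two, so frequently read blocks get progressively finer indexes while rarely read blocks pay almost nothing. The policy in isolation (RebuildPolicy is an illustrative name):

    import java.util.concurrent.atomic.AtomicInteger;

    class RebuildPolicy {
      private final AtomicInteger accessCount = new AtomicInteger();

      static boolean isPowerOfTwo(int n) {
        return n > 0 && (n & (n - 1)) == 0;
      }

      boolean shouldRebuild() {
        int count = accessCount.incrementAndGet();
        return count >= 2 && isPowerOfTwo(count); // rebuild at 2, 4, 8, 16, ...
      }
    }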
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index e745583..e998a87 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -17,6 +17,7 @@
package org.apache.accumulo.core.file.rfile.bcfile;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInput;
@@ -26,6 +27,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,8 +38,6 @@ import java.util.TreeMap;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
-import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.BlockRead;
import org.apache.accumulo.core.file.rfile.bcfile.CompareUtils.Scalar;
import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
import org.apache.accumulo.core.file.rfile.bcfile.Utils.Version;
@@ -595,8 +595,6 @@ public final class BCFile {
* BCFile Reader, interface to read the file's data and meta blocks.
*/
static public class Reader implements Closeable {
- private static final String META_NAME = "BCFile.metaindex";
- private static final String CRYPTO_BLOCK_NAME = "BCFile.cryptoparams";
private final SeekableDataInputStream in;
private final Configuration conf;
final DataIndex dataIndex;
@@ -759,6 +757,37 @@ public final class BCFile {
}
}
+ public byte[] serializeMetadata(int maxSize) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+
+ metaIndex.write(out);
+ if (out.size() > maxSize) {
+ return null;
+ }
+ dataIndex.write(out);
+ if (out.size() > maxSize) {
+ return null;
+ }
+ if (cryptoParams == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ cryptoParams.write(out);
+ }
+
+ if (out.size() > maxSize) {
+ return null;
+ }
+
+ out.close();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
/**
* Constructor
*
@@ -847,131 +876,24 @@ public final class BCFile {
}
}
- public <InputStreamType extends InputStream & Seekable> Reader(CachableBlockFile.Reader cache, InputStreamType fin, long fileLength, Configuration conf,
+ public <InputStreamType extends InputStream & Seekable> Reader(byte[] serializedMetadata, InputStreamType fin, Configuration conf,
AccumuloConfiguration accumuloConfiguration) throws IOException {
this.in = new SeekableDataInputStream(fin);
this.conf = conf;
- BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
- BlockRead cachedDataIndex = cache.getCachedMetaBlock(DataIndex.BLOCK_NAME);
- BlockRead cachedCryptoParams = cache.getCachedMetaBlock(CRYPTO_BLOCK_NAME);
-
- if (cachedMetaIndex == null || cachedDataIndex == null || cachedCryptoParams == null) {
- // move the cursor to the beginning of the tail, containing: offset to the
- // meta block index, version and magic
- // Move the cursor to grab the version and the magic first
- this.in.seek(fileLength - Magic.size() - Version.size());
- version = new Version(this.in);
- Magic.readAndVerify(this.in);
-
- // Do a version check
- if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
- throw new RuntimeException("Incompatible BCFile fileBCFileVersion.");
- }
-
- // Read the right number offsets based on version
- long offsetIndexMeta = 0;
- long offsetCryptoParameters = 0;
-
- if (version.equals(API_VERSION_1)) {
- this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
- offsetIndexMeta = this.in.readLong();
-
- } else {
- this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
- offsetIndexMeta = this.in.readLong();
- offsetCryptoParameters = this.in.readLong();
- }
-
- // read meta index
- this.in.seek(offsetIndexMeta);
- metaIndex = new MetaIndex(this.in);
-
- // If they exist, read the crypto parameters
- if (!version.equals(BCFile.API_VERSION_1) && cachedCryptoParams == null) {
-
- // read crypto parameters
- this.in.seek(offsetCryptoParameters);
- cryptoParams = new BCFileCryptoModuleParameters();
- cryptoParams.read(this.in);
-
- if (accumuloConfiguration.getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
- Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);
- Map<String,String> instanceConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.INSTANCE_PREFIX);
-
- cryptoConfFromAccumuloConf.putAll(instanceConf);
-
- for (String name : cryptoParams.getAllOptions().keySet()) {
- if (!name.equals(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey())) {
- cryptoConfFromAccumuloConf.put(name, cryptoParams.getAllOptions().get(name));
- } else {
- cryptoParams.setKeyEncryptionStrategyClass(cryptoConfFromAccumuloConf.get(Property.CRYPTO_SECRET_KEY_ENCRYPTION_STRATEGY_CLASS.getKey()));
- }
- }
-
- cryptoParams.setAllOptions(cryptoConfFromAccumuloConf);
- }
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- cryptoParams.write(dos);
- dos.close();
- cache.cacheMetaBlock(CRYPTO_BLOCK_NAME, baos.toByteArray());
-
- this.cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
- this.secretKeyEncryptionStrategy = CryptoModuleFactory.getSecretKeyEncryptionStrategy(cryptoParams.getKeyEncryptionStrategyClass());
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedMetadata);
+ DataInputStream dis = new DataInputStream(bais);
- // This call should put the decrypted session key within the cryptoParameters object
- // secretKeyEncryptionStrategy.decryptSecretKey(cryptoParameters);
+ version = null;
- cryptoParams = (BCFileCryptoModuleParameters) secretKeyEncryptionStrategy.decryptSecretKey(cryptoParams);
-
- } else if (cachedCryptoParams != null) {
- setupCryptoFromCachedData(cachedCryptoParams);
- } else {
- // Place something in cache that indicates this file has no crypto metadata. See ACCUMULO-4141
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- NO_CRYPTO.write(dos);
- dos.close();
- cache.cacheMetaBlock(CRYPTO_BLOCK_NAME, baos.toByteArray());
- }
-
- if (cachedMetaIndex == null) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- metaIndex.write(dos);
- dos.close();
- cache.cacheMetaBlock(META_NAME, baos.toByteArray());
- }
-
- // read data:BCFile.index, the data block index
- if (cachedDataIndex == null) {
- BlockReader blockR = getMetaBlock(DataIndex.BLOCK_NAME);
- cachedDataIndex = cache.cacheMetaBlock(DataIndex.BLOCK_NAME, blockR);
- }
-
- try {
- dataIndex = new DataIndex(cachedDataIndex);
- } catch (IOException e) {
- LOG.error("Got IOException when trying to create DataIndex block");
- throw e;
- } finally {
- cachedDataIndex.close();
- }
-
- } else {
- // We have cached versions of the metaIndex, dataIndex and cryptoParams objects.
- // Use them to fill out this reader's members.
- version = null;
-
- metaIndex = new MetaIndex(cachedMetaIndex);
- dataIndex = new DataIndex(cachedDataIndex);
- setupCryptoFromCachedData(cachedCryptoParams);
+ metaIndex = new MetaIndex(dis);
+ dataIndex = new DataIndex(dis);
+ if (dis.readBoolean()) {
+ setupCryptoFromCachedData(dis);
}
}
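The new constructor is the inverse of serializeMetadata: it reads metaIndex,
then dataIndex, then a boolean saying whether crypto parameters follow, so the
two methods must stay in lockstep. A round-trip sketch (a sketch only; fin,
conf, and aconf are assumed in scope, fin must implement both InputStream and
Seekable, and both calls can throw IOException):

    // Serialize with no effective size cap, then rebuild a reader from the bytes.
    byte[] meta = reader.serializeMetadata(Integer.MAX_VALUE);
    BCFile.Reader rebuilt = new BCFile.Reader(meta, fin, conf, aconf);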
- private void setupCryptoFromCachedData(BlockRead cachedCryptoParams) throws IOException {
+ private void setupCryptoFromCachedData(DataInput cachedCryptoParams) throws IOException {
BCFileCryptoModuleParameters params = new BCFileCryptoModuleParameters();
params.read(cachedCryptoParams);
@@ -1052,6 +974,15 @@ public final class BCFile {
return createReader(imeBCIndex.getCompressionAlgorithm(), region);
}
+ public long getMetaBlockRawSize(String name) throws IOException, MetaBlockDoesNotExist {
+ MetaIndexEntry imeBCIndex = metaIndex.getMetaByName(name);
+ if (imeBCIndex == null) {
+ throw new MetaBlockDoesNotExist("name=" + name);
+ }
+
+ return imeBCIndex.getRegion().getRawSize();
+ }
+
/**
* Stream access to a Data Block.
*
@@ -1073,11 +1004,18 @@ public final class BCFile {
return createReader(dataIndex.getDefaultCompressionAlgorithm(), region);
}
+ public long getDataBlockRawSize(int blockIndex) {
+ if (blockIndex < 0 || blockIndex >= getBlockCount()) {
+ throw new IndexOutOfBoundsException(String.format("blockIndex=%d, numBlocks=%d", blockIndex, getBlockCount()));
+ }
+
+ return dataIndex.getBlockRegionList().get(blockIndex).getRawSize();
+ }
+
private BlockReader createReader(Algorithm compressAlgo, BlockRegion region) throws IOException {
RBlockState rbs = new RBlockState(compressAlgo, in, region, conf, cryptoModule, version, cryptoParams);
return new BlockReader(rbs);
}
-
}
/**
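getMetaBlockRawSize and getDataBlockRawSize expose a block's uncompressed size
without opening a reader for it, so callers can size buffers or make
cache-admission decisions before decompressing anything. A sketch (bcReader,
cache, and blockIndex are assumed to be in scope):

    // Skip the cache for blocks whose raw size exceeds the cache budget.
    long rawSize = bcReader.getDataBlockRawSize(blockIndex);
    if (rawSize > cache.getMaxSize()) {
      // read the block directly instead of loading it through the cache
    }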
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index fa92fe0..e633f13 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -66,11 +66,6 @@ public class SummaryReader {
}
@Override
- public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) {
- return summaryCache.cacheBlock(blockName, buf, inMemory);
- }
-
- @Override
public CacheEntry getBlock(String blockName) {
CacheEntry ce = summaryCache.getBlock(blockName);
if (ce == null) {
@@ -81,6 +76,34 @@ public class SummaryReader {
}
@Override
+ public CacheEntry getBlock(String blockName, Loader loader) {
+ Loader idxLoader = new Loader() {
+
+ CacheEntry idxCacheEntry;
+
+ @Override
+ public Map<String,Loader> getDependencies() {
+ idxCacheEntry = indexCache.getBlock(blockName);
+ if (idxCacheEntry == null) {
+ return loader.getDependencies();
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
+ @Override
+ public byte[] load(int maxSize, Map<String,byte[]> dependencies) {
+ if (idxCacheEntry == null) {
+ return loader.load(maxSize, dependencies);
+ } else {
+ return idxCacheEntry.getBuffer();
+ }
+ }
+ };
+ return summaryCache.getBlock(blockName, idxLoader);
+ }
+
+ @Override
public long getMaxSize() {
return summaryCache.getMaxSize();
}
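The wrapped idxLoader gives summary lookups a three-step fallback:

  1. summaryCache.getBlock(blockName), inside getBlock(blockName, idxLoader)
  2. indexCache.getBlock(blockName), via idxLoader.getDependencies()
  3. loader.load(maxSize, dependencies), which reads from the file

The anonymous Loader relies on getDependencies() being called before load(),
which is what lets it stash idxCacheEntry between the two calls.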
@@ -144,9 +167,9 @@ public class SummaryReader {
return fileSummaries;
}
- public static SummaryReader load(String id, Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
+ public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf, InputStream inputStream, long length,
Predicate<SummarizerConfiguration> summarySelector, SummarizerFactory factory) throws IOException {
- org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader(id, (InputStream & Seekable) inputStream,
+ org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile.Reader bcReader = new CachableBlockFile.Reader((InputStream & Seekable) inputStream,
length, conf, aConf);
return load(bcReader, summarySelector, factory);
}
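With caching out of this path, load no longer needs a cache-key id; callers
hand it a seekable stream directly. A usage sketch (inputStream must also
implement Seekable because load casts it; aconf, inputStream, length, and
factory are assumed to be in scope):

    // Select every summarizer configuration present in the file.
    SummaryReader summaries = SummaryReader.load(CachedConfiguration.getInstance(),
        aconf, inputStream, length, sc -> true, factory);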
diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index 8b7f32b..4b4a886 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -51,8 +51,8 @@ public class TestLruBlockCache extends TestCase {
cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(1000027));
cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(1000029));
- LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f)
- .multiFactor(0.30f).memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set);
+ LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.93f).acceptableFactor(0.97f).singleFactor(0.20f).multiFactor(0.30f)
+ .memoryFactor(0.50f).mapConcurrencyLevel(5).buildMap().forEach(cc::set);
String defaultPrefix = BlockCacheManager.CACHE_PROPERTY_BASE + LruBlockCacheConfiguration.PROPERTY_PREFIX + ".default.";
@@ -225,8 +225,8 @@ public class TestLruBlockCache extends TestCase {
BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
- LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f)
- .multiFactor(0.50f).memoryFactor(0.25f).buildMap().forEach(cc::set);
+ LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.25f).multiFactor(0.50f)
+ .memoryFactor(0.25f).buildMap().forEach(cc::set);
manager.start(new BlockCacheConfiguration(cc));
LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
@@ -289,8 +289,8 @@ public class TestLruBlockCache extends TestCase {
BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
- LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f)
- .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+ LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.98f).acceptableFactor(0.99f).singleFactor(0.33f).multiFactor(0.33f)
+ .memoryFactor(0.34f).buildMap().forEach(cc::set);
manager.start(new BlockCacheConfiguration(cc));
LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
@@ -412,8 +412,8 @@ public class TestLruBlockCache extends TestCase {
BlockCacheManager manager = BlockCacheManagerFactory.getInstance(cc);
cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize));
cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize));
- LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f)
- .multiFactor(0.33f).memoryFactor(0.34f).buildMap().forEach(cc::set);
+ LruBlockCacheConfiguration.builder(CacheType.INDEX).useEvictionThread(false).minFactor(0.66f).acceptableFactor(0.99f).singleFactor(0.33f).multiFactor(0.33f)
+ .memoryFactor(0.34f).buildMap().forEach(cc::set);
manager.start(new BlockCacheConfiguration(cc));
LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX);
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
index 2985591..93c54e5 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/BlockIndexTest.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.file.rfile;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.function.Supplier;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -43,20 +44,22 @@ public class BlockIndexTest {
this.data = d;
}
+ @SuppressWarnings("unchecked")
@Override
- public void setIndex(Object idx) {
- this.idx = idx;
- }
-
- @Override
- public Object getIndex() {
- return idx;
+ public <T extends Weighbable> T getIndex(Supplier<T> indexSupplier) {
+ if (idx == null) {
+ idx = indexSupplier.get();
+ }
+ return (T) idx;
}
@Override
public byte[] getBuffer() {
return data;
}
+
+ @Override
+ public void indexWeightChanged() {}
}
@Test
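The test's CacheEntry now follows the new lazy-index contract:
getIndex(Supplier<T>) builds the index on first use and memoizes it, while
indexWeightChanged() is the entry's hook for telling the cache the index grew.
A caller-side sketch (MyIndex is a hypothetical Weighbable with a no-arg
constructor):

    // The first call constructs the index; later calls return the same instance.
    MyIndex idx = cacheEntry.getIndex(MyIndex::new);
    // ... mutate idx in a way that changes its weight ...
    cacheEntry.indexWeightChanged();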
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 36e7af4..1c61162 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -77,7 +77,7 @@ public class MultiLevelIndexTest extends TestCase {
byte[] data = baos.toByteArray();
SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
FSDataInputStream in = new FSDataInputStream(bais);
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source1", in, data.length, CachedConfiguration.getInstance(), aconf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, data.length, CachedConfiguration.getInstance(), aconf);
Reader reader = new Reader(_cbr, RFile.RINDEX_VER_8);
BlockRead rootIn = _cbr.getMetaBlock("root");
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index 2581f91..0645880 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -287,7 +287,6 @@ public class RFileTest {
LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA);
CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in, fileLength, conf, dataCache, indexCache, DefaultConfiguration.getInstance());
-
reader = new RFile.Reader(_cbr);
if (cfsi)
iter = new ColumnFamilySkippingIterator(reader);
@@ -1647,7 +1646,7 @@ public class RFileTest {
SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
FSDataInputStream in2 = new FSDataInputStream(bais);
AccumuloConfiguration aconf = DefaultConfiguration.getInstance();
- CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader("source-1", in2, data.length, CachedConfiguration.getInstance(), aconf);
+ CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in2, data.length, CachedConfiguration.getInstance(), aconf);
Reader reader = new RFile.Reader(_cbr);
checkIndex(reader);
--
To stop receiving notification emails like this one, please contact
"commits@accumulo.apache.org" <co...@accumulo.apache.org>.