You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink and is not preserved in this plain-text copy.)
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/11/13 16:49:10 UTC
[accumulo] branch master updated: fixes #495 use file len cache for summary data (#741)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new bdb6f15 fixes #495 use file len cache for summary data (#741)
bdb6f15 is described below
commit bdb6f15a488cec871b6c6ebbe7820bc13c9153e5
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Nov 13 11:49:05 2018 -0500
fixes #495 use file len cache for summary data (#741)
---
.../org/apache/accumulo/core/client/AccumuloClient.java | 2 --
.../java/org/apache/accumulo/core/summary/Gatherer.java | 13 ++++++++-----
.../org/apache/accumulo/core/summary/SummaryReader.java | 9 ++++++---
.../main/java/org/apache/accumulo/tserver/TabletServer.java | 5 ++++-
.../accumulo/tserver/TabletServerResourceManager.java | 10 ++++++++--
.../accumulo/tserver/compaction/MajorCompactionRequest.java | 11 +++++++----
.../java/org/apache/accumulo/tserver/tablet/Tablet.java | 4 +++-
7 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
index 2955972..5371830 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
@@ -35,8 +35,6 @@ import org.apache.accumulo.core.security.Authorizations;
* Supports fluent API for creation. Various options can be provided to {@link Accumulo#newClient()}
* and when finished a call to build() will return the AccumuloClient object. For example:
*
- * <p>
- *
* <pre>
* <code>
* try (AccumuloClient client = Accumulo.newClient()
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 029e86b..2d94d39 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
@@ -97,7 +98,7 @@ import com.google.common.hash.Hashing;
* execute {@link #processPartition(ExecutorService, int, int)}
* <li>{@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tserver
* to remotely execute
- * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)}
+ * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, Cache, ExecutorService)}
* </ol>
*/
public class Gatherer {
@@ -508,12 +509,12 @@ public class Gatherer {
*/
public Future<SummaryCollection> processFiles(FileSystemResolver volMgr,
Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache,
- ExecutorService srp) {
+ Cache<String,Long> fileLenCache, ExecutorService srp) {
List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>();
for (Entry<String,List<TRowRange>> entry : files.entrySet()) {
futures.add(CompletableFuture.supplyAsync(() -> {
List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new);
- return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache);
+ return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache, fileLenCache);
}, srp));
}
@@ -664,10 +665,12 @@ public class Gatherer {
}
private SummaryCollection getSummaries(FileSystemResolver volMgr, String file,
- List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) {
+ List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache,
+ Cache<String,Long> fileLenCache) {
Path path = new Path(file);
Configuration conf = CachedConfiguration.getInstance();
return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path,
- summarySelector, summaryCache, indexCache, cryptoService).getSummaries(ranges);
+ summarySelector, summaryCache, indexCache, fileLenCache, cryptoService)
+ .getSummaries(ranges);
}
}
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 668de1e..ed71059 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.WritableUtils;
+import com.google.common.cache.Cache;
+
public class SummaryReader {
private interface BlockReader {
@@ -185,15 +187,16 @@ public class SummaryReader {
public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf,
SummarizerFactory factory, Path file, Predicate<SummarizerConfiguration> summarySelector,
- BlockCache summaryCache, BlockCache indexCache, CryptoService cryptoService) {
+ BlockCache summaryCache, BlockCache indexCache, Cache<String,Long> fileLenCache,
+ CryptoService cryptoService) {
CachableBlockFile.Reader bcReader = null;
try {
// the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when
// only summary data is wanted.
CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
- bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf,
- cryptoService);
+ bcReader = new CachableBlockFile.Reader(fs, file, conf, fileLenCache, null, compositeCache,
+ null, aConf, cryptoService);
return load(bcReader, summarySelector, factory);
} catch (FileNotFoundException fne) {
SummaryReader sr = new SummaryReader();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9a2b1ff..4816697 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -275,6 +275,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
public class TabletServer implements Runnable {
@@ -2118,9 +2119,11 @@ public class TabletServer implements Runnable {
.getTableConfiguration(Table.ID.of(request.getTableId()));
BlockCache summaryCache = resourceManager.getSummaryCache();
BlockCache indexCache = resourceManager.getIndexCache();
+ Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem();
Future<SummaryCollection> future = new Gatherer(context, request, tableCfg,
- context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache, srp);
+ context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache,
+ fileLenCache, srp);
return startSummaryOperation(credentials, future);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7e8f891..9afba64 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -135,6 +135,8 @@ public class TabletServerResourceManager {
private final ServerConfigurationFactory conf;
private final ServerContext context;
+ private Cache<String,Long> fileLenCache;
+
private ExecutorService addEs(String name, ExecutorService tp) {
if (threadPools.containsKey(name)) {
throw new IllegalArgumentException(
@@ -408,8 +410,8 @@ public class TabletServerResourceManager {
int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
- Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
- .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
+ fileLenCache = CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000))
+ .build();
fileManager = new FileManager(tserver.getContext(), fs, maxOpenFiles, fileLenCache, _dCache,
_iCache);
@@ -1006,6 +1008,10 @@ public class TabletServerResourceManager {
return _sCache;
}
+ public Cache<String,Long> getFileLenCache() {
+ return fileLenCache;
+ }
+
public ExecutorService getSummaryRetrievalExecutor() {
return summaryRetrievalPool;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 8b1ba08..9221a22 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
/**
* Information that can be used to determine how a tablet is to be major compacted, if needed.
@@ -65,10 +66,11 @@ public class MajorCompactionRequest implements Cloneable {
private final BlockCache summaryCache;
private Map<FileRef,DataFileValue> files;
private final ServerContext context;
+ private final Cache<String,Long> fileLenCache;
public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
VolumeManager manager, AccumuloConfiguration tabletConfig, BlockCache summaryCache,
- BlockCache indexCache, ServerContext context) {
+ BlockCache indexCache, Cache<String,Long> fileLenCache, ServerContext context) {
this.extent = extent;
this.reason = reason;
this.volumeManager = manager;
@@ -76,17 +78,18 @@ public class MajorCompactionRequest implements Cloneable {
this.files = Collections.emptyMap();
this.summaryCache = summaryCache;
this.indexCache = indexCache;
+ this.fileLenCache = fileLenCache;
this.context = context;
}
public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
AccumuloConfiguration tabletConfig, ServerContext context) {
- this(extent, reason, null, tabletConfig, null, null, context);
+ this(extent, reason, null, tabletConfig, null, null, null, context);
}
public MajorCompactionRequest(MajorCompactionRequest mcr) {
this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig, mcr.summaryCache,
- mcr.indexCache, mcr.context);
+ mcr.indexCache, mcr.fileLenCache, mcr.context);
// know this is already unmodifiable, no need to wrap again
this.files = mcr.files;
}
@@ -155,7 +158,7 @@ public class MajorCompactionRequest implements Cloneable {
Configuration conf = CachedConfiguration.getInstance();
SummaryCollection fsc = SummaryReader
.load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache,
- indexCache, context.getCryptoService())
+ indexCache, fileLenCache, context.getCryptoService())
.getSummaries(Collections.singletonList(new Gatherer.RowRange(extent)));
sc.merge(fsc, factory);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 914f7f1..507ebdb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1823,8 +1823,10 @@ public class Tablet implements TabletCommitter {
if (strategy != null) {
BlockCache sc = tabletResources.getTabletServerResourceManager().getSummaryCache();
BlockCache ic = tabletResources.getTabletServerResourceManager().getIndexCache();
+ Cache<String,Long> fileLenCache = tabletResources.getTabletServerResourceManager()
+ .getFileLenCache();
MajorCompactionRequest request = new MajorCompactionRequest(extent, reason,
- getTabletServer().getFileSystem(), tableConfiguration, sc, ic, context);
+ getTabletServer().getFileSystem(), tableConfiguration, sc, ic, fileLenCache, context);
request.setFiles(getDatafileManager().getDatafileSizes());
strategy.gatherInformation(request);
}