You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink target is not preserved in this plain-text rendering).
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/11/13 16:49:10 UTC

[accumulo] branch master updated: fixes #495 use file len cache for summary data (#741)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb6f15  fixes #495 use file len cache for summary data (#741)
bdb6f15 is described below

commit bdb6f15a488cec871b6c6ebbe7820bc13c9153e5
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Nov 13 11:49:05 2018 -0500

    fixes #495 use file len cache for summary data (#741)
---
 .../org/apache/accumulo/core/client/AccumuloClient.java     |  2 --
 .../java/org/apache/accumulo/core/summary/Gatherer.java     | 13 ++++++++-----
 .../org/apache/accumulo/core/summary/SummaryReader.java     |  9 ++++++---
 .../main/java/org/apache/accumulo/tserver/TabletServer.java |  5 ++++-
 .../accumulo/tserver/TabletServerResourceManager.java       | 10 ++++++++--
 .../accumulo/tserver/compaction/MajorCompactionRequest.java | 11 +++++++----
 .../java/org/apache/accumulo/tserver/tablet/Tablet.java     |  4 +++-
 7 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
index 2955972..5371830 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
@@ -35,8 +35,6 @@ import org.apache.accumulo.core.security.Authorizations;
  * Supports fluent API for creation. Various options can be provided to {@link Accumulo#newClient()}
  * and when finished a call to build() will return the AccumuloClient object. For example:
  *
- * <p>
- *
  * <pre>
  * <code>
  * try (AccumuloClient client = Accumulo.newClient()
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 029e86b..2d94d39 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -82,6 +82,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 
@@ -97,7 +98,7 @@ import com.google.common.hash.Hashing;
  * execute {@link #processPartition(ExecutorService, int, int)}
  * <li>{@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tserver
  * to remotely execute
- * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)}
+ * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, Cache, ExecutorService)}
  * </ol>
  */
 public class Gatherer {
@@ -508,12 +509,12 @@ public class Gatherer {
    */
   public Future<SummaryCollection> processFiles(FileSystemResolver volMgr,
       Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache,
-      ExecutorService srp) {
+      Cache<String,Long> fileLenCache, ExecutorService srp) {
     List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>();
     for (Entry<String,List<TRowRange>> entry : files.entrySet()) {
       futures.add(CompletableFuture.supplyAsync(() -> {
         List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new);
-        return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache);
+        return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache, fileLenCache);
       }, srp));
     }
 
@@ -664,10 +665,12 @@ public class Gatherer {
   }
 
   private SummaryCollection getSummaries(FileSystemResolver volMgr, String file,
-      List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) {
+      List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache,
+      Cache<String,Long> fileLenCache) {
     Path path = new Path(file);
     Configuration conf = CachedConfiguration.getInstance();
     return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path,
-        summarySelector, summaryCache, indexCache, cryptoService).getSummaries(ranges);
+        summarySelector, summaryCache, indexCache, fileLenCache, cryptoService)
+        .getSummaries(ranges);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 668de1e..ed71059 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.cache.Cache;
+
 public class SummaryReader {
 
   private interface BlockReader {
@@ -185,15 +187,16 @@ public class SummaryReader {
 
   public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf,
       SummarizerFactory factory, Path file, Predicate<SummarizerConfiguration> summarySelector,
-      BlockCache summaryCache, BlockCache indexCache, CryptoService cryptoService) {
+      BlockCache summaryCache, BlockCache indexCache, Cache<String,Long> fileLenCache,
+      CryptoService cryptoService) {
     CachableBlockFile.Reader bcReader = null;
 
     try {
       // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when
       // only summary data is wanted.
       CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
-      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf,
-          cryptoService);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, fileLenCache, null, compositeCache,
+          null, aConf, cryptoService);
       return load(bcReader, summarySelector, factory);
     } catch (FileNotFoundException fne) {
       SummaryReader sr = new SummaryReader();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9a2b1ff..4816697 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -275,6 +275,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
 
 public class TabletServer implements Runnable {
 
@@ -2118,9 +2119,11 @@ public class TabletServer implements Runnable {
           .getTableConfiguration(Table.ID.of(request.getTableId()));
       BlockCache summaryCache = resourceManager.getSummaryCache();
       BlockCache indexCache = resourceManager.getIndexCache();
+      Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
       FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem();
       Future<SummaryCollection> future = new Gatherer(context, request, tableCfg,
-          context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache, srp);
+          context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache,
+              fileLenCache, srp);
 
       return startSummaryOperation(credentials, future);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7e8f891..9afba64 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -135,6 +135,8 @@ public class TabletServerResourceManager {
   private final ServerConfigurationFactory conf;
   private final ServerContext context;
 
+  private Cache<String,Long> fileLenCache;
+
   private ExecutorService addEs(String name, ExecutorService tp) {
     if (threadPools.containsKey(name)) {
       throw new IllegalArgumentException(
@@ -408,8 +410,8 @@ public class TabletServerResourceManager {
 
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
-    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
-        .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
+    fileLenCache = CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000))
+        .build();
 
     fileManager = new FileManager(tserver.getContext(), fs, maxOpenFiles, fileLenCache, _dCache,
         _iCache);
@@ -1006,6 +1008,10 @@ public class TabletServerResourceManager {
     return _sCache;
   }
 
+  public Cache<String,Long> getFileLenCache() {
+    return fileLenCache;
+  }
+
   public ExecutorService getSummaryRetrievalExecutor() {
     return summaryRetrievalPool;
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 8b1ba08..9221a22 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 
 /**
  * Information that can be used to determine how a tablet is to be major compacted, if needed.
@@ -65,10 +66,11 @@ public class MajorCompactionRequest implements Cloneable {
   private final BlockCache summaryCache;
   private Map<FileRef,DataFileValue> files;
   private final ServerContext context;
+  private final Cache<String,Long> fileLenCache;
 
   public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
       VolumeManager manager, AccumuloConfiguration tabletConfig, BlockCache summaryCache,
-      BlockCache indexCache, ServerContext context) {
+      BlockCache indexCache, Cache<String,Long> fileLenCache, ServerContext context) {
     this.extent = extent;
     this.reason = reason;
     this.volumeManager = manager;
@@ -76,17 +78,18 @@ public class MajorCompactionRequest implements Cloneable {
     this.files = Collections.emptyMap();
     this.summaryCache = summaryCache;
     this.indexCache = indexCache;
+    this.fileLenCache = fileLenCache;
     this.context = context;
   }
 
   public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
       AccumuloConfiguration tabletConfig, ServerContext context) {
-    this(extent, reason, null, tabletConfig, null, null, context);
+    this(extent, reason, null, tabletConfig, null, null, null, context);
   }
 
   public MajorCompactionRequest(MajorCompactionRequest mcr) {
     this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig, mcr.summaryCache,
-        mcr.indexCache, mcr.context);
+        mcr.indexCache, mcr.fileLenCache, mcr.context);
     // know this is already unmodifiable, no need to wrap again
     this.files = mcr.files;
   }
@@ -155,7 +158,7 @@ public class MajorCompactionRequest implements Cloneable {
       Configuration conf = CachedConfiguration.getInstance();
       SummaryCollection fsc = SummaryReader
           .load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache,
-              indexCache, context.getCryptoService())
+              indexCache, fileLenCache, context.getCryptoService())
           .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent)));
       sc.merge(fsc, factory);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 914f7f1..507ebdb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1823,8 +1823,10 @@ public class Tablet implements TabletCommitter {
     if (strategy != null) {
       BlockCache sc = tabletResources.getTabletServerResourceManager().getSummaryCache();
       BlockCache ic = tabletResources.getTabletServerResourceManager().getIndexCache();
+      Cache<String,Long> fileLenCache = tabletResources.getTabletServerResourceManager()
+          .getFileLenCache();
       MajorCompactionRequest request = new MajorCompactionRequest(extent, reason,
-          getTabletServer().getFileSystem(), tableConfiguration, sc, ic, context);
+          getTabletServer().getFileSystem(), tableConfiguration, sc, ic, fileLenCache, context);
       request.setFiles(getDatafileManager().getDatafileSizes());
       strategy.gatherInformation(request);
     }