You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/11/13 16:49:08 UTC

[GitHub] keith-turner closed pull request #741: fixes #495 use file len cache for summary data

keith-turner closed pull request #741: fixes #495 use file len cache for summary data
URL: https://github.com/apache/accumulo/pull/741
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
index e7f6a4bd4c..4c804719d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java
@@ -35,8 +35,6 @@
  * Supports fluent API for creation. Various options can be provided to {@link Accumulo#newClient()}
  * and when finished a call to build() will return the AccumuloClient object. For example:
  *
- * <p>
- *
  * <pre>
  * <code>
  * try (AccumuloClient client = Accumulo.newClient()
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
index 029e86be5e..2d94d39094 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java
@@ -82,6 +82,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 
@@ -97,7 +98,7 @@
  * execute {@link #processPartition(ExecutorService, int, int)}
  * <li>{@link #processPartition(ExecutorService, int, int)} will make RPC calls to multiple tserver
  * to remotely execute
- * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, ExecutorService)}
+ * <li>{@link #processFiles(FileSystemResolver, Map, BlockCache, BlockCache, Cache, ExecutorService)}
  * </ol>
  */
 public class Gatherer {
@@ -508,12 +509,12 @@ public SummaryCollection get(long timeout, TimeUnit unit)
    */
   public Future<SummaryCollection> processFiles(FileSystemResolver volMgr,
       Map<String,List<TRowRange>> files, BlockCache summaryCache, BlockCache indexCache,
-      ExecutorService srp) {
+      Cache<String,Long> fileLenCache, ExecutorService srp) {
     List<CompletableFuture<SummaryCollection>> futures = new ArrayList<>();
     for (Entry<String,List<TRowRange>> entry : files.entrySet()) {
       futures.add(CompletableFuture.supplyAsync(() -> {
         List<RowRange> rrl = Lists.transform(entry.getValue(), RowRange::new);
-        return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache);
+        return getSummaries(volMgr, entry.getKey(), rrl, summaryCache, indexCache, fileLenCache);
       }, srp));
     }
 
@@ -664,10 +665,12 @@ public String toString() {
   }
 
   private SummaryCollection getSummaries(FileSystemResolver volMgr, String file,
-      List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache) {
+      List<RowRange> ranges, BlockCache summaryCache, BlockCache indexCache,
+      Cache<String,Long> fileLenCache) {
     Path path = new Path(file);
     Configuration conf = CachedConfiguration.getInstance();
     return SummaryReader.load(volMgr.get(path), conf, ctx.getConfiguration(), factory, path,
-        summarySelector, summaryCache, indexCache, cryptoService).getSummaries(ranges);
+        summarySelector, summaryCache, indexCache, fileLenCache, cryptoService)
+        .getSummaries(ranges);
   }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 668de1ea86..ed71059d5e 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -45,6 +45,8 @@
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.WritableUtils;
 
+import com.google.common.cache.Cache;
+
 public class SummaryReader {
 
   private interface BlockReader {
@@ -185,15 +187,16 @@ public static SummaryReader load(Configuration conf, AccumuloConfiguration aConf
 
   public static SummaryReader load(FileSystem fs, Configuration conf, AccumuloConfiguration aConf,
       SummarizerFactory factory, Path file, Predicate<SummarizerConfiguration> summarySelector,
-      BlockCache summaryCache, BlockCache indexCache, CryptoService cryptoService) {
+      BlockCache summaryCache, BlockCache indexCache, Cache<String,Long> fileLenCache,
+      CryptoService cryptoService) {
     CachableBlockFile.Reader bcReader = null;
 
     try {
       // the reason BCFile is used instead of RFile is to avoid reading in the RFile meta block when
       // only summary data is wanted.
       CompositeCache compositeCache = new CompositeCache(summaryCache, indexCache);
-      bcReader = new CachableBlockFile.Reader(fs, file, conf, null, compositeCache, aConf,
-          cryptoService);
+      bcReader = new CachableBlockFile.Reader(fs, file, conf, fileLenCache, null, compositeCache,
+          null, aConf, cryptoService);
       return load(bcReader, summarySelector, factory);
     } catch (FileNotFoundException fne) {
       SummaryReader sr = new SummaryReader();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 9a2b1ff527..4816697000 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -275,6 +275,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
 
 public class TabletServer implements Runnable {
 
@@ -2118,9 +2119,11 @@ public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentia
           .getTableConfiguration(Table.ID.of(request.getTableId()));
       BlockCache summaryCache = resourceManager.getSummaryCache();
       BlockCache indexCache = resourceManager.getIndexCache();
+      Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
       FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem();
       Future<SummaryCollection> future = new Gatherer(context, request, tableCfg,
-          context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache, srp);
+          context.getCryptoService()).processFiles(volMgr, files, summaryCache, indexCache,
+              fileLenCache, srp);
 
       return startSummaryOperation(credentials, future);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7e8f891922..9afba64e04 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -135,6 +135,8 @@
   private final ServerConfigurationFactory conf;
   private final ServerContext context;
 
+  private Cache<String,Long> fileLenCache;
+
   private ExecutorService addEs(String name, ExecutorService tp) {
     if (threadPools.containsKey(name)) {
       throw new IllegalArgumentException(
@@ -408,8 +410,8 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager fs,
 
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
-    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
-        .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
+    fileLenCache = CacheBuilder.newBuilder().maximumSize(Math.min(maxOpenFiles * 1000L, 100_000))
+        .build();
 
     fileManager = new FileManager(tserver.getContext(), fs, maxOpenFiles, fileLenCache, _dCache,
         _iCache);
@@ -1006,6 +1008,10 @@ public BlockCache getSummaryCache() {
     return _sCache;
   }
 
+  public Cache<String,Long> getFileLenCache() {
+    return fileLenCache;
+  }
+
   public ExecutorService getSummaryRetrievalExecutor() {
     return summaryRetrievalPool;
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 8b1ba0854a..9221a223ec 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.fs.FileSystem;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
 
 /**
  * Information that can be used to determine how a tablet is to be major compacted, if needed.
@@ -65,10 +66,11 @@
   private final BlockCache summaryCache;
   private Map<FileRef,DataFileValue> files;
   private final ServerContext context;
+  private final Cache<String,Long> fileLenCache;
 
   public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
       VolumeManager manager, AccumuloConfiguration tabletConfig, BlockCache summaryCache,
-      BlockCache indexCache, ServerContext context) {
+      BlockCache indexCache, Cache<String,Long> fileLenCache, ServerContext context) {
     this.extent = extent;
     this.reason = reason;
     this.volumeManager = manager;
@@ -76,17 +78,18 @@ public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
     this.files = Collections.emptyMap();
     this.summaryCache = summaryCache;
     this.indexCache = indexCache;
+    this.fileLenCache = fileLenCache;
     this.context = context;
   }
 
   public MajorCompactionRequest(KeyExtent extent, MajorCompactionReason reason,
       AccumuloConfiguration tabletConfig, ServerContext context) {
-    this(extent, reason, null, tabletConfig, null, null, context);
+    this(extent, reason, null, tabletConfig, null, null, null, context);
   }
 
   public MajorCompactionRequest(MajorCompactionRequest mcr) {
     this(mcr.extent, mcr.reason, mcr.volumeManager, mcr.tableConfig, mcr.summaryCache,
-        mcr.indexCache, mcr.context);
+        mcr.indexCache, mcr.fileLenCache, mcr.context);
     // know this is already unmodifiable, no need to wrap again
     this.files = mcr.files;
   }
@@ -155,7 +158,7 @@ public MajorCompactionReason getReason() {
       Configuration conf = CachedConfiguration.getInstance();
       SummaryCollection fsc = SummaryReader
           .load(fs, conf, tableConfig, factory, file.path(), summarySelector, summaryCache,
-              indexCache, context.getCryptoService())
+              indexCache, fileLenCache, context.getCryptoService())
           .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent)));
       sc.merge(fsc, factory);
     }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 914f7f139e..507ebdb42c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1823,8 +1823,10 @@ private CompactionStats _majorCompact(MajorCompactionReason reason)
     if (strategy != null) {
       BlockCache sc = tabletResources.getTabletServerResourceManager().getSummaryCache();
       BlockCache ic = tabletResources.getTabletServerResourceManager().getIndexCache();
+      Cache<String,Long> fileLenCache = tabletResources.getTabletServerResourceManager()
+          .getFileLenCache();
       MajorCompactionRequest request = new MajorCompactionRequest(extent, reason,
-          getTabletServer().getFileSystem(), tableConfiguration, sc, ic, context);
+          getTabletServer().getFileSystem(), tableConfiguration, sc, ic, fileLenCache, context);
       request.setFiles(getDatafileManager().getDatafileSizes());
       strategy.gatherInformation(request);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services