You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/05/22 18:30:17 UTC

[GitHub] keith-turner closed pull request #492: fixes #467 cache file lengths

keith-turner closed pull request #492: fixes #467 cache file lengths
URL: https://github.com/apache/accumulo/pull/492
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a pull request from a fork once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 5608d18f32..5a26ad2cfd 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -34,6 +34,8 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 
+import com.google.common.cache.Cache;
+
 public abstract class FileOperations {
 
   private static final HashSet<String> validExtensions = new HashSet<>(
@@ -373,6 +375,7 @@ public FileSKVWriter build() throws IOException {
       extends FileIOOperation<SubclassType> {
     private BlockCache dataCache;
     private BlockCache indexCache;
+    private Cache<String,Long> fileLenCache;
 
     /**
      * (Optional) Set the block cache pair to be used to optimize reads within the constructed
@@ -401,6 +404,12 @@ public SubclassType withIndexCache(BlockCache indexCache) {
       return (SubclassType) this;
     }
 
+    @SuppressWarnings("unchecked")
+    public SubclassType withFileLenCache(Cache<String,Long> fileLenCache) {
+      this.fileLenCache = fileLenCache;
+      return (SubclassType) this;
+    }
+
     public BlockCache getDataCache() {
       return dataCache;
     }
@@ -408,6 +417,10 @@ public BlockCache getDataCache() {
     public BlockCache getIndexCache() {
       return indexCache;
     }
+
+    public Cache<String,Long> getFileLenCache() {
+      return fileLenCache;
+    }
   }
 
   /** Builder interface parallel to {@link FileReaderOperation}. */
@@ -426,6 +439,11 @@ public BlockCache getIndexCache() {
      * (Optional) set the index cache to be used to optimize reads within the constructed reader.
      */
     public SubbuilderType withIndexCache(BlockCache indexCache);
+
+    /**
+     * (Optional) set the file len cache to be used to optimize reads within the constructed reader.
+     */
+    public SubbuilderType withFileLenCache(Cache<String,Long> fileLenCache);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 35a3e8ae83..a8d1e741fb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -23,6 +23,8 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.lang.ref.SoftReference;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.ABlockReader;
@@ -45,6 +47,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+
 /**
  *
  * This is a wrapper class for BCFile that includes a cache for independent caches for datablocks
@@ -149,11 +153,13 @@ public long getStartPos() throws IOException {
    *
    */
   public static class Reader implements BlockFileReader {
+
     private final RateLimiter readLimiter;
     private BCFile.Reader _bc;
     private final String fileName;
     private BlockCache _dCache = null;
     private BlockCache _iCache = null;
+    private Cache<String,Long> fileLenCache = null;
     private InputStream fin = null;
     private FileSystem fs;
     private Configuration conf;
@@ -238,12 +244,12 @@ public String getInfo() {
 
     public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
         BlockCache index, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+      this(fs, dataFile, conf, null, data, index, null, accumuloConfiguration);
     }
 
-    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
-        BlockCache index, RateLimiter readLimiter, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, Cache<String,Long> fileLenCache,
+        BlockCache data, BlockCache index, RateLimiter readLimiter,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
 
       /*
        * Grab path create input stream grab len create file
@@ -252,6 +258,7 @@ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
       fileName = dataFile.toString();
       this._dCache = data;
       this._iCache = index;
+      this.fileLenCache = fileLenCache;
       this.fs = fs;
       this.conf = conf;
       this.accumuloConfiguration = accumuloConfiguration;
@@ -281,6 +288,20 @@ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
       this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
     }
 
+    private static long getFileLen(Cache<String,Long> fileLenCache, final FileSystem fs,
+        final Path path) throws IOException {
+      try {
+        return fileLenCache.get(path.getName(), new Callable<Long>() {
+          @Override
+          public Long call() throws Exception {
+            return fs.getFileStatus(path).getLen();
+          }
+        });
+      } catch (ExecutionException e) {
+        throw new IOException("Failed to get " + path + " len from cache ", e);
+      }
+    }
+
     private synchronized BCFile.Reader getBCFile(AccumuloConfiguration accumuloConfiguration)
         throws IOException {
       if (closed)
@@ -288,10 +309,26 @@ public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data,
 
       if (_bc == null) {
         // lazily open file if needed
-        Path path = new Path(fileName);
+        final Path path = new Path(fileName);
+
         RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
         fin = fsIn;
-        init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+
+        if (fileLenCache != null) {
+          try {
+            init(fsIn, getFileLen(fileLenCache, fs, path), conf, accumuloConfiguration);
+          } catch (Exception e) {
+            log.debug("Failed to open {}, clearing file length cache and retrying", fileName, e);
+            fileLenCache.invalidate(path.getName());
+          }
+
+          if (_bc == null) {
+            init(fsIn, getFileLen(fileLenCache, fs, path), conf, accumuloConfiguration);
+          }
+        } else {
+          init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+
+        }
       }
 
       return _bc;
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index fd10031c8a..131dee18c3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -44,8 +44,9 @@
 
   private static RFile.Reader getReader(FileReaderOperation<?> options) throws IOException {
     CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(options.getFileSystem(),
-        new Path(options.getFilename()), options.getConfiguration(), options.getDataCache(),
-        options.getIndexCache(), options.getRateLimiter(), options.getTableConfiguration());
+        new Path(options.getFilename()), options.getConfiguration(), options.getFileLenCache(),
+        options.getDataCache(), options.getIndexCache(), options.getRateLimiter(),
+        options.getTableConfiguration());
     return new RFile.Reader(_cbr);
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 9b9944466c..a067eaa643 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -58,6 +58,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.Cache;
+
 public class FileManager {
 
   private static final Logger log = LoggerFactory.getLogger(FileManager.class);
@@ -113,6 +115,7 @@ public int hashCode() {
   // null if unallocated
   private BlockCache dataCache = null;
   private BlockCache indexCache = null;
+  private Cache<String,Long> fileLenCache;
 
   private long maxIdleTime;
 
@@ -165,13 +168,14 @@ public void run() {
    *          : underlying file can and should be able to handle a null cache
    */
   public FileManager(AccumuloServerContext context, VolumeManager fs, int maxOpen,
-      BlockCache dataCache, BlockCache indexCache) {
+      Cache<String,Long> fileLenCache, BlockCache dataCache, BlockCache indexCache) {
 
     if (maxOpen <= 0)
       throw new IllegalArgumentException("maxOpen <= 0");
     this.context = context;
     this.dataCache = dataCache;
     this.indexCache = indexCache;
+    this.fileLenCache = fileLenCache;
 
     this.filePermits = new Semaphore(maxOpen, false);
     this.maxOpen = maxOpen;
@@ -322,7 +326,7 @@ private void closeReaders(Collection<FileSKVIterator> filesToClose) {
             .forFile(path.toString(), ns, ns.getConf())
             .withTableConfiguration(
                 context.getServerConfigurationFactory().getTableConfiguration(tablet))
-            .withBlockCache(dataCache, indexCache).build();
+            .withBlockCache(dataCache, indexCache).withFileLenCache(fileLenCache).build();
         readersReserved.put(reader, file);
       } catch (Exception e) {
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index ff8202e500..4d1f3732bc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -64,6 +64,8 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -228,7 +230,10 @@ public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
 
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
-    fileManager = new FileManager(tserver, fs, maxOpenFiles, _dCache, _iCache);
+    Cache<String,Long> fileLenCache = CacheBuilder.newBuilder()
+        .maximumSize(Math.min(maxOpenFiles * 1000L, 100_000)).build();
+
+    fileManager = new FileManager(tserver, fs, maxOpenFiles, fileLenCache, _dCache, _iCache);
 
     memoryManager = Property.createInstanceFromPropertyName(acuConf, Property.TSERV_MEM_MGMT,
         MemoryManager.class, new LargestFirstMemoryManager());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services