You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/03 17:38:07 UTC

[GitHub] [iceberg] steveloughran commented on a diff in pull request #4518: core: Provide mechanism to cache manifest file content

steveloughran commented on code in PR #4518:
URL: https://github.com/apache/iceberg/pull/4518#discussion_r864026701


##########
core/src/main/java/org/apache/iceberg/io/CachingFileIO.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Weigher;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of FileIO that adds metadata content caching features.
+ * <p>
+ * CachingFileIO intends to speed up scan planning by caching most of the table metadata files in memory, reducing
+ * remote reads. It is not meant to be dynamically loaded. Instead, set
+ * {@link CatalogProperties#IO_CACHE_ENABLED} to true and use
+ * {@link org.apache.iceberg.CatalogUtil#loadFileIO(String, Map, Object)} to wrap another FileIO with this
+ * CachingFileIO.
+ */
+public class CachingFileIO implements FileIO {
+  // Class-wide SLF4J logger.
+  private static final Logger LOG = LoggerFactory.getLogger(CachingFileIO.class);
+  // 4 * 1024 * 1024 bytes. NOTE(review): presumably the per-buffer chunk size used when loading file content into
+  // the cache; its use is outside this view - confirm.
+  private static final int BUFFER_CHUNK_SIZE = 4 * 1024 * 1024; // 4MB
+  // Cache shared across instances; populated lazily by getOrCreate(Map).
+  private static ContentCache sharedCache;
+
+  /**
+   * Builds a new content cache from catalog properties.
+   *
+   * @param properties catalog properties holding the expiration interval, total size and per-file size limits
+   * @return a new ContentCache, or null when the expiration interval is negative or the total size is not positive
+   */
+  private static ContentCache createCache(Map<String, String> properties) {
+    long expirationMs = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.IO_CACHE_EXPIRATION_INTERVAL_MS,
+        CatalogProperties.IO_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+    long maxTotalBytes = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.IO_CACHE_MAX_TOTAL_BYTES,
+        CatalogProperties.IO_CACHE_MAX_TOTAL_BYTES_DEFAULT);
+    long maxContentLength = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.IO_CACHE_MAX_CONTENT_LENGTH,
+        CatalogProperties.IO_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
+
+    // These settings mean "no cache"; callers treat a null cache as caching being off.
+    if (expirationMs < 0 || maxTotalBytes <= 0) {
+      return null;
+    }
+
+    return new ContentCache(expirationMs, maxTotalBytes, maxContentLength);
+  }
+
+  /**
+   * Returns the shared cache, creating it from the given properties when none exists yet.
+   * <p>
+   * Once a shared cache exists, the properties passed to later calls are ignored. If creation yields null, the
+   * next call tries again with its own properties.
+   *
+   * @param properties catalog properties used only when the shared cache has to be created
+   * @return the shared ContentCache, or null if the properties do not allow a cache to be created
+   */
+  public static synchronized ContentCache getOrCreate(Map<String, String> properties) {
+    if (sharedCache != null) {
+      return sharedCache;
+    }
+
+    sharedCache = createCache(properties);
+    return sharedCache;
+  }
+
+  /**
+   * Tells whether the given catalog properties ask for, and allow, an IO content cache.
+   *
+   * @param properties catalog properties to inspect
+   * @return true only if caching is enabled, the expiration interval is not negative and the total size is positive
+   */
+  public static boolean ioCacheEnabled(Map<String, String> properties) {
+    // All three properties are read up front, before any decision is made.
+    boolean cacheRequested = PropertyUtil.propertyAsBoolean(properties,
+        CatalogProperties.IO_CACHE_ENABLED,
+        CatalogProperties.IO_CACHE_ENABLED_DEFAULT);
+    long expirationMs = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.IO_CACHE_EXPIRATION_INTERVAL_MS,
+        CatalogProperties.IO_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
+    long maxTotalBytes = PropertyUtil.propertyAsLong(properties,
+        CatalogProperties.IO_CACHE_MAX_TOTAL_BYTES,
+        CatalogProperties.IO_CACHE_MAX_TOTAL_BYTES_DEFAULT);
+
+    if (!cacheRequested) {
+      return false;
+    }
+
+    return expirationMs >= 0 && maxTotalBytes > 0;
+  }
+
+  // Cache used by this instance; assigned during initialize(Map) and null when no cache could be created.
+  private ContentCache fileContentCache;
+  // Catalog properties passed to the most recent initialize(Map) call.
+  private Map<String, String> properties;
+  // Delegate FileIO that performs the actual I/O; reset to null by close().
+  private FileIO wrappedIO;
+
+  /**
+   * No-arg constructor, kept private because CachingFileIO is not meant to be dynamically loaded.
+   */
+  private CachingFileIO() {
+    // intentionally empty
+  }
+
+  /**
+   * Creates a CachingFileIO that delegates all I/O to the given FileIO.
+   * <p>
+   * The field is assigned directly instead of going through {@link #assignIO(FileIO)}: that method is public and
+   * overridable, and invoking an overridable method from a constructor would run subclass code before the subclass
+   * is initialized.
+   *
+   * @param io the FileIO to wrap; it is used as-is and is not initialized here
+   */
+  public CachingFileIO(FileIO io) {
+    this.wrappedIO = io;
+  }
+
+  /**
+   * Replaces the FileIO that this instance delegates to.
+   * <p>
+   * The previously assigned FileIO, if any, is not closed by this method.
+   *
+   * @param io the FileIO to wrap from now on
+   */
+  public void assignIO(FileIO io) {
+    wrappedIO = io;
+  }
+
+  /**
+   * Initializes this FileIO from catalog properties and sets up its content cache.
+   * <p>
+   * The wrapped FileIO is assumed to be initialized already and is not touched here. When reinitializing, it is the
+   * caller's responsibility to call {@link #close()} and then {@link #assignIO(FileIO)} beforehand in order to close
+   * the previously assigned FileIO and assign a new one; otherwise the previously assigned FileIO stays in use.
+   * <p>
+   * Reinitialization discards the previous content cache unless the shared cache is being used
+   * ({@link CatalogProperties#IO_CACHE_SHARED} stays true).
+   *
+   * @param newProperties catalog properties
+   */
+  @Override
+  public void initialize(Map<String, String> newProperties) {
+    // Remember the properties first: initCache() reads them from the field.
+    properties = newProperties;
+    initCache();
+  }
+
+  /**
+   * Picks the content cache for this instance based on {@link CatalogProperties#IO_CACHE_SHARED}: either the
+   * shared cache or a freshly created private one. The resulting cache may be null.
+   */
+  private void initCache() {
+    boolean useShared = PropertyUtil.propertyAsBoolean(properties,
+        CatalogProperties.IO_CACHE_SHARED,
+        CatalogProperties.IO_CACHE_SHARED_DEFAULT);
+
+    if (!useShared) {
+      // Private cache: always built anew from the current properties.
+      this.fileContentCache = createCache(properties);
+      LOG.info("CachingFileIO created. {} ", this.fileContentCache);
+      return;
+    }
+
+    this.fileContentCache = getOrCreate(properties);
+    LOG.info("CachingFileIO created with shared cache. {} ", this.fileContentCache);
+  }
+
+  /**
+   * Closes the wrapped FileIO, if one is assigned, and drops the reference to it.
+   * <p>
+   * Calling this again without assigning a new FileIO is a no-op. The content cache is left untouched.
+   */
+  @Override
+  public void close() {
+    if (wrappedIO == null) {
+      return;
+    }
+
+    wrappedIO.close();
+    wrappedIO = null;
+  }
+
+  @Override
+  public InputFile newInputFile(String path) {
+    InputFile inFile = wrappedIO.newInputFile(path);
+    if (fileContentCache != null && path.contains("/metadata/")) {
+      // TODO: Currently we just believe that this is a metadata file if it has "metadata" dir on its path. But metadata
+      //  location can be set differently for different tables. A table might have different metadata path by
+      //  setting custom "write.metadata.path" TableProperties.
+      //  We might need to extend FileIO interface. Add method 'newMetadataInputFile' and let caller use it to
+      //  differentiate metadata files vs other files.
+      return new CachingInputFile(fileContentCache, inFile);
+    }
+    return inFile;
+  }
+
+  @Override
+  public OutputFile newOutputFile(String path) {
+    return wrappedIO.newOutputFile(path);
+  }
+
+  /**
+   * Deletes the file through the wrapped FileIO and drops any cached content for it.
+   * <p>
+   * The cache entry is invalidated even when the underlying delete throws, so that a failed or partially completed
+   * delete cannot leave stale content (and a stale "exists" answer) behind in the cache.
+   *
+   * @param path location of the file to delete
+   */
+  @Override
+  public void deleteFile(String path) {
+    try {
+      wrappedIO.deleteFile(path);
+    } finally {
+      if (fileContentCache != null) {
+        fileContentCache.cache.invalidate(path);
+      }
+    }
+  }
+
+  /**
+   * Returns the content cache currently used by this instance.
+   *
+   * @return the cache, or null if none has been set up
+   */
+  public ContentCache getCache() {
+    return this.fileContentCache;
+  }
+
+  /**
+   * Returns the estimated size reported by the underlying cache.
+   *
+   * @return the cache's estimated size, or 0 when this instance has no cache
+   */
+  public long cacheEstimatedSize() {
+    if (fileContentCache == null) {
+      return 0;
+    }
+
+    return fileContentCache.cache.estimatedSize();
+  }
+
+  /**
+   * Immutable holder for one cached file: its length and the buffers that hold its bytes.
+   */
+  private static class CacheEntry {
+    // Content length; also used as the entry's weight in the cache.
+    private final int length;
+    // Buffers holding the cached content.
+    private final List<ByteBuffer> buffers;
+
+    private CacheEntry(int length, List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+      this.length = length;
+    }
+  }
+
+  /**
+   * Caffeine-backed cache of file content keyed by file location, bounded by the total length of its entries.
+   */
+  public static class ContentCache {
+    private final long expireAfterAccessMs;
+    private final long maxTotalBytes;
+    private final long maxContentLength;
+    private final Cache<String, CacheEntry> cache;
+
+    /**
+     * Builds the underlying Caffeine cache.
+     *
+     * @param expireAfterAccessMs access-based expiration in milliseconds; only configured when positive
+     * @param maxTotalBytes maximum total weight of the cache, where each entry weighs its content length
+     * @param maxContentLength largest file length that callers should consider for caching
+     */
+    private ContentCache(long expireAfterAccessMs, long maxTotalBytes, long maxContentLength) {
+      this.expireAfterAccessMs = expireAfterAccessMs;
+      this.maxTotalBytes = maxTotalBytes;
+      this.maxContentLength = maxContentLength;
+
+      // Each entry weighs as much as the content it holds.
+      Weigher<String, CacheEntry> byContentLength = (location, entry) -> entry.length;
+
+      Caffeine<Object, Object> caffeine = Caffeine.newBuilder();
+      if (expireAfterAccessMs > 0) {
+        caffeine = caffeine.expireAfterAccess(Duration.ofMillis(expireAfterAccessMs));
+      }
+
+      this.cache = caffeine.maximumWeight(maxTotalBytes)
+          .weigher(byContentLength)
+          .recordStats()
+          .build();
+    }
+
+    /** Returns the configured expire-after-access interval in milliseconds. */
+    public long expireAfterAccess() {
+      return this.expireAfterAccessMs;
+    }
+
+    /** Returns the configured maximum length of a single cacheable file. */
+    public long maxContentLength() {
+      return this.maxContentLength;
+    }
+
+    /** Returns the configured maximum total weight of the cache. */
+    public long maxTotalBytes() {
+      return this.maxTotalBytes;
+    }
+
+    /** Returns the underlying Caffeine cache. */
+    public Cache<String, CacheEntry> cache() {
+      return this.cache;
+    }
+
+    /** Returns a snapshot of the statistics recorded by the underlying cache. */
+    public CacheStats stats() {
+      return this.cache.stats();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder(getClass().getSimpleName());
+      text.append('{');
+      text.append("expireAfterAccessMs=").append(expireAfterAccessMs).append(", ");
+      text.append("maxContentLength=").append(maxContentLength).append(", ");
+      text.append("maxTotalBytes=").append(maxTotalBytes).append(", ");
+      text.append("cacheStats=").append(cache.stats()).append('}');
+      return text.toString();
+    }
+  }
+
+  private static class CachingInputFile implements InputFile {
+    // Cache consulted before falling back to the wrapped file.
+    private final ContentCache contentCache;
+    // InputFile that all uncached operations are delegated to.
+    private final InputFile wrappedInputFile;
+
+    /**
+     * @param cache cache to look file content up in
+     * @param inFile the InputFile to delegate to
+     */
+    private CachingInputFile(ContentCache cache, InputFile inFile) {
+      this.wrappedInputFile = inFile;
+      this.contentCache = cache;
+    }
+
+    /**
+     * Returns the file length, answered from the cache when the file is cached.
+     *
+     * @return the cached entry's length if present, otherwise the length reported by the wrapped InputFile
+     */
+    @Override
+    public long getLength() {
+      CacheEntry cached = contentCache.cache.getIfPresent(location());
+      if (cached == null) {
+        return wrappedInputFile.getLength();
+      }
+
+      return cached.length;
+    }
+
+    /**
+     * Opens a new {@link SeekableInputStream} for the underlying data file, either through cache or through the
+     * inner FileIO.
+     * <p>
+     * If data file is not cached yet, and it can fit in the cache, the file content will be cached first before
+     * returning a {@link ByteBufferInputStream}. Otherwise, return a new SeekableInputStream from the inner FIleIO.
+     *
+     * @return a {@link ByteBufferInputStream} if file exist in the cache or can fit in the cache. Otherwise, return a
+     * new SeekableInputStream from the inner FIleIO.
+     */
+    @Override
+    public SeekableInputStream newStream() {
+      try {
+        // read-through cache if file length is less than or equal to maximum length allowed to cache.
+        if (getLengthChecked() <= contentCache.maxContentLength()) {
+          return cachedStream();
+        }
+
+        // fallback to non-caching input stream.
+        return wrappedInputFile.newStream();
+      } catch (FileNotFoundException e) {
+        throw new NotFoundException(e, "Failed to open input stream for file: %s", wrappedInputFile.location());
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to open input stream for file: %s", wrappedInputFile.location());
+      }
+    }
+
+    /**
+     * Returns the location of the wrapped InputFile; this is also the key used for cache lookups.
+     */
+    @Override
+    public String location() {
+      return this.wrappedInputFile.location();
+    }
+
+    @Override
+    public boolean exists() {
+      CacheEntry buf = contentCache.cache.getIfPresent(location());
+      return buf != null || wrappedInputFile.exists();
+    }
+
+    /**
+     * Same as {@link #getLength()}, except that a RuntimeIOException is unwrapped and its cause is thrown instead.
+     *
+     * @return the file length
+     * @throws IOException the cause of the RuntimeIOException raised while determining the length
+     */
+    public long getLengthChecked() throws IOException {
+      long length;
+      try {
+        length = getLength();
+      } catch (RuntimeIOException wrapped) {
+        throw wrapped.getCause();
+      }
+      return length;
+    }
+
+    private CacheEntry getFileContent() {
+      try {
+        int fileLength = (int) getLength();

Review Comment:
   Something whose length is bigger than an int should presumably be excluded from caching automatically, though if someone did want to declare a cache of > 2GB then support for it may be needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org