You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2021/08/04 19:02:23 UTC

[spark] branch master updated: [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 01cf6f4  [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache
01cf6f4 is described below

commit 01cf6f4c6b2a593a2a8717fd2cda13725424120e
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Aug 4 12:01:44 2021 -0700

    [SPARK-34309][BUILD][CORE][SQL][K8S] Use Caffeine instead of Guava Cache
    
    ### What changes were proposed in this pull request?
    There are 3 ways to use Guava cache in spark code:
    
    1. `LoadingCache` is the main way to use Guava cache in spark code and the key usages are as follows:
      a. `LoadingCache` with `maximumSize` data eviction policy, such as `appCache` in `ApplicationCache`, `cache` in `CodeGenerator`
      b. `LoadingCache` with `maximumWeight` data eviction policy, such as `shuffleIndexCache` in `ExternalShuffleBlockResolver`
      c. `LoadingCache` with `expireAfterWrite` data eviction policy, such as `tableRelationCache` in `SessionCatalog`
    2. `ManualCache` is another way to use Guava cache in spark code and the key usage is `cache` in `SharedInMemoryCache`, which is used to cache partition file statuses in memory
    
    3. The last usage is `hadoopJobMetadata` in `SparkEnv`, which uses Guava Cache to build a `soft-reference map`.
    
    The goal of this PR is to use `Caffeine` instead of `Guava Cache`, because benchmarks show that `Caffeine` is faster than `Guava Cache`. The main changes are as follows:
    
    1. Add `Caffeine` deps to maven `pom.xml`
    
    2. Use `Caffeine` instead of Guava `LoadingCache`, `ManualCache` and soft-reference map in `SparkEnv`
    
    3. Add `LocalCacheBenchmark` to compare performance of `LoadingCache` between `Guava Cache` and `Caffeine`
    
    ### Why are the changes needed?
    Benchmarks show that `Caffeine` is faster than `Guava Cache`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Pass the Jenkins or GitHub Action
    - Add `LocalCacheBenchmark` to compare performance of `LoadingCache` between `Guava Cache` and `Caffeine`
    
    Closes #31517 from LuciferYang/guava-cache-to-caffeine.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Holden Karau <hk...@netflix.com>
---
 common/network-shuffle/pom.xml                     |  4 +
 .../shuffle/ExternalShuffleBlockResolver.java      | 32 +++-----
 .../network/shuffle/RemoteBlockPushResolver.java   | 24 +++---
 .../LocalCacheBenchmark-jdk11-results.txt          | 12 +++
 core/benchmarks/LocalCacheBenchmark-results.txt    | 12 +++
 core/pom.xml                                       |  4 +
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  4 +-
 .../spark/deploy/history/ApplicationCache.scala    | 37 +++++----
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   | 24 +++---
 .../org/apache/spark/storage/BlockManager.scala    |  4 +-
 .../org/apache/spark/storage/BlockManagerId.scala  | 15 ++--
 .../spark/storage/BlockManagerMasterEndpoint.scala |  4 +-
 .../main/scala/org/apache/spark/util/Utils.scala   | 19 +++--
 .../org/apache/spark/LocalCacheBenchmark.scala     | 93 ++++++++++++++++++++++
 .../org/apache/spark/executor/ExecutorSuite.scala  | 18 ++---
 dev/deps/spark-deps-hadoop-2.7-hive-2.3            |  3 +
 dev/deps/spark-deps-hadoop-3.2-hive-2.3            |  3 +
 pom.xml                                            |  6 ++
 resource-managers/kubernetes/core/pom.xml          |  5 ++
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala |  9 ++-
 sql/catalyst/pom.xml                               |  4 +
 .../sql/catalyst/catalog/SessionCatalog.scala      |  9 +--
 .../expressions/SubExprEvaluationRuntime.scala     | 21 ++---
 .../expressions/codegen/CodeGenerator.scala        |  6 +-
 .../catalyst/util/DateTimeFormatterHelper.scala    |  4 +-
 ...CodeGeneratorWithInterpretedFallbackSuite.scala |  4 +-
 .../SubExprEvaluationRuntimeSuite.scala            | 20 ++---
 sql/core/pom.xml                                   |  5 +-
 .../execution/datasources/FileStatusCache.scala    | 13 ++-
 .../spark/sql/execution/metric/SQLMetrics.scala    |  4 +-
 30 files changed, 283 insertions(+), 139 deletions(-)

diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index d3d78f2..1b78182 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -59,6 +59,10 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 73d4e6c..650f33e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -20,10 +20,7 @@ package org.apache.spark.network.shuffle;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -32,11 +29,10 @@ import org.apache.commons.lang3.tuple.Pair;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -109,23 +105,13 @@ public class ExternalShuffleBlockResolver {
       Executor directoryCleaner) throws IOException {
     this.conf = conf;
     this.rddFetchEnabled =
-      Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
+      Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
     this.registeredExecutorFile = registeredExecutorFile;
     String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
-    CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
-        new CacheLoader<File, ShuffleIndexInformation>() {
-          public ShuffleIndexInformation load(File file) throws IOException {
-            return new ShuffleIndexInformation(file);
-          }
-        };
-    shuffleIndexCache = CacheBuilder.newBuilder()
+    shuffleIndexCache = Caffeine.newBuilder()
       .maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
-      .weigher(new Weigher<File, ShuffleIndexInformation>() {
-        public int weigh(File file, ShuffleIndexInformation indexInfo) {
-          return indexInfo.getSize();
-        }
-      })
-      .build(indexCacheLoader);
+      .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
+      .build(ShuffleIndexInformation::new);
     db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
     if (db != null) {
       executors = reloadRegisteredExecutors(db);
@@ -317,7 +303,7 @@ public class ExternalShuffleBlockResolver {
           "shuffle_" + shuffleId + "_" + mapId + "_0.data"),
         shuffleIndexRecord.getOffset(),
         shuffleIndexRecord.getLength());
-    } catch (ExecutionException e) {
+    } catch (CompletionException e) {
       throw new RuntimeException("Failed to open file: " + indexFile, e);
     }
   }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 9a45f2c..8578843 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -31,9 +31,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.*;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
@@ -43,10 +47,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import org.roaringbitmap.RoaringBitmap;
@@ -115,16 +115,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
     this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
-    CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
-      new CacheLoader<File, ShuffleIndexInformation>() {
-        public ShuffleIndexInformation load(File file) throws IOException {
-          return new ShuffleIndexInformation(file);
-        }
-      };
-    indexCache = CacheBuilder.newBuilder()
+    indexCache = Caffeine.newBuilder()
       .maximumWeight(conf.mergedIndexCacheSize())
-      .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
-      .build(indexCacheLoader);
+      .weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
+      .build(ShuffleIndexInformation::new);
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
@@ -299,7 +293,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
       return new FileSegmentManagedBuffer(
         conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
-    } catch (ExecutionException e) {
+    } catch (CompletionException e) {
       throw new RuntimeException(String.format(
         "Failed to open merged shuffle index file %s", indexFile.getPath()), e);
     }
diff --git a/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt
new file mode 100644
index 0000000..ceca386
--- /dev/null
+++ b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Loading Cache
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.7
+Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
+Loading Cache:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------
+Guava Cache                                             5              6           1         15.9          62.8       1.0X
+Caffeine                                                2              2           0         46.1          21.7       2.9X
+
+
diff --git a/core/benchmarks/LocalCacheBenchmark-results.txt b/core/benchmarks/LocalCacheBenchmark-results.txt
new file mode 100644
index 0000000..563d470
--- /dev/null
+++ b/core/benchmarks/LocalCacheBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Loading Cache
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_232-b18 on Mac OS X 10.15.7
+Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
+Loading Cache:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+--------------------------------------------------------------------------------------------------------------------------
+Guava Cache                                             5              5           0         16.7          60.0       1.0X
+Caffeine                                                2              2           1         44.3          22.6       2.7X
+
+
diff --git a/core/pom.xml b/core/pom.xml
index be44964..f3f9e48 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -48,6 +48,10 @@
       <artifactId>guava</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
     </dependency>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ee50a8f..dae5568 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,7 @@ import scala.collection.concurrent
 import scala.collection.mutable
 import scala.util.Properties
 
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.DeveloperApi
@@ -77,7 +77,7 @@ class SparkEnv (
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata =
-    CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
+    Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap()
 
   private[spark] var driverTmpDir: Option[String] = None
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
index 89b30a3..2be417a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -18,15 +18,14 @@
 package org.apache.spark.deploy.history
 
 import java.util.NoSuchElementException
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
 import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
 import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
 
 import scala.collection.JavaConverters._
 
 import com.codahale.metrics.{Counter, MetricRegistry, Timer}
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
-import com.google.common.util.concurrent.UncheckedExecutionException
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener}
 import org.eclipse.jetty.servlet.FilterHolder
 
 import org.apache.spark.internal.Logging
@@ -62,21 +61,27 @@ private[history] class ApplicationCache(
 
     /**
      * Removal event notifies the provider to detach the UI.
-     * @param rm removal notification
+     * @param key removal key
+     * @param value removal value
+     * @param cause the reason why a `CacheEntry` was removed, it should
+     *              always be `SIZE` because `appCache` configured with
+     *              `maximumSize` eviction strategy
      */
-    override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+    override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = {
       metrics.evictionCount.inc()
-      val key = rm.getKey
-      logDebug(s"Evicting entry ${key}")
-      operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
+      logDebug(s"Evicting entry $key")
+      operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
     }
   }
 
   private val appCache: LoadingCache[CacheKey, CacheEntry] = {
-    CacheBuilder.newBuilder()
-        .maximumSize(retainedApplications)
-        .removalListener(removalListener)
-        .build(appLoader)
+    val builder = Caffeine.newBuilder()
+      .maximumSize(retainedApplications)
+      .removalListener(removalListener)
+      // SPARK-34309: Use custom Executor to compatible with
+      // the data eviction behavior of Guava cache
+      .executor((command: Runnable) => command.run())
+    builder.build[CacheKey, CacheEntry](appLoader)
   }
 
   /**
@@ -86,9 +91,9 @@ private[history] class ApplicationCache(
 
   def get(appId: String, attemptId: Option[String] = None): CacheEntry = {
     try {
-      appCache.get(new CacheKey(appId, attemptId))
+      appCache.get(CacheKey(appId, attemptId))
     } catch {
-      case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
+      case e @ (_: CompletionException | _: RuntimeException) =>
         throw Option(e.getCause()).getOrElse(e)
     }
   }
@@ -127,7 +132,7 @@ private[history] class ApplicationCache(
   }
 
   /** @return Number of cached UIs. */
-  def size(): Long = appCache.size()
+  def size(): Long = appCache.estimatedSize()
 
   private def time[T](t: Timer)(f: => T): T = {
     val timeCtx = t.time()
@@ -197,7 +202,7 @@ private[history] class ApplicationCache(
     val sb = new StringBuilder(s"ApplicationCache(" +
           s" retainedApplications= $retainedApplications)")
     sb.append(s"; time= ${clock.getTimeMillis()}")
-    sb.append(s"; entry count= ${appCache.size()}\n")
+    sb.append(s"; entry count= ${appCache.estimatedSize()}\n")
     sb.append("----\n")
     appCache.asMap().asScala.foreach {
       case(key, entry) => sb.append(s"  $key -> $entry\n")
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 7339eb6..f47f823 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
@@ -85,16 +85,18 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   }
 
   // Cache of preferred locations of checkpointed files.
-  @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
-    .expireAfterWrite(
-      SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
-      TimeUnit.MINUTES)
-    .build(
-      new CacheLoader[Partition, Seq[String]]() {
-        override def load(split: Partition): Seq[String] = {
-          getPartitionBlockLocations(split)
-        }
-      })
+  @transient private[spark] lazy val cachedPreferredLocations = {
+    val builder = Caffeine.newBuilder()
+      .expireAfterWrite(
+        SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
+        TimeUnit.MINUTES)
+    val loader = new CacheLoader[Partition, Seq[String]]() {
+      override def load(split: Partition): Seq[String] = {
+        getPartitionBlockLocations(split)
+      }
+    }
+    builder.build[Partition, Seq[String]](loader)
+  }
 
   // Returns the block locations of given partition on file system.
   private def getPartitionBlockLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c646b2..20d1e03 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,7 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
 import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
@@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager(
     blockStoreClient: BlockStoreClient) extends Logging {
 
   private val executorIdToLocalDirsCache =
-    CacheBuilder
+    Caffeine
       .newBuilder()
       .maximumSize(cacheSize)
       .build[String, Array[String]]()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index c6a4457..316ad69 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
@@ -136,11 +136,14 @@ private[spark] object BlockManagerId {
    * The max cache size is hardcoded to 10000, since the size of a BlockManagerId
    * object is about 48B, the total memory cost should be below 1MB which is feasible.
    */
-  val blockManagerIdCache = CacheBuilder.newBuilder()
-    .maximumSize(10000)
-    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
-      override def load(id: BlockManagerId) = id
-    })
+  val blockManagerIdCache = {
+    Caffeine.newBuilder()
+      .maximumSize(10000)
+      .build[BlockManagerId, BlockManagerId](
+        new CacheLoader[BlockManagerId, BlockManagerId]() {
+          override def load(id: BlockManagerId): BlockManagerId = id
+        })
+  }
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
     blockManagerIdCache.get(id)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 29c605d..ef82d52 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
 
 import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
@@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint(
 
   // Mapping from executor id to the block manager's local disk directories.
   private val executorIdToLocalDirs =
-    CacheBuilder
+    Caffeine
       .newBuilder()
       .maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
       .build[String, Array[String]]()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7ea96fe..f3268cb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,7 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import scala.util.matching.Regex
 
 import _root_.io.netty.channel.unix.Errors.NativeIoException
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
 import com.google.common.collect.Interners
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
@@ -1616,13 +1616,16 @@ private[spark] object Utils extends Logging {
     if (compressedLogFileLengthCache == null) {
       val compressedLogFileLengthCacheSize = sparkConf.get(
         UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
-      compressedLogFileLengthCache = CacheBuilder.newBuilder()
-        .maximumSize(compressedLogFileLengthCacheSize)
-        .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
-        override def load(path: String): java.lang.Long = {
-          Utils.getCompressedFileLength(new File(path))
-        }
-      })
+      compressedLogFileLengthCache = {
+        val builder = Caffeine.newBuilder()
+          .maximumSize(compressedLogFileLengthCacheSize)
+        builder.build[String, java.lang.Long](
+          new CacheLoader[String, java.lang.Long]() {
+            override def load(path: String): java.lang.Long = {
+              Utils.getCompressedFileLength(new File(path))
+            }
+          })
+      }
     }
     compressedLogFileLengthCache
   }
diff --git a/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala
new file mode 100644
index 0000000..5eadfdf
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.Callable
+
+import scala.concurrent.duration.Duration
+import scala.util.Random
+
+import com.github.benmanes.caffeine.cache.{CacheLoader => CaffeineCacheLoader, Caffeine}
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for Guava Cache vs Caffeine.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/LocalCacheBenchmark-results.txt".
+ * }}}
+ */
+object LocalCacheBenchmark extends BenchmarkBase {
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Loading Cache") {
+      val size = 10000
+      val parallelism = 8
+      val guavaCacheConcurrencyLevel = 8
+      val dataset = (1 to parallelism)
+        .map(_ => Random.shuffle(List.range(0, size)))
+        .map(list => list.map(i => TestData(i)))
+      val executor = ThreadUtils.newDaemonFixedThreadPool(parallelism, "Loading Cache Test Pool")
+      val guavaCacheLoader = new CacheLoader[TestData, TestData]() {
+        override def load(id: TestData): TestData = {
+          id
+        }
+      }
+      val caffeineCacheLoader = new CaffeineCacheLoader[TestData, TestData]() {
+        override def load(id: TestData): TestData = {
+          id
+        }
+      }
+
+      val benchmark = new Benchmark("Loading Cache", size * parallelism, 3, output = output)
+      benchmark.addCase("Guava Cache") { _ =>
+        val cache = CacheBuilder.newBuilder()
+          .concurrencyLevel(guavaCacheConcurrencyLevel).build[TestData, TestData](guavaCacheLoader)
+        dataset.map(dataList => executor.submit(new Callable[Unit] {
+          override def call(): Unit = {
+            dataList.foreach(key => cache.get(key))
+          }
+        })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
+        cache.cleanUp()
+      }
+
+      benchmark.addCase("Caffeine") { _ =>
+        val cache = Caffeine.newBuilder().build[TestData, TestData](caffeineCacheLoader)
+        dataset.map(dataList => executor.submit(new Callable[Unit] {
+          override def call(): Unit = {
+            dataList.foreach(key => cache.get(key))
+          }
+        })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
+        cache.cleanUp()
+      }
+
+      benchmark.run()
+    }
+  }
+
+  case class TestData(content: Int)
+}
+
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index a237447..8ec279a 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -29,7 +29,7 @@ import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, Map}
 import scala.concurrent.duration._
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{inOrder, verify, when}
@@ -467,9 +467,9 @@ class ExecutorSuite extends SparkFunSuite
       }
     }
 
-    def errorInGuavaCache(e: => Throwable): Throwable = {
-      val cache = CacheBuilder.newBuilder()
-        .build(new CacheLoader[String, String] {
+    def errorInCaffeine(e: => Throwable): Throwable = {
+      val cache = Caffeine.newBuilder().build[String, String](
+        new CacheLoader[String, String] {
           override def load(key: String): String = throw e
         })
       intercept[Throwable] {
@@ -484,18 +484,18 @@ class ExecutorSuite extends SparkFunSuite
       import Executor.isFatalError
       // `e`'s depth is 1 so `depthToCheck` needs to be at least 3 to detect fatal errors.
       assert(isFatalError(e, depthToCheck) == (depthToCheck >= 1 && isFatal))
+      assert(isFatalError(errorInCaffeine(e), depthToCheck) == (depthToCheck >= 1 && isFatal))
       // `e`'s depth is 2 so `depthToCheck` needs to be at least 3 to detect fatal errors.
       assert(isFatalError(errorInThreadPool(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
-      assert(isFatalError(errorInGuavaCache(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
       assert(isFatalError(
         new SparkException("foo", e),
         depthToCheck) == (depthToCheck >= 2 && isFatal))
-      // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
       assert(isFatalError(
-        errorInThreadPool(errorInGuavaCache(e)),
-        depthToCheck) == (depthToCheck >= 3 && isFatal))
+        errorInThreadPool(errorInCaffeine(e)),
+        depthToCheck) == (depthToCheck >= 2 && isFatal))
+      // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
       assert(isFatalError(
-        errorInGuavaCache(errorInThreadPool(e)),
+        errorInCaffeine(errorInThreadPool(e)),
         depthToCheck) == (depthToCheck >= 3 && isFatal))
       assert(isFatalError(
         new SparkException("foo", new SparkException("foo", e)),
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 50b5bf0..92f6308 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -30,7 +30,9 @@ blas/2.2.0//blas-2.2.0.jar
 bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
 breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
 breeze_2.12/1.2//breeze_2.12-1.2.jar
+caffeine/2.9.1//caffeine-2.9.1.jar
 cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
+checker-qual/3.10.0//checker-qual-3.10.0.jar
 chill-java/0.10.0//chill-java-0.10.0.jar
 chill_2.12/0.10.0//chill_2.12-0.10.0.jar
 commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar
@@ -62,6 +64,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
 derby/10.14.2.0//derby-10.14.2.0.jar
 dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
+error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
 flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
 generex/1.0.2//generex-1.0.2.jar
 gson/2.2.4//gson-2.2.4.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index 31c9b96..4cacf0a 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -25,7 +25,9 @@ blas/2.2.0//blas-2.2.0.jar
 bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
 breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
 breeze_2.12/1.2//breeze_2.12-1.2.jar
+caffeine/2.9.1//caffeine-2.9.1.jar
 cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
+checker-qual/3.10.0//checker-qual-3.10.0.jar
 chill-java/0.10.0//chill-java-0.10.0.jar
 chill_2.12/0.10.0//chill_2.12-0.10.0.jar
 commons-cli/1.2//commons-cli-1.2.jar
@@ -53,6 +55,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
 datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
 derby/10.14.2.0//derby-10.14.2.0.jar
 dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
+error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
 flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
 generex/1.0.2//generex-1.0.2.jar
 gson/2.2.4//gson-2.2.4.jar
diff --git a/pom.xml b/pom.xml
index d17e055..8f82d1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@
     <commons-pool2.version>2.6.2</commons-pool2.version>
     <datanucleus-core.version>4.1.17</datanucleus-core.version>
     <guava.version>14.0.1</guava.version>
+    <caffeine.version>2.9.1</caffeine.version>
     <janino.version>3.0.16</janino.version>
     <jersey.version>2.34</jersey.version>
     <joda.version>2.10.10</joda.version>
@@ -493,6 +494,11 @@
         <scope>provided</scope>
       </dependency>
       <dependency>
+        <groupId>com.github.ben-manes.caffeine</groupId>
+        <artifactId>caffeine</artifactId>
+        <version>${caffeine.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.jpmml</groupId>
         <artifactId>pmml-model</artifactId>
         <version>1.4.8</version>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 0cb5e11..cb9c7b1 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -53,6 +53,11 @@
     </dependency>
 
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
       <version>${kubernetes-client.version}</version>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index e255de4..38e7f99 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -18,12 +18,13 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.TimeUnit
 
-import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
-import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import com.github.benmanes.caffeine.cache.Caffeine
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
@@ -47,7 +48,7 @@ private[spark] class ExecutorPodsLifecycleManager(
   // to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
   // bounds.
   private lazy val removedExecutorsCache =
-    CacheBuilder.newBuilder()
+    Caffeine.newBuilder()
       .expireAfterWrite(3, TimeUnit.MINUTES)
       .build[java.lang.Long, java.lang.Long]()
 
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index f1d3a3a..22fa097 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -93,6 +93,10 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
     </dependency>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4860f46..6b1f519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.cache.{Cache, CacheBuilder}
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -159,19 +159,18 @@ class SessionCatalog(
   }
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
-    var builder = CacheBuilder.newBuilder()
+    var builder = Caffeine.newBuilder()
       .maximumSize(cacheSize)
 
     if (cacheTTL > 0) {
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
     }
-
-    builder.build[QualifiedTableName, LogicalPlan]()
+    builder.build()
   }
 
   /** This method provides a way to get a cached plan. */
   def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
-    tableRelationCache.get(t, c)
+    tableRelationCache.get(t, (_: QualifiedTableName) => c.call())
   }
 
   /** This method provides a way to get a cached plan if the key exists. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
index fcc8ee6..8cfa466 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.IdentityHashMap
 
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -38,14 +38,17 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) {
   // won't be use by multi-threads so we don't need to consider concurrency here.
   private var proxyExpressionCurrentId = 0
 
-  private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder()
-    .maximumSize(cacheMaxEntries)
-    .build(
-      new CacheLoader[ExpressionProxy, ResultProxy]() {
-        override def load(expr: ExpressionProxy): ResultProxy = {
-          ResultProxy(expr.proxyEval(currentInput))
-        }
-      })
+  private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] =
+    Caffeine.newBuilder().maximumSize(cacheMaxEntries)
+      // SPARK-34309: Use a custom Executor to be compatible with
+      // the data eviction behavior of Guava cache
+      .executor((command: Runnable) => command.run())
+      .build[ExpressionProxy, ResultProxy](
+        new CacheLoader[ExpressionProxy, ResultProxy]() {
+          override def load(expr: ExpressionProxy): ResultProxy = {
+            ResultProxy(expr.proxyEval(currentInput))
+          }
+        })
 
   private var currentInput: InternalRow = null
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7f2c1c6..4dbfd77 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
 import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler}
@@ -1577,9 +1577,9 @@ object CodeGenerator extends Logging {
    * automatically, in order to constrain its memory footprint.  Note that this cache does not use
    * weak keys/values and thus does not respond to memory pressure.
    */
-  private val cache = CacheBuilder.newBuilder()
+  private val cache = Caffeine.newBuilder()
     .maximumSize(SQLConf.get.codegenCacheMaxEntries)
-    .build(
+    .build[CodeAndComment, (GeneratedClass, ByteCodeStats)](
       new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
         override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
           val startTime = System.nanoTime()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index b00113b..9ae12c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -23,7 +23,7 @@ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverSt
 import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
 import java.util.{Date, Locale}
 
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
 
 import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -194,7 +194,7 @@ trait DateTimeFormatterHelper {
 }
 
 private object DateTimeFormatterHelper {
-  val cache = CacheBuilder.newBuilder()
+  val cache = Caffeine.newBuilder()
     .maximumSize(128)
     .build[(String, Locale, Boolean), DateTimeFormatter]()
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
index da5bddb..ab177d0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
@@ -83,7 +83,7 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
   }
 
   test("codegen failures in the CODEGEN_ONLY mode") {
-    val errMsg = intercept[ExecutionException] {
+    val errMsg = intercept[CompletionException] {
       val input = Seq(BoundReference(0, IntegerType, nullable = true))
       withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
         FailedCodegenProjection.createObject(input)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
index f8dca26..88c1c0d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
@@ -23,47 +23,47 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite {
   test("Evaluate ExpressionProxy should create cached result") {
     val runtime = new SubExprEvaluationRuntime(1)
     val proxy = ExpressionProxy(Literal(1), 0, runtime)
-    assert(runtime.cache.size() == 0)
+    assert(runtime.cache.estimatedSize() == 0)
     proxy.eval()
-    assert(runtime.cache.size() == 1)
+    assert(runtime.cache.estimatedSize() == 1)
     assert(runtime.cache.get(proxy) == ResultProxy(1))
   }
 
   test("SubExprEvaluationRuntime cannot exceed configured max entries") {
     val runtime = new SubExprEvaluationRuntime(2)
-    assert(runtime.cache.size() == 0)
+    assert(runtime.cache.estimatedSize() == 0)
 
     val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
     proxy1.eval()
-    assert(runtime.cache.size() == 1)
+    assert(runtime.cache.estimatedSize() == 1)
     assert(runtime.cache.get(proxy1) == ResultProxy(1))
 
     val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
     proxy2.eval()
-    assert(runtime.cache.size() == 2)
+    assert(runtime.cache.estimatedSize() == 2)
     assert(runtime.cache.get(proxy2) == ResultProxy(2))
 
     val proxy3 = ExpressionProxy(Literal(3), 2, runtime)
     proxy3.eval()
-    assert(runtime.cache.size() == 2)
+    assert(runtime.cache.estimatedSize() == 2)
     assert(runtime.cache.get(proxy3) == ResultProxy(3))
   }
 
   test("setInput should empty cached result") {
     val runtime = new SubExprEvaluationRuntime(2)
     val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
-    assert(runtime.cache.size() == 0)
+    assert(runtime.cache.estimatedSize() == 0)
     proxy1.eval()
-    assert(runtime.cache.size() == 1)
+    assert(runtime.cache.estimatedSize() == 1)
     assert(runtime.cache.get(proxy1) == ResultProxy(1))
 
     val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
     proxy2.eval()
-    assert(runtime.cache.size() == 2)
+    assert(runtime.cache.estimatedSize() == 2)
     assert(runtime.cache.get(proxy2) == ResultProxy(2))
 
     runtime.setInput()
-    assert(runtime.cache.size() == 0)
+    assert(runtime.cache.estimatedSize() == 0)
   }
 
   test("Wrap ExpressionProxy on subexpressions") {
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 73fa60c..5993e98 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -89,7 +89,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.orc</groupId>
       <artifactId>orc-core</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index b5d800f..5db69a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
 
-import com.google.common.cache._
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener, Weigher}
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
@@ -119,11 +119,10 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
         }
       }
     }
-    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
-      override def onRemoval(
-          removed: RemovalNotification[(ClientId, Path),
-          Array[FileStatus]]): Unit = {
-        if (removed.getCause == RemovalCause.SIZE &&
+    val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]] {
+      override def onRemoval(key: (ClientId, Path), value: Array[FileStatus],
+          cause: RemovalCause): Unit = {
+        if (cause == RemovalCause.SIZE &&
           warnedAboutEviction.compareAndSet(false, true)) {
           logWarning(
             "Evicting cached table partition metadata from memory due to size constraints " +
@@ -133,7 +132,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
       }
     }
 
-    var builder = CacheBuilder.newBuilder()
+    var builder = Caffeine.newBuilder()
       .weigher(weigher)
       .removalListener(removalListener)
       .maximumWeight(maxSizeInBytes / weightScale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index a613a39..d8fba8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -22,7 +22,7 @@ import java.util.{Arrays, Locale}
 
 import scala.concurrent.duration._
 
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
@@ -97,7 +97,7 @@ object SQLMetrics {
   val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
 
   private val metricsCache: LoadingCache[String, Option[String]] =
-    CacheBuilder.newBuilder().maximumSize(10000)
+    Caffeine.newBuilder().maximumSize(10000)
     .build(new CacheLoader[String, Option[String]] {
       override def load(name: String): Option[String] = {
         Option(name)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org