Posted to commits@spark.apache.org by ya...@apache.org on 2023/12/01 03:52:02 UTC

(spark) branch master updated: [SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMPLES] Fix `Implicit definition should have explicit type`

This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a186324b9ad1 [SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMPLES] Fix `Implicit definition should have explicit type`
a186324b9ad1 is described below

commit a186324b9ad11d1ca7672d1b120153b8606ca9bc
Author: tangjiafu <ji...@qq.com>
AuthorDate: Fri Dec 1 11:51:46 2023 +0800

    [SPARK-45629][CORE][SQL][CONNECT][ML][STREAMING][BUILD][EXAMPLES] Fix `Implicit definition should have explicit type`
    
    ### What changes were proposed in this pull request?
    This PR aims to fix `Implicit definition should have explicit type` in Scala 2.13.
    
    This PR includes:
    1. Declaring explicit types for implicit global variables
    2. Adding the `scala.annotation.nowarn` annotation where the warning must be suppressed
    
    ### Why are the changes needed?
    
    - Implicit global variables without an explicit type declaration produce a warning:
      warning: Implicit definition should have explicit type (inferred String) [quickfixable]
    - No modifications are required for local variables.
    Additionally, for implicit definitions involving reflection-related types such as `ClassTag`, the `scala.annotation.nowarn` annotation is used to suppress the warning, as sketched below.
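
    A rough sketch of the two fix patterns, assuming the json4s dependency is on the classpath (the `Formats` case mirrors the diffs below; the object name and the `ClassTag` value are purely illustrative):

        import scala.annotation.nowarn
        import scala.reflect.classTag
        import org.json4s.{DefaultFormats, Formats}

        object ImplicitTypeExamples {
          // Before (triggers the warning): implicit val formats = DefaultFormats
          // After: declare the explicit type
          implicit val formats: Formats = DefaultFormats

          // For reflection-related implicits, keep the inferred type and
          // suppress the warning instead (illustrative only)
          @nowarn("msg=Implicit definition should have explicit type")
          implicit val mapClassTag = classTag[Map[String, Any]]
        }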
    
    Furthermore, such warnings are treated as errors in the Spark build:
    
    [error] ... Implicit definition should have explicit type (inferred org.json4s.DefaultFormats.type) [quickfixable]
    ...
    [error]   implicit val formats = org.json4s.DefaultFormats
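
    Accordingly, the blanket suppression of this warning is removed from the build. A simplified sketch of the sbt setting dropped from `project/SparkBuild.scala` (the same `-Wconf` argument is removed from `pom.xml`; see the hunks below):

        // project/SparkBuild.scala, before this PR (simplified)
        scalacOptions ++= Seq(
          // SPARK-40497: suppress `Implicit definition should have explicit type`
          "-Wconf:msg=Implicit definition should have explicit type:s"
        )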
    
    Jira link:
    SPARK-45629: https://issues.apache.org/jira/browse/SPARK-45629
    
    Related issue link about `implicit`:
    https://github.com/scala/bug/issues/5265
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Most of the testing is completed through CI, and the `examples` module was compiled and tested locally in IDEA. Additionally, some of the rewrites were verified with demo code.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #43526 from laglangyue/SPARK-45629.
    
    Lead-authored-by: tangjiafu <ji...@qq.com>
    Co-authored-by: tangjiafu <ta...@corp.netease.com>
    Signed-off-by: yangjie01 <ya...@baidu.com>
---
 .../spark/sql/connect/client/GrpcExceptionConverter.scala    |  4 ++--
 .../main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala | 10 ++++------
 .../scala/org/apache/spark/deploy/FaultToleranceTest.scala   |  5 +++--
 .../org/apache/spark/deploy/StandaloneResourceUtils.scala    |  4 ++--
 .../apache/spark/executor/CoarseGrainedExecutorBackend.scala |  3 ---
 .../org/apache/spark/resource/ResourceInformation.scala      |  6 +++---
 .../main/scala/org/apache/spark/resource/ResourceUtils.scala |  4 ++--
 .../main/scala/org/apache/spark/status/AppStatusSource.scala |  2 +-
 .../apache/spark/storage/BlockManagerMasterEndpoint.scala    |  5 +++--
 .../apache/spark/storage/BlockManagerStorageEndpoint.scala   |  5 +++--
 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala  |  2 +-
 .../test/scala/org/apache/spark/ContextCleanerSuite.scala    |  2 +-
 .../scala/org/apache/spark/deploy/worker/WorkerSuite.scala   |  4 ++--
 .../spark/executor/CoarseGrainedExecutorBackendSuite.scala   |  4 ++--
 .../scala/org/apache/spark/memory/MemoryManagerSuite.scala   |  4 ++--
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala     |  4 ++--
 .../src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala |  2 +-
 core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala |  4 ++--
 .../org/apache/spark/examples/sql/SparkSQLExample.scala      |  5 +++--
 .../src/main/scala/org/apache/spark/ml/clustering/LDA.scala  |  4 ++--
 .../org/apache/spark/ml/linalg/JsonMatrixConverter.scala     |  4 ++--
 .../org/apache/spark/ml/linalg/JsonVectorConverter.scala     |  4 ++--
 mllib/src/main/scala/org/apache/spark/ml/param/params.scala  | 12 ++++++------
 .../org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala |  2 +-
 mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala  |  2 +-
 .../spark/mllib/classification/ClassificationModel.scala     |  4 ++--
 .../apache/spark/mllib/clustering/BisectingKMeansModel.scala |  4 ++--
 .../apache/spark/mllib/clustering/GaussianMixtureModel.scala |  4 ++--
 .../org/apache/spark/mllib/clustering/KMeansModel.scala      |  4 ++--
 .../scala/org/apache/spark/mllib/clustering/LDAModel.scala   |  6 +++---
 .../spark/mllib/clustering/PowerIterationClustering.scala    |  2 +-
 .../scala/org/apache/spark/mllib/feature/ChiSqSelector.scala |  2 +-
 .../main/scala/org/apache/spark/mllib/feature/Word2Vec.scala |  4 ++--
 .../src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala |  4 ++--
 .../main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala   |  4 ++--
 .../main/scala/org/apache/spark/mllib/linalg/Vectors.scala   |  4 ++--
 .../mllib/recommendation/MatrixFactorizationModel.scala      |  2 +-
 .../apache/spark/mllib/regression/IsotonicRegression.scala   |  2 +-
 .../org/apache/spark/mllib/regression/RegressionModel.scala  |  4 ++--
 .../apache/spark/mllib/tree/model/DecisionTreeModel.scala    |  2 +-
 .../apache/spark/mllib/tree/model/treeEnsembleModels.scala   |  2 +-
 .../scala/org/apache/spark/mllib/util/modelSaveLoad.scala    |  2 +-
 pom.xml                                                      |  4 ----
 project/SparkBuild.scala                                     |  2 --
 .../spark/scheduler/cluster/YarnSchedulerBackend.scala       |  4 ++--
 .../sql/catalyst/expressions/ObjectExpressionsSuite.scala    |  3 ++-
 .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala    |  3 ++-
 .../sql/catalyst/optimizer/EliminateMapObjectsSuite.scala    |  6 ++++--
 .../sql/catalyst/optimizer/EliminateSerializationSuite.scala |  5 +++--
 .../catalyst/optimizer/ObjectSerializerPruningSuite.scala    |  3 ++-
 .../catalyst/optimizer/TypedFilterOptimizationSuite.scala    |  3 ++-
 .../sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala |  3 ++-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala   |  7 ++++---
 .../scala/org/apache/spark/sql/KeyValueGroupedDataset.scala  |  4 ++--
 .../spark/sql/execution/datasources/DataSourceUtils.scala    |  4 ++--
 .../python/ApplyInPandasWithStatePythonRunner.scala          |  2 +-
 .../org/apache/spark/sql/execution/streaming/CommitLog.scala |  4 ++--
 .../sql/execution/streaming/CompactibleFileStreamLog.scala   |  5 +++--
 .../spark/sql/execution/streaming/FileStreamSinkLog.scala    |  4 ----
 .../spark/sql/execution/streaming/FileStreamSourceLog.scala  |  5 -----
 .../sql/execution/streaming/FileStreamSourceOffset.scala     |  4 ++--
 .../spark/sql/execution/streaming/GroupStateImpl.scala       |  2 +-
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala      |  5 +++--
 .../org/apache/spark/sql/execution/streaming/OffsetSeq.scala |  4 ++--
 .../spark/sql/execution/streaming/StreamMetadata.scala       |  4 ++--
 .../streaming/continuous/ContinuousTextSocketSource.scala    |  4 ++--
 .../execution/streaming/sources/ContinuousMemoryStream.scala |  6 +++---
 .../streaming/sources/RatePerMicroBatchStream.scala          |  4 ++--
 .../execution/streaming/state/OperatorStateMetadata.scala    |  5 +++--
 .../apache/spark/sql/execution/streaming/state/RocksDB.scala |  4 ++--
 .../sql/execution/streaming/state/RocksDBFileManager.scala   |  4 ++--
 .../scala/org/apache/spark/sql/DatasetAggregatorSuite.scala  |  2 +-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala   |  2 +-
 .../spark/sql/streaming/StreamingQueryListenerSuite.scala    |  4 ++--
 .../org/apache/spark/sql/streaming/StreamingQuerySuite.scala |  4 ++--
 .../api/v1/sql/SqlResourceWithActualMetricsSuite.scala       |  2 +-
 .../sql/hive/thriftserver/HiveThriftServer2Suites.scala      |  4 ++--
 .../src/main/scala/org/apache/spark/streaming/Time.scala     |  2 +-
 .../spark/streaming/receiver/ReceivedBlockHandler.scala      |  6 +++---
 79 files changed, 151 insertions(+), 155 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index f4254c75a2a8..c06498684fa6 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.{ManagedChannel, StatusRuntimeException}
 import io.grpc.protobuf.StatusProto
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{QueryContext, QueryContextType, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
@@ -354,7 +354,7 @@ private[client] object GrpcExceptionConverter {
    * truncated error message.
    */
   private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val classes =
       JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]
     val errorClass = info.getMetadataOrDefault("errorClass", null)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
index 6dd5af2389a8..7f3eb0370a07 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/JsonUtils.scala
@@ -21,14 +21,14 @@ import scala.collection.mutable.HashMap
 import scala.util.control.NonFatal
 
 import org.apache.kafka.common.TopicPartition
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 /**
  * Utilities for converting Kafka related objects to and from json.
  */
 private object JsonUtils {
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   /**
    * Read TopicPartitions from json string
@@ -96,10 +96,8 @@ private object JsonUtils {
    */
   def partitionOffsets(partitionOffsets: Map[TopicPartition, Long]): String = {
     val result = new HashMap[String, HashMap[Int, Long]]()
-    implicit val order = new Ordering[TopicPartition] {
-      override def compare(x: TopicPartition, y: TopicPartition): Int = {
-        Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
-      }
+    implicit val order: Ordering[TopicPartition] = (x: TopicPartition, y: TopicPartition) => {
+      Ordering.Tuple2[String, Int].compare((x.topic, x.partition), (y.topic, y.partition))
     }
     val partitions = partitionOffsets.keySet.toSeq.sorted  // sort for more determinism
     partitions.foreach { tp =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 7209e2c373ab..d6a50ff84f56 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -30,6 +30,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.sys.process._
 
+import org.json4s.Formats
 import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkConf, SparkContext}
@@ -340,7 +341,7 @@ private object FaultToleranceTest extends App with Logging {
 private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
   extends Logging  {
 
-  implicit val formats = org.json4s.DefaultFormats
+  implicit val formats: Formats = org.json4s.DefaultFormats
   var state: RecoveryState.Value = _
   var liveWorkerIPs: List[String] = _
   var numLiveApps = 0
@@ -383,7 +384,7 @@ private class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile
 private class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
   extends Logging {
 
-  implicit val formats = org.json4s.DefaultFormats
+  implicit val formats: Formats = org.json4s.DefaultFormats
 
   logDebug("Created worker: " + this)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
index db9f5c516cf6..509049550ad4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala
@@ -23,7 +23,7 @@ import java.nio.file.Files
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
-import org.json4s.{DefaultFormats, Extraction}
+import org.json4s.{DefaultFormats, Extraction, Formats}
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.SparkException
@@ -115,7 +115,7 @@ private[spark] object StandaloneResourceUtils extends Logging {
   private def writeResourceAllocationJson[T](
       allocations: Seq[T],
       jsonFile: File): Unit = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val allocationJson = Extraction.decompose(allocations)
     Files.write(jsonFile.toPath, compact(render(allocationJson)).getBytes())
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index f964e2b50b57..f1a9aa353e76 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -27,7 +27,6 @@ import scala.util.{Failure, Success}
 import scala.util.control.NonFatal
 
 import io.netty.util.internal.PlatformDependent
-import org.json4s.DefaultFormats
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -60,8 +59,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   import CoarseGrainedExecutorBackend._
 
-  private implicit val formats = DefaultFormats
-
   private[spark] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
index 51b6f19513ba..c7cce05f46f4 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
@@ -19,7 +19,7 @@ package org.apache.spark.resource
 
 import scala.util.control.NonFatal
 
-import org.json4s.{DefaultFormats, Extraction, JValue}
+import org.json4s.{DefaultFormats, Extraction, Formats, JValue}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.SparkException
@@ -71,7 +71,7 @@ private[spark] object ResourceInformation {
    * Parses a JSON string into a [[ResourceInformation]] instance.
    */
   def parseJson(json: String): ResourceInformation = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     try {
       parse(json).extract[ResourceInformationJson].toResourceInformation
     } catch {
@@ -82,7 +82,7 @@ private[spark] object ResourceInformation {
   }
 
   def parseJson(json: JValue): ResourceInformation = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     try {
       json.extract[ResourceInformationJson].toResourceInformation
     } catch {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index fe08e8337f76..a6f2ac35af7a 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -22,7 +22,7 @@ import java.util.Optional
 
 import scala.util.control.NonFatal
 
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -253,7 +253,7 @@ private[spark] object ResourceUtils extends Logging {
 
   def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
     withResourcesJson[ResourceAllocation](resourcesFile) { json =>
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       parse(json).extract[Seq[ResourceAllocation]]
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
index d19744db089b..96dc5ac44b47 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
@@ -31,7 +31,7 @@ private [spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
 
 private[spark] class AppStatusSource extends Source {
 
-  override implicit val metricRegistry = new MetricRegistry()
+  override implicit val metricRegistry: MetricRegistry = new MetricRegistry()
 
   override val sourceName = "appStatus"
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 17cd0891532b..b4920c7cb841 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -22,7 +22,7 @@ import java.util.{HashMap => JHashMap}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
-import scala.concurrent.{ExecutionContext, Future, TimeoutException}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, TimeoutException}
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 import scala.util.control.NonFatal
@@ -100,7 +100,8 @@ class BlockManagerMasterEndpoint(
 
   private val askThreadPool =
     ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool", 100)
-  private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
+  private implicit val askExecutionContext: ExecutionContextExecutorService =
+    ExecutionContext.fromExecutorService(askThreadPool)
 
   private val topologyMapper = {
     val topologyMapperClassName = conf.get(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
index 476be80e67df..5cc08714d41c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
 
 import org.apache.spark.{MapOutputTracker, SparkEnv}
 import org.apache.spark.internal.Logging
@@ -38,7 +38,8 @@ class BlockManagerStorageEndpoint(
 
   private val asyncThreadPool =
     ThreadUtils.newDaemonCachedThreadPool("block-manager-storage-async-thread-pool", 100)
-  private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
+  private implicit val asyncExecutionContext: ExecutionContextExecutorService =
+    ExecutionContext.fromExecutorService(asyncThreadPool)
 
   // Operations that involve removing blocks may be slow and should be done asynchronously
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 2d3d6ec89ffb..5addac737192 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -379,7 +379,7 @@ private[spark] object ThreadUtils {
   def parmap[I, O](in: Seq[I], prefix: String, maxThreads: Int)(f: I => O): Seq[O] = {
     val pool = newForkJoinPool(prefix, maxThreads)
     try {
-      implicit val ec = ExecutionContext.fromExecutor(pool)
+      implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
 
       val futures = in.map(x => Future(f(x)))
       val futureSeq = Future.sequence(futures)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index b78bb1f57564..05709c9bdd75 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.storage._
 abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
   extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
 {
-  implicit val defaultTimeout = timeout(10.seconds)
+  implicit val defaultTimeout: PatienceConfiguration.Timeout = timeout(10.seconds)
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 1b2d92af4b02..f3bae2066e14 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -23,7 +23,7 @@ import java.util.function.Supplier
 
 import scala.concurrent.duration._
 
-import org.json4s.{DefaultFormats, Extraction}
+import org.json4s.{DefaultFormats, Extraction, Formats}
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
@@ -60,7 +60,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with P
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts)
 
-  implicit val formats = DefaultFormats
+  implicit val formats: Formats = DefaultFormats
 
   private val _generateWorkerId = PrivateMethod[String](Symbol("generateWorkerId"))
 
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 35fe0b0d1c90..45c27aea6022 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.concurrent.TrieMap
 import scala.concurrent.duration._
 
-import org.json4s.{DefaultFormats, Extraction}
+import org.json4s.{DefaultFormats, Extraction, Formats}
 import org.json4s.JsonAST.{JArray, JObject}
 import org.json4s.JsonDSL._
 import org.mockito.ArgumentMatchers.any
@@ -50,7 +50,7 @@ import org.apache.spark.util.{SerializableBuffer, ThreadUtils, Utils}
 class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     with LocalSparkContext with MockitoSugar {
 
-  implicit val formats = DefaultFormats
+  implicit val formats: Formats = DefaultFormats
 
   def createSparkConf(): SparkConf = {
     new SparkConf()
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 642216a7a471..a6f1707a1aab 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.memory
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.concurrent.duration.Duration
 
 import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -148,7 +148,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
   // -- Tests of sharing of execution memory between tasks ----------------------------------------
   // Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
 
-  implicit val ec = ExecutionContext.global
+  implicit val ec: ExecutionContextExecutor = ExecutionContext.global
 
   test("single task requesting on-heap execution memory") {
     val manager = createMemoryManager(1000L)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index f133a38269d7..f7c7ca2bd936 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.util.Properties
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
@@ -31,7 +31,7 @@ import org.apache.spark.util.ThreadUtils
 
 class BlockInfoManagerSuite extends SparkFunSuite {
 
-  private implicit val ec = ExecutionContext.global
+  private implicit val ec: ExecutionContextExecutor = ExecutionContext.global
   private var blockInfoManager: BlockInfoManager = _
 
   override protected def beforeEach(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index a2d1293fc5f5..8689a9c16d69 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -83,7 +83,7 @@ private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
 class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers {
 
   implicit var webDriver: WebDriver = _
-  implicit val formats = DefaultFormats
+  implicit val formats: Formats = DefaultFormats
 
 
   override def beforeAll(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
index 6888e492a8d3..6902493dc3c5 100644
--- a/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/KeyLockSuite.scala
@@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.concurrent.duration._
 
-import org.scalatest.concurrent.{ThreadSignaler, TimeLimits}
+import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 
 import org.apache.spark.SparkFunSuite
 
 class KeyLockSuite extends SparkFunSuite with TimeLimits {
 
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
-  private implicit val defaultSignaler = ThreadSignaler
+  private implicit val defaultSignaler: Signaler = ThreadSignaler
 
   private val foreverMs = 60 * 1000L
 
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
index b17b86c08314..4957c3d3f6c1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.examples.sql
 
 // $example on:programmatic_schema$
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Encoder, Row}
 // $example off:programmatic_schema$
 // $example on:init_session$
 import org.apache.spark.sql.SparkSession
@@ -220,7 +220,8 @@ object SparkSQLExample {
     // +------------+
 
     // No pre-defined encoders for Dataset[Map[K,V]], define explicitly
-    implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
+    implicit val mapEncoder: Encoder[Map[String, Any]] =
+      org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
     // Primitive types and case classes can be also defined as
     // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 2dac53fe5169..3f6bdda9e050 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import breeze.linalg.normalize
 import breeze.numerics.exp
 import org.apache.hadoop.fs.Path
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonAST.JObject
 import org.json4s.jackson.JsonMethods._
 
@@ -385,7 +385,7 @@ private object LDAParams {
   def getAndSetParams(model: LDAParams, metadata: Metadata): Unit = {
     VersionUtils.majorMinorVersion(metadata.sparkVersion) match {
       case (1, 6) =>
-        implicit val format = DefaultFormats
+        implicit val format: Formats = DefaultFormats
         metadata.params match {
           case JObject(pairs) =>
             pairs.foreach { case (paramName, jsonValue) =>
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala
index c58306470e2f..3882a97e0bb4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.ml.linalg
 
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}
 
@@ -31,7 +31,7 @@ private[ml] object JsonMatrixConverter {
    * Parses the JSON representation of a Matrix into a [[Matrix]].
    */
   def fromJson(json: String): Matrix = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val jValue = parseJson(json)
     (jValue \ "type").extract[Int] match {
       case 0 => // sparse
diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala
index 129a99c4e400..3e1dff58764d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.linalg
 
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}
 
@@ -29,7 +29,7 @@ private[ml] object JsonVectorConverter {
    * Parses the JSON representation of a vector into a [[Vector]].
    */
   def fromJson(json: String): Vector = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val jValue = parseJson(json)
     (jValue \ "type").extract[Int] match {
       case 0 => // sparse
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
index d97f92355bdc..055c1c4d4228 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/params.scala
@@ -130,7 +130,7 @@ private[ml] object Param {
       case JObject(v) =>
         val keys = v.map(_._1)
         if (keys.contains("class")) {
-          implicit val formats = DefaultFormats
+          implicit val formats: Formats = DefaultFormats
           val className = (jValue \ "class").extract[String]
           className match {
             case JsonMatrixConverter.className =>
@@ -399,7 +399,7 @@ class IntParam(parent: String, name: String, doc: String, isValid: Int => Boolea
   }
 
   override def jsonDecode(json: String): Int = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     parse(json).extract[Int]
   }
 }
@@ -485,7 +485,7 @@ class LongParam(parent: String, name: String, doc: String, isValid: Long => Bool
   }
 
   override def jsonDecode(json: String): Long = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     parse(json).extract[Long]
   }
 }
@@ -506,7 +506,7 @@ class BooleanParam(parent: String, name: String, doc: String) // No need for isV
   }
 
   override def jsonDecode(json: String): Boolean = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     parse(json).extract[Boolean]
   }
 }
@@ -529,7 +529,7 @@ class StringArrayParam(parent: Params, name: String, doc: String, isValid: Array
   }
 
   override def jsonDecode(json: String): Array[String] = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     parse(json).extract[Seq[String]].toArray
   }
 }
@@ -619,7 +619,7 @@ class IntArrayParam(parent: Params, name: String, doc: String, isValid: Array[In
   }
 
   override def jsonDecode(json: String): Array[Int] = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     parse(json).extract[Seq[Int]].toArray
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
index 1d9316bfc384..7eef3ced422e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/AFTSurvivalRegressionWrapper.scala
@@ -138,7 +138,7 @@ private[r] object AFTSurvivalRegressionWrapper extends MLReadable[AFTSurvivalReg
   class AFTSurvivalRegressionWrapperReader extends MLReader[AFTSurvivalRegressionWrapper] {
 
     override def load(path: String): AFTSurvivalRegressionWrapper = {
-      implicit val format = DefaultFormats
+      implicit val format: Formats = DefaultFormats
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
index ad13cced4667..125cdf7259fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/ALSWrapper.scala
@@ -103,7 +103,7 @@ private[r] object ALSWrapper extends MLReadable[ALSWrapper] {
   class ALSWrapperReader extends MLReader[ALSWrapper] {
 
     override def load(path: String): ALSWrapper = {
-      implicit val format = DefaultFormats
+      implicit val format: Formats = DefaultFormats
       val rMetadataPath = new Path(path, "rMetadata").toString
       val modelPath = new Path(path, "model").toString
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
index 5161bc72659c..ad7435ce5be7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.classification
 
-import org.json4s.{DefaultFormats, JValue}
+import org.json4s.{DefaultFormats, Formats, JValue}
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
@@ -65,7 +65,7 @@ private[mllib] object ClassificationModel {
    * @return (numFeatures, numClasses)
    */
   def getNumFeaturesClasses(metadata: JValue): (Int, Int) = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     ((metadata \ "numFeatures").extract[Int], (metadata \ "numClasses").extract[Int])
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index ceaae27d8342..9f3aad923897 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -224,7 +224,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
     }
 
     def load(sc: SparkContext, path: String): BisectingKMeansModel = {
-      implicit val formats: DefaultFormats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
@@ -262,7 +262,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
     }
 
     def load(sc: SparkContext, path: String): BisectingKMeansModel = {
-      implicit val formats: DefaultFormats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 42ec37f438f4..8982a8ca7c6c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.clustering
 
 import breeze.linalg.{DenseVector => BreezeVector}
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -177,7 +177,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
   @Since("1.4.0")
   override def load(sc: SparkContext, path: String): GaussianMixtureModel = {
     val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val k = (metadata \ "k").extract[Int]
     val classNameV1_0 = SaveLoadV1_0.classNameV1_0
     (loadedClassName, version) match {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index c4ea263ee116..476df64581f7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -181,7 +181,7 @@ object KMeansModel extends Loader[KMeansModel] {
     }
 
     def load(sc: SparkContext, path: String): KMeansModel = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
@@ -216,7 +216,7 @@ object KMeansModel extends Loader[KMeansModel] {
     }
 
     def load(sc: SparkContext, path: String): KMeansModel = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 7891159cf44d..831c7a9316fd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
 import breeze.linalg.{argmax, argtopk, normalize, sum, DenseMatrix => BDM, DenseVector => BDV}
 import breeze.numerics.{exp, lgamma}
 import org.apache.hadoop.fs.Path
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -497,7 +497,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
   @Since("1.5.0")
   override def load(sc: SparkContext, path: String): LocalLDAModel = {
     val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val expectedK = (metadata \ "k").extract[Int]
     val expectedVocabSize = (metadata \ "vocabSize").extract[Int]
     val docConcentration =
@@ -924,7 +924,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
   @Since("1.5.0")
   override def load(sc: SparkContext, path: String): DistributedLDAModel = {
     val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val expectedK = (metadata \ "k").extract[Int]
     val vocabSize = (metadata \ "vocabSize").extract[Int]
     val docConcentration =
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index ba541bbcccd2..12c7ae5066c8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -79,7 +79,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
 
     @Since("1.4.0")
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index a1b00bccbc34..41eb6567b845 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -147,7 +147,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
     }
 
     def load(sc: SparkContext, path: String): ChiSqSelectorModel = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index d6a493a011b9..0dddbec8a7ed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import com.google.common.collect.{Ordering => GuavaOrdering}
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -704,7 +704,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
   override def load(sc: SparkContext, path: String): Word2VecModel = {
 
     val (loadedClassName, loadedVersion, metadata) = Loader.loadMetadata(sc, path)
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val expectedVectorSize = (metadata \ "vectorSize").extract[Int]
     val expectedNumWords = (metadata \ "numWords").extract[Int]
     val classNameV1_0 = SaveLoadV1_0.classNameV1_0
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 856baa65b0d7..246146a831f8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
 
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, render}
 
@@ -127,7 +127,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
     }
 
     def load(sc: SparkContext, path: String): FPGrowthModel[_] = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index f2b7151feb16..59d22f0eac99 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
 
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, render}
 
@@ -670,7 +670,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
     }
 
     def load(sc: SparkContext, path: String): PrefixSpanModel[_] = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 8626cffb2bc6..47209a5c9aa9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
 import scala.language.implicitConversions
 
 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
-import org.json4s.DefaultFormats
+import org.json4s.{DefaultFormats, Formats}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, parse => parseJson, render}
 
@@ -432,7 +432,7 @@ object Vectors {
    */
   @Since("1.6.0")
   def fromJson(json: String): Vector = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val jValue = parseJson(json)
     (jValue \ "type").extract[Int] match {
       case 0 => // sparse
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 0cf0a94bb3bf..9ffee8832db9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -392,7 +392,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
     }
 
     def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val (className, formatVersion, metadata) = loadMetadata(sc, path)
       assert(className == thisClassName)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index a21baf1b1fdc..e957d8ebd74a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -210,7 +210,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
 
   @Since("1.4.0")
   override def load(sc: SparkContext, path: String): IsotonicRegressionModel = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val (loadedClassName, version, metadata) = loadMetadata(sc, path)
     val isotonic = (metadata \ "isotonic").extract[Boolean]
     val classNameV1_0 = SaveLoadV1_0.thisClassName
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
index a95a54225a08..0e2dbe43e45b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.regression
 
-import org.json4s.{DefaultFormats, JValue}
+import org.json4s.{DefaultFormats, Formats, JValue}
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
@@ -64,7 +64,7 @@ private[mllib] object RegressionModel {
    * @return numFeatures
    */
   def getNumFeatures(metadata: JValue): Int = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     (metadata \ "numFeatures").extract[Int]
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index eec7cb5b90ea..c282dc59fa8d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -313,7 +313,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
    */
   @Since("1.3.0")
   override def load(sc: SparkContext, path: String): DecisionTreeModel = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
     val algo = (metadata \ "algo").extract[String]
     val numNodes = (metadata \ "numNodes").extract[Int]
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index b403770f5616..579d6b77f62c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -440,7 +440,7 @@ private[tree] object TreeEnsembleModel extends Logging {
      * Read metadata from the loaded JSON metadata.
      */
     def readMetadata(metadata: JValue): Metadata = {
-      implicit val formats = DefaultFormats
+      implicit val formats: Formats = DefaultFormats
       (metadata \ "metadata").extract[Metadata]
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala
index c13bc4099ce7..74e8ae75caf3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/modelSaveLoad.scala
@@ -117,7 +117,7 @@ private[mllib] object Loader {
    * @return (class name, version, metadata)
    */
   def loadMetadata(sc: SparkContext, path: String): (String, String, JValue) = {
-    implicit val formats = DefaultFormats
+    implicit val formats: Formats = DefaultFormats
     val metadata = parse(sc.textFile(metadataPath(path)).first())
     val clazz = (metadata \ "class").extract[String]
     val version = (metadata \ "version").extract[String]
diff --git a/pom.xml b/pom.xml
index 2ff934a2f767..7f072062ee89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2994,10 +2994,6 @@
                 `procedure syntax is deprecated`
               -->
               <arg>-Wconf:cat=deprecation&amp;msg=procedure syntax is deprecated:e</arg>
-              <!--
-                SPARK-40497 Upgrade Scala to 2.13.11 and suppress `Implicit definition should have explicit type`
-              -->
-              <arg>-Wconf:msg=Implicit definition should have explicit type:s</arg>
               <!--
                 SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3.
               -->
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 72ea06a8d050..993509ebaee8 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -247,8 +247,6 @@ object SparkBuild extends PomBuild {
         "-Wconf:cat=deprecation&msg=Auto-application to \\`\\(\\)\\` is deprecated&site=org.apache.spark.streaming.kafka010.KafkaRDDSuite:s",
         // SPARK-35574 Prevent the recurrence of compilation warnings related to `procedure syntax is deprecated`
         "-Wconf:cat=deprecation&msg=procedure syntax is deprecated:e",
-        // SPARK-40497 Upgrade Scala to 2.13.11 and suppress `Implicit definition should have explicit type`
-        "-Wconf:msg=Implicit definition should have explicit type:s",
         // SPARK-45627 Symbol literals are deprecated in Scala 2.13 and it's a compile error in Scala 3.
         "-Wconf:cat=deprecation&msg=symbol literal is deprecated:e",
         // SPARK-45627 `enum`, `export` and `given` will become keywords in Scala 3,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1cfb3d955704..8b88d38f033b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -64,14 +64,14 @@ private[spark] abstract class YarnSchedulerBackend(
   private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
     YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
 
-  private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
+  private implicit val askTimeout: RpcTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
   /**
    * Declare implicit single thread execution context for futures doRequestTotalExecutors and
    * doKillExecutors below, avoiding using the global execution context that may cause conflict
    * with user code's execution of futures.
    */
-  private implicit val schedulerEndpointEC = ExecutionContext.fromExecutorService(
+  private implicit val schedulerEndpointEC: ExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonSingleThreadExecutor("yarn-scheduler-endpoint"))
 
   /** Application ID. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 01ecebf1e6ab..a54f490dd146 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -492,7 +492,8 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
-  implicit private def mapIntStrEncoder = ExpressionEncoder[Map[Int, String]]()
+  implicit private def mapIntStrEncoder: ExpressionEncoder[Map[Int, String]] =
+    ExpressionEncoder[Map[Int, String]]()
 
   test("SPARK-23588 CatalystToExternalMap should support interpreted execution") {
     // To get a resolved `CatalystToExternalMap` expression, we build a deserializer plan
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index f28df3839d0a..266c369894ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -423,7 +423,8 @@ class ColumnPruningSuite extends PlanTest {
     comparePlans(optimized, expected)
   }
 
-  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+  implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
+    ExpressionEncoder[T]()
   private val func = identity[Iterator[OtherTuple]] _
 
   test("Column pruning on MapPartitions") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
index 1c818eee1224..74cd91795528 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala
@@ -37,8 +37,10 @@ class EliminateMapObjectsSuite extends PlanTest {
     }
   }
 
-  implicit private def intArrayEncoder = ExpressionEncoder[Array[Int]]()
-  implicit private def doubleArrayEncoder = ExpressionEncoder[Array[Double]]()
+  implicit private def intArrayEncoder: ExpressionEncoder[Array[Int]] =
+    ExpressionEncoder[Array[Int]]()
+  implicit private def doubleArrayEncoder: ExpressionEncoder[Array[Double]] =
+    ExpressionEncoder[Array[Double]]()
 
   test("SPARK-20254: Remove unnecessary data conversion for primitive array") {
     val intObjType = ObjectType(classOf[Array[Int]])
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala
index 0d654cc1ac93..f1acac39a767 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSerializationSuite.scala
@@ -35,8 +35,9 @@ class EliminateSerializationSuite extends PlanTest {
         EliminateSerialization) :: Nil
   }
 
-  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
-  implicit private def intEncoder = ExpressionEncoder[Int]()
+  implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
+    ExpressionEncoder[T]()
+  implicit private def intEncoder: ExpressionEncoder[Int] = ExpressionEncoder[Int]()
 
   test("back to back serialization") {
     val input = LocalRelation($"obj".obj(classOf[(Int, Int)]))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
index a1039b051ce4..8c4daacd1b2f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ObjectSerializerPruningSuite.scala
@@ -40,7 +40,8 @@ class ObjectSerializerPruningSuite extends PlanTest {
       RemoveNoopOperators) :: Nil
   }
 
-  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+  implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
+    ExpressionEncoder[T]()
 
   test("collect struct types") {
     val dataTypes = Seq(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 4385777e79c0..fb29f6f17e75 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -37,7 +37,8 @@ class TypedFilterOptimizationSuite extends PlanTest {
         CombineTypedFilters) :: Nil
   }
 
-  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+  implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
+    ExpressionEncoder[T]()
 
   val testRelation = LocalRelation($"_1".int, $"_2".int)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
index acf62d07bc39..2324c33806d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/DistinctKeyVisitorSuite.scala
@@ -47,7 +47,8 @@ class DistinctKeyVisitorSuite extends PlanTest {
     assert(plan.analyze.distinctKeys === distinctKeys)
   }
 
-  implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
+  implicit private def productEncoder[T <: Product : TypeTag]: ExpressionEncoder[T] =
+    ExpressionEncoder[T]()
 
   test("Aggregate's distinct attributes") {
     checkDistinctAttributes(t1.groupBy($"a", $"b")($"a", $"b", 1), Set(ExpressionSet(Seq(a, b))))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 32d456c69174..293f20c453ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,6 +22,7 @@ import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
 import scala.annotation.varargs
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
@@ -241,7 +242,7 @@ class Dataset[T] private[sql](
     exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
   }
 
-  private implicit def classTag = exprEnc.clsTag
+  private implicit def classTag: ClassTag[T] = exprEnc.clsTag
 
   // sqlContext must be val because a stable identifier is expected when you import implicits
   @transient lazy val sqlContext: SQLContext = sparkSession.sqlContext
@@ -1628,7 +1629,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def select[U1](c1: TypedColumn[T, U1]): Dataset[U1] = withOrigin {
-    implicit val encoder = c1.encoder
+    implicit val encoder: ExpressionEncoder[U1] = c1.encoder
     val project = Project(c1.withInputType(exprEnc, logicalPlan.output).named :: Nil, logicalPlan)
 
     if (!encoder.isSerializedAsStructForTopLevel) {
@@ -3471,7 +3472,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] = withOrigin {
-    implicit val uEnc = encoder
+    implicit val uEnc: Encoder[U] = encoder
     withTypedPlan(MapElements[T, U](func, logicalPlan))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 22dfed3ea4c5..83498740be40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -43,8 +43,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
     private val groupingAttributes: Seq[Attribute]) extends Serializable {
 
   // Similar to [[Dataset]], we turn the passed in encoder to `ExpressionEncoder` explicitly.
-  private implicit val kExprEnc = encoderFor(kEncoder)
-  private implicit val vExprEnc = encoderFor(vEncoder)
+  private implicit val kExprEnc: ExpressionEncoder[K] = encoderFor(kEncoder)
+  private implicit val vExprEnc: ExpressionEncoder[V] = encoderFor(vEncoder)
 
   private def logicalPlan = queryExecution.analyzed
   private def sparkSession = queryExecution.sparkSession
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index ace22ade9159..a5c1f6613b4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -22,7 +22,7 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkUpgradeException
@@ -55,7 +55,7 @@ object DataSourceUtils extends PredicateHelper {
   /**
    * Utility methods for converting partitionBy columns to options and back.
    */
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   def encodePartitioningColumns(columns: Seq[String]): String = {
     Serialization.write(columns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
index 86ebc2e7ef14..cfe01f85cbe7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala
@@ -216,7 +216,7 @@ class ApplyInPandasWithStatePythonRunner(
         STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER)
 
       stateMetadataBatch.rowIterator().asScala.take(numRows).flatMap { row =>
-        implicit val formats = org.json4s.DefaultFormats
+        implicit val formats: Formats = org.json4s.DefaultFormats
 
         // NOTE: See ApplyInPandasWithStatePythonRunner.STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER
         // for the schema.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index 1d428e68fe60..dc52f33474c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
@@ -82,7 +82,7 @@ case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
 }
 
 object CommitMetadata {
-  implicit val format = Serialization.formats(NoTypeHints)
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
 
   def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 140367b3236f..fa1beb9d15c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -24,7 +24,7 @@ import scala.io.{Source => IOSource}
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.SparkSession
@@ -49,9 +49,10 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
 
   import CompactibleFileStreamLog._
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   /** Needed to serialize type T into JSON when using Jackson */
+  @scala.annotation.nowarn
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
   protected val minBatchesToRetain = sparkSession.sessionState.conf.minBatchesToRetain
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 94ba8b8aa515..d8aa31be4797 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.hadoop.fs.FileStatus
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
 
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
@@ -90,8 +88,6 @@ class FileStreamSinkLog(
     _retentionMs: Option[Long] = None)
   extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
-
   protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
 
   protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index ceadbca2b122..14653864a292 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -22,9 +22,6 @@ import java.util.Map.Entry
 
 import scala.collection.mutable
 
-import org.json4s.NoTypeHints
-import org.json4s.jackson.Serialization
-
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry
 import org.apache.spark.sql.internal.SQLConf
@@ -52,8 +49,6 @@ class FileStreamSourceLog(
 
   protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
-
   // A fixed size log entry cache to cache the file entries belong to the compaction batch. It is
   // used to avoid scanning the compacted log file to retrieve it's own batch data.
   private val cacheSize = compactInterval
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
index a2b49d944a68..ba79c77f3867 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceOffset.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.streaming
 
 import scala.util.control.Exception._
 
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 /**
@@ -34,7 +34,7 @@ case class FileStreamSourceOffset(logOffset: Long) extends Offset {
 }
 
 object FileStreamSourceOffset {
-  implicit val format = Serialization.formats(NoTypeHints)
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
 
   def apply(offset: Offset): FileStreamSourceOffset = {
     offset match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
index 3af9c9aebf33..c9ade7b568e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala
@@ -246,7 +246,7 @@ private[sql] object GroupStateImpl {
   }
 
   def fromJson[S](value: Option[S], json: JValue): GroupStateImpl[S] = {
-    implicit val formats = org.json4s.DefaultFormats
+    implicit val formats: Formats = org.json4s.DefaultFormats
 
     val hmap = json.extract[Map[String, Any]]
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index a7b0483ea08a..79627030e1eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -50,9 +50,10 @@ import org.apache.spark.util.ArrayImplicits._
 class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String)
   extends MetadataLog[T] with Logging {
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   /** Needed to serialize type T into JSON when using Jackson */
+  @scala.annotation.nowarn
   private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
 
   // Avoid serializing generic sequences, see SPARK-17372
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
index dea75e3ec478..006d6221e55a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -89,7 +89,7 @@ case class OffsetSeqMetadata(
 }
 
 object OffsetSeqMetadata extends Logging {
-  private implicit val format = Serialization.formats(NoTypeHints)
+  private implicit val format: Formats = Serialization.formats(NoTypeHints)
   /**
    * These configs are related to streaming query execution and should not be changed across
    * batches of a streaming query. The values of these configs are persisted into the offset
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
index cb18988b4687..978cb3c34f60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -45,7 +45,7 @@ case class StreamMetadata(id: String) {
 }
 
 object StreamMetadata extends Logging {
-  implicit val format = Serialization.formats(NoTypeHints)
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
 
   /** Read the metadata from file if it exists */
   def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
index 3f5f81bee4e8..5463a1fa4e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala
@@ -25,7 +25,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
 
-import org.json4s.{DefaultFormats, NoTypeHints}
+import org.json4s.{DefaultFormats, Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.SparkEnv
@@ -286,6 +286,6 @@ class TextSocketContinuousPartitionReader(
 }
 
 case class TextSocketOffset(offsets: List[Int]) extends Offset {
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
   override def json: String = Serialization.write(offsets)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index dc97386d8fcf..d0ba95ffd8a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.ListBuffer
 
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkEnv, TaskContext}
@@ -46,7 +46,7 @@ import org.apache.spark.util.RpcUtils
 class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
   extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   // ContinuousReader implementation
 
@@ -182,6 +182,6 @@ class ContinuousMemoryStreamPartitionReader(
 
 case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])
   extends Offset {
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
   override def json(): String = Serialization.write(partitionNums)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
index 6954e4534e49..8dca7d40704a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.sources
 
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -130,7 +130,7 @@ case class RatePerMicroBatchStreamOffset(offset: Long, timestamp: Long) extends
 }
 
 object RatePerMicroBatchStreamOffset {
-  implicit val formats = Serialization.formats(NoTypeHints)
+  implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
   def apply(json: String): RatePerMicroBatchStreamOffset =
     Serialization.read[RatePerMicroBatchStreamOffset](json)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index 4ef39ddfd255..b58c805af9d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataOutputStream, Path}
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
@@ -64,8 +64,9 @@ case class OperatorStateMetadataV1(
 
 object OperatorStateMetadataV1 {
 
-  private implicit val formats = Serialization.formats(NoTypeHints)
+  private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
+  @scala.annotation.nowarn
   private implicit val manifest = Manifest
     .classType[OperatorStateMetadataV1](implicitly[ClassTag[OperatorStateMetadataV1]].runtimeClass)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3dfc27b14c24..feb745b04023 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -26,7 +26,7 @@ import scala.ref.WeakReference
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}
 import org.rocksdb.CompressionType._
@@ -886,7 +886,7 @@ case class RocksDBMetrics(
 }
 
 object RocksDBMetrics {
-  val format = Serialization.formats(NoTypeHints)
+  val format: Formats = Serialization.formats(NoTypeHints)
 }
 
 /** Class to wrap RocksDB's native histogram */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 654d52bf9165..e6d9b6f8d96a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -34,7 +34,7 @@ import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModul
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
-import org.json4s.NoTypeHints
+import org.json4s.{Formats, NoTypeHints}
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -715,7 +715,7 @@ case class RocksDBCheckpointMetadata(
 object RocksDBCheckpointMetadata {
   val VERSION = 1
 
-  implicit val format = Serialization.formats(NoTypeHints)
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
 
   /** Used to convert between classes and JSON. */
   lazy val mapper = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index 2de2d90e7dd2..5cdafc90cfec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -231,7 +231,7 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
 class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
 
-  private implicit val ordering = Ordering.by((c: AggData) => c.a -> c.b)
+  private implicit val ordering: Ordering[AggData] = Ordering.by((c: AggData) => c.a -> c.b)
 
   test("typed aggregation: complex result type") {
     val ds = Seq("a" -> 1, "a" -> 3, "b" -> 3).toDS()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 739d1f6827d5..cd28c60d83c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -71,7 +71,7 @@ class DatasetSuite extends QueryTest
   with AdaptiveSparkPlanHelper {
   import testImplicits._
 
-  private implicit val ordering = Ordering.by((c: ClassData) => c.a -> c.b)
+  private implicit val ordering: Ordering[ClassData] = Ordering.by((c: ClassData) => c.a -> c.b)
 
   test("checkAnswer should compare map correctly") {
     val data = Seq((1, "2", Map(1 -> 2, 2 -> 1)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e685b40a800f..2504cd17acfc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -21,7 +21,7 @@ import java.util.UUID
 
 import scala.collection.mutable
 
-import org.scalactic.TolerantNumerics
+import org.scalactic.{Equality, TolerantNumerics}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.concurrent.Waiters.Waiter
@@ -44,7 +44,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   import testImplicits._
 
   // To make === between double tolerate inexact values
-  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
+  implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01)
 
   after {
     spark.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index a265bd9339b8..7b3d89979470 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -29,7 +29,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.hadoop.fs.Path
 import org.mockito.Mockito.when
-import org.scalactic.TolerantNumerics
+import org.scalactic.{Equality, TolerantNumerics}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatestplus.mockito.MockitoSugar
@@ -60,7 +60,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
   import testImplicits._
 
   // To make === between double tolerate inexact values
-  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
+  implicit val doubleEquality: Equality[Double] = TolerantNumerics.tolerantDoubleEquality(0.01)
 
   after {
     sqlContext.streams.active.foreach(_.stop())
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
index e94faf5e0668..fc9e4513d12e 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceWithActualMetricsSuite.scala
@@ -48,7 +48,7 @@ class SqlResourceWithActualMetricsSuite
   // Exclude nodes which may not have the metrics
   val excludedNodes = List("WholeStageCodegen", "Project", "SerializeFromObject")
 
-  implicit val formats = new DefaultFormats {
+  implicit val formats: DefaultFormats = new DefaultFormats {
     override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
   }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index ddfe01bb7ac2..805d93bee0ee 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -25,7 +25,7 @@ import java.util.{Locale, UUID}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future, Promise}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.jdk.CollectionConverters._
@@ -441,7 +441,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftServer2Test {
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
       queries.foreach(statement.execute)
-      implicit val ec = ExecutionContext.fromExecutorService(
+      implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(
         ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel"))
       try {
         // Start a very-long-running query that will take hours to finish, then cancel it in order
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
index 92cfd7d40338..9681245baa0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Time.scala
@@ -89,5 +89,5 @@ case class Time(private val millis: Long) {
 }
 
 object Time {
-  implicit val ordering = Ordering.by((time: Time) => time.millis)
+  implicit val ordering: Ordering[Time] = Ordering.by((time: Time) => time.millis)
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 7087f167003d..e5b98dd714b3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.receiver
 
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
 import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
@@ -159,8 +159,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
   // For processing futures used in parallel block storing into block manager and write ahead log
   // # threads = 2, so that both writing to BM and WAL can proceed in parallel
-  implicit private val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+  implicit private val executionContext: ExecutionContextExecutorService = ExecutionContext
+    .fromExecutorService(ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
 
   /**
    * This implementation stores the block into the block manager as well as a write ahead log.
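For reference, the two patterns applied throughout this patch can be reproduced in isolation. The sketch below is a hypothetical class, not part of the commit or the Spark source tree: it gives the json4s Formats value an explicit type and suppresses the warning for the reflection-derived Manifest with @scala.annotation.nowarn, mirroring HDFSMetadataLog and CompactibleFileStreamLog above. Under Scala 2.13.11+ the untyped Formats definition would otherwise trigger "Implicit definition should have explicit type".

    import scala.reflect.ClassTag

    import org.json4s.{Formats, NoTypeHints}
    import org.json4s.jackson.Serialization

    // Hypothetical example, not from the Spark source tree.
    class JsonLog[T <: AnyRef : ClassTag] {
      // Explicit type on the implicit definition, as done for the streaming logs above.
      private implicit val formats: Formats = Serialization.formats(NoTypeHints)

      // The Manifest's inferred type is reflection-derived, so the warning is
      // suppressed locally instead of spelling the type out.
      @scala.annotation.nowarn
      private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

      def serialize(entry: T): String = Serialization.write(entry)
      def deserialize(json: String): T = Serialization.read[T](json)
    }

A caller can then invoke serialize and deserialize without passing Formats or Manifest explicitly, since both are resolved from the implicit scope of the class.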

