Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/29 16:02:20 UTC

[spark] branch master updated: [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a17d26  [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
8a17d26 is described below

commit 8a17d26784c53fb50b6373b566aab71135c8956f
Author: Sean Owen <se...@databricks.com>
AuthorDate: Mon Apr 29 11:02:01 2019 -0500

    [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
    
    ## What changes were proposed in this pull request?
    
    I want to get rid of as much use of `scala.language.existentials` as possible for 3.0. It's a complicated language feature that generates warnings unless this value is imported. It might even be on the way out of Scala: https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785
    
    For Spark, it comes up mostly where the code plays fast and loose with generic types, not the advanced situations you'll often see referenced where this feature is explained. For example, it comes up in cases where a function returns something like `(String, Class[_])`. Scala doesn't like matching this to any other instance of `(String, Class[_])` because doing so requires inferring the existence of some type that satisfies both. Seems obvious if the generic type is a wildcard, but, n [...]
    
    This is a large PR, and it only gets rid of _most_ instances of `scala.language.existentials`. The change should be all compile-time and shouldn't affect APIs or logic.
    
    Many of the changes simply touch up sloppiness about generic types, making the known correct value explicit in the code.
    
    Some fixes involve being more explicit about the existence of generic types in methods. For instance, `def foo(arg: Class[_])` seems innocent enough but should really be declared `def foo[T](arg: Class[T])` to let Scala select and fix a single type when evaluating calls to `foo`.
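    
    As a rough illustration of the difference between the two declarations, here is a minimal sketch. The object and method names are hypothetical and do not exist in Spark; they only show that a named type parameter lets the signature refer to the class's type, which a wildcard cannot do.
    
    ```scala
    // Hypothetical helpers for illustration only; not part of Spark.
    object WildcardVsTypeParam {
      // Wildcard form: the element type is anonymous, so the signature cannot mention it.
      def describeWildcard(arg: Class[_]): String = arg.getSimpleName
    
      // Type-parameter form: Scala fixes a single T per call, so the result can be typed as T.
      def newInstanceOf[T](arg: Class[T]): T = arg.getConstructor().newInstance()
    
      def main(args: Array[String]): Unit = {
        // No cast is needed: T is inferred as java.lang.StringBuilder.
        val sb: java.lang.StringBuilder = newInstanceOf(classOf[java.lang.StringBuilder])
        sb.append("ok")
        println(describeWildcard(classOf[String]) + " " + sb)
      }
    }
    ```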
    
    Somewhat surprisingly, the warning also comes up where code destructures a tuple whose elements involve a generic type, but not when the two elements of the tuple are read separately (for example via `._1` and `._2`).
    
    One key change was altering `Utils.classForName(...): Class[_]` to the more correct `Utils.classForName[T](...): Class[T]`. This caused a number of small but positive changes to callers that otherwise had to cast the result.
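    
    The effect on callers can be sketched as follows. This is a simplified stand-in, not the real `Utils.classForName`: it omits the `initialize` and `noSparkClassLoader` parameters and the class loader selection, and the object name is an assumption made for the example.
    
    ```scala
    // Simplified, hypothetical stand-in for Utils.classForName; class loader handling is omitted.
    object ClassForNameSketch {
      // The single cast lives here instead of at every call site.
      def classForName[C](className: String): Class[C] =
        Class.forName(className).asInstanceOf[Class[C]]
    
      def main(args: Array[String]): Unit = {
        // The caller names the expected type once and gets a typed instance back.
        val list = classForName[java.util.ArrayList[String]]("java.util.ArrayList")
          .getConstructor()
          .newInstance()
        list.add("x")
        println(list.size())
      }
    }
    ```
    
    Note that the cast is unchecked because of erasure: if the caller names the wrong type, the mistake shows up later as a `ClassCastException` where the instance is used, not at the `classForName` call.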
    
    In several tests, `Dataset[_]` was used where `DataFrame` seems to be the clear intent.
    
    Finally, in a few cases in MLlib, the return type `this.type` was used where there are no subclasses of the class that uses it. This really isn't needed and causes issues for Scala reasoning about the return type. These are just changed to be concrete classes as return types.
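    
    A minimal sketch of that kind of change, using a hypothetical class (not one of the MLlib classes touched here): for a class with no subclasses, a concrete return type supports the same method chaining as `this.type`.
    
    ```scala
    // Hypothetical class for illustration only; not part of MLlib.
    final class ChainedSetter {
      private var threshold: Double = 0.5
    
      // Previously this might have been declared as: def setThreshold(value: Double): this.type
      def setThreshold(value: Double): ChainedSetter = {
        threshold = value
        this
      }
    
      def getThreshold: Double = threshold
    }
    
    object ChainedSetterDemo {
      def main(args: Array[String]): Unit = {
        // Chaining still works with the concrete return type.
        val s = new ChainedSetter().setThreshold(0.9).setThreshold(0.1)
        println(s.getThreshold)
      }
    }
    ```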
    
    After this change, we have only a few classes that still import `scala.language.existentials` (because modifying them would require extensive rewrites to fix) and no build warnings.
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24431 from srowen/SPARK-27536.
    
    Authored-by: Sean Owen <se...@databricks.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../apache/spark/api/python/PythonHadoopUtil.scala | 13 +++--
 .../org/apache/spark/api/python/PythonRDD.scala    | 56 +++++++++++-----------
 .../org/apache/spark/api/r/RBackendHandler.scala   |  2 -
 .../spark/deploy/history/HistoryServer.scala       |  4 +-
 .../spark/internal/io/FileCommitProtocol.scala     |  2 +-
 .../org/apache/spark/io/CompressionCodec.scala     |  5 +-
 .../org/apache/spark/metrics/MetricsSystem.scala   | 19 +++++---
 .../spark/network/netty/NettyBlockRpcServer.scala  | 22 ++++-----
 .../scala/org/apache/spark/rdd/CoGroupedRDD.scala  |  1 -
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala  |  1 -
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  4 +-
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  2 -
 .../apache/spark/scheduler/ShuffleMapTask.scala    |  7 ++-
 .../apache/spark/security/SocketAuthServer.scala   |  1 -
 .../apache/spark/serializer/KryoSerializer.scala   |  4 +-
 .../org/apache/spark/util/ClosureCleaner.scala     |  5 +-
 .../main/scala/org/apache/spark/util/Utils.scala   | 14 +++---
 .../org/apache/spark/ContextCleanerSuite.scala     | 10 ++--
 .../test/scala/org/apache/spark/FileSuite.scala    |  7 +--
 .../scheduler/SchedulerIntegrationSuite.scala      |  9 ++--
 .../spark/util/MutableURLClassLoaderSuite.scala    |  3 +-
 .../kafka010/KafkaDelegationTokenProvider.scala    |  1 -
 .../org/apache/spark/kafka010/KafkaTokenUtil.scala |  5 +-
 .../apache/spark/graphx/util/BytecodeUtils.scala   |  8 ++--
 .../ml/classification/LogisticRegression.scala     |  2 +-
 .../apache/spark/ml/feature/StringIndexer.scala    |  2 -
 .../apache/spark/ml/feature/VectorAssembler.scala  | 13 +++--
 .../org/apache/spark/ml/image/HadoopUtils.scala    |  7 ++-
 .../org/apache/spark/ml/recommendation/ALS.scala   |  4 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala    |  1 -
 .../scala/org/apache/spark/mllib/fpm/FPTree.scala  |  4 +-
 .../classification/LogisticRegressionSuite.scala   | 13 +++--
 .../spark/ml/clustering/BisectingKMeansSuite.scala | 11 ++---
 .../spark/ml/clustering/GaussianMixtureSuite.scala | 14 +++---
 .../apache/spark/ml/clustering/KMeansSuite.scala   |  3 +-
 .../org/apache/spark/ml/clustering/LDASuite.scala  |  7 +--
 .../ml/evaluation/ClusteringEvaluatorSuite.scala   | 10 ++--
 .../apache/spark/ml/recommendation/ALSSuite.scala  |  1 -
 .../org/apache/spark/ml/util/MLTestingUtils.scala  |  5 +-
 .../org/apache/spark/mllib/fpm/FPGrowthSuite.scala |  6 +--
 .../org/apache/spark/mllib/fpm/FPTreeSuite.scala   |  2 -
 .../apache/spark/mllib/fpm/PrefixSpanSuite.scala   |  6 +--
 .../sql/catalyst/analysis/FunctionRegistry.scala   |  1 -
 .../expressions/codegen/CodeGenerator.scala        |  1 -
 .../catalyst/expressions/codegen/javaCode.scala    |  2 +-
 .../sql/catalyst/expressions/objects/objects.scala |  2 -
 .../spark/sql/catalyst/plans/logical/object.scala  | 17 +++----
 .../sql/catalyst/util/ArrayBasedMapData.scala      |  5 +-
 .../org/apache/spark/sql/types/DataType.scala      |  2 +-
 .../org/apache/spark/sql/types/ObjectType.scala    |  2 -
 .../expressions/ObjectExpressionsSuite.scala       |  3 +-
 .../org/apache/spark/sql/UDFRegistration.scala     |  4 +-
 .../aggregate/TypedAggregateExpression.scala       |  2 -
 .../sql/execution/datasources/DataSource.scala     |  3 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala     |  6 +--
 .../parquet/ParquetInteroperabilitySuite.scala     |  5 +-
 .../spark/sql/sources/FilteredScanSuite.scala      |  2 -
 .../apache/spark/sql/sources/PrunedScanSuite.scala |  2 -
 .../org/apache/spark/sql/hive/TableReader.scala    |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala     |  5 +-
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  3 --
 .../hive/execution/ScriptTransformationExec.scala  | 12 ++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  2 -
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala     |  3 --
 .../spark/streaming/dstream/WindowedDStream.scala  |  2 +-
 .../spark/streaming/receiver/ReceivedBlock.scala   |  1 -
 .../streaming/receiver/ReceivedBlockHandler.scala  |  1 -
 .../spark/streaming/scheduler/JobGenerator.scala   |  4 +-
 .../streaming/scheduler/ReceiverTracker.scala      |  1 -
 .../spark/streaming/util/WriteAheadLogUtils.scala  |  3 +-
 .../spark/streaming/BasicOperationsSuite.scala     |  7 ++-
 71 files changed, 180 insertions(+), 241 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 2ab8add..a4817b3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -33,18 +33,17 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
  * transformation code by overriding the convert method.
  */
-trait Converter[T, + U] extends Serializable {
+trait Converter[-T, +U] extends Serializable {
   def convert(obj: T): U
 }
 
 private[python] object Converter extends Logging {
 
-  def getInstance(converterClass: Option[String],
-                  defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
+  def getInstance[T, U](converterClass: Option[String],
+                        defaultConverter: Converter[_ >: T, _ <: U]): Converter[T, U] = {
     converterClass.map { cc =>
       Try {
-        val c = Utils.classForName(cc).getConstructor().
-          newInstance().asInstanceOf[Converter[Any, Any]]
+        val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
         logInfo(s"Loaded converter: $cc")
         c
       } match {
@@ -177,8 +176,8 @@ private[python] object PythonHadoopUtil {
    * [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
    */
   def convertRDD[K, V](rdd: RDD[(K, V)],
-                       keyConverter: Converter[Any, Any],
-                       valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+                       keyConverter: Converter[K, Any],
+                       valueConverter: Converter[V, Any]): RDD[(Any, Any)] = {
     rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5b492b1..182c383 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,10 +24,6 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.Promise
-import scala.concurrent.duration.Duration
-import scala.language.existentials
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
@@ -228,8 +224,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int): JavaRDD[Array[Byte]] = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -296,9 +292,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration): RDD[(K, V)] = {
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
-    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
+    val fc = Utils.classForName[F](inputFormatClass)
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -365,9 +361,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
-    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
-    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName[K](keyClass)
+    val vc = Utils.classForName[V](valueClass)
+    val fc = Utils.classForName[F](inputFormatClass)
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -425,29 +421,33 @@ private[spark] object PythonRDD extends Logging {
     PythonHadoopUtil.mergeConfs(baseConf, conf)
   }
 
-  private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
-      valueConverterClass: String = null): (Class[_], Class[_]) = {
+  private def inferKeyValueTypes[K, V, KK, VV](rdd: RDD[(K, V)], keyConverterClass: String = null,
+      valueConverterClass: String = null): (Class[_ <: KK], Class[_ <: VV]) = {
     // Peek at an element to figure out key/value types. Since Writables are not serializable,
     // we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
     // and then convert locally.
     val (key, value) = rdd.first()
-    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
-      new JavaToWritableConverter)
+    val (kc, vc) = getKeyValueConverters[K, V, KK, VV](
+      keyConverterClass, valueConverterClass, new JavaToWritableConverter)
     (kc.convert(key).getClass, vc.convert(value).getClass)
   }
 
-  private def getKeyValueTypes(keyClass: String, valueClass: String):
-      Option[(Class[_], Class[_])] = {
+  private def getKeyValueTypes[K, V](keyClass: String, valueClass: String):
+      Option[(Class[K], Class[V])] = {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
     } yield (Utils.classForName(k), Utils.classForName(v))
   }
 
-  private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
-      defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
-    val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
-    val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+  private def getKeyValueConverters[K, V, KK, VV](
+      keyConverterClass: String,
+      valueConverterClass: String,
+      defaultConverter: Converter[_, _]): (Converter[K, KK], Converter[V, VV]) = {
+    val keyConverter = Converter.getInstance(Option(keyConverterClass),
+      defaultConverter.asInstanceOf[Converter[K, KK]])
+    val valueConverter = Converter.getInstance(Option(valueConverterClass),
+      defaultConverter.asInstanceOf[Converter[V, VV]])
     (keyConverter, valueConverter)
   }
 
@@ -459,7 +459,7 @@ private[spark] object PythonRDD extends Logging {
                                keyConverterClass: String,
                                valueConverterClass: String,
                                defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
-    val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+    val (kc, vc) = getKeyValueConverters[K, V, Any, Any](keyConverterClass, valueConverterClass,
       defaultConverter)
     PythonHadoopUtil.convertRDD(rdd, kc, vc)
   }
@@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
    * [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
    * `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
    */
-  def saveAsSequenceFile[K, V, C <: CompressionCodec](
+  def saveAsSequenceFile[C <: CompressionCodec](
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       path: String,
@@ -489,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
    * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
    * this RDD.
    */
-  def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+  def saveAsHadoopFile[F <: OutputFormat[_, _], C <: CompressionCodec](
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       path: String,
@@ -507,7 +507,7 @@ private[spark] object PythonRDD extends Logging {
     val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName[F](outputFormatClass)
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
   }
 
@@ -520,7 +520,7 @@ private[spark] object PythonRDD extends Logging {
    * `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
    * this RDD.
    */
-  def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       path: String,
@@ -548,7 +548,7 @@ private[spark] object PythonRDD extends Logging {
    * (mapred vs. mapreduce). Keys/values are converted for output using either user specified
    * converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
    */
-  def saveAsHadoopDataset[K, V](
+  def saveAsHadoopDataset(
       pyRDD: JavaRDD[Array[Byte]],
       batchSerialized: Boolean,
       confAsMap: java.util.HashMap[String, String],
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index aaa432d..f2f81b1 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -20,8 +20,6 @@ package org.apache.spark.api.r
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 import java.util.concurrent.TimeUnit
 
-import scala.language.existentials
-
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.handler.timeout.ReadTimeoutException
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 7c9ce14..7df36c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -34,7 +34,6 @@ import org.apache.spark.internal.config.History
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
-import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
 
 /**
@@ -274,10 +273,9 @@ object HistoryServer extends Logging {
 
     val providerName = conf.get(History.PROVIDER)
       .getOrElse(classOf[FsHistoryProvider].getName())
-    val provider = Utils.classForName(providerName)
+    val provider = Utils.classForName[ApplicationHistoryProvider](providerName)
       .getConstructor(classOf[SparkConf])
       .newInstance(conf)
-      .asInstanceOf[ApplicationHistoryProvider]
 
     val port = conf.get(History.HISTORY_SERVER_UI_PORT)
 
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e6e9c9e..8540938 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -149,7 +149,7 @@ object FileCommitProtocol extends Logging {
 
     logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
       s" dynamic=$dynamicPartitionOverwrite")
-    val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
+    val clazz = Utils.classForName[FileCommitProtocol](className)
     // First try the constructor with arguments (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
     // If that doesn't exist, try the one with (jobId: string, outputPath: String).
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 288c0d1..065f05e 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -77,8 +77,9 @@ private[spark] object CompressionCodec {
     val codecClass =
       shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
     val codec = try {
-      val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
-      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+      val ctor =
+        Utils.classForName[CompressionCodec](codecClass).getConstructor(classOf[SparkConf])
+      Some(ctor.newInstance(conf))
     } catch {
       case _: ClassNotFoundException | _: IllegalArgumentException => None
     }
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 8dad42b..c96640a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -179,8 +179,8 @@ private[spark] class MetricsSystem private (
     sourceConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
       try {
-        val source = Utils.classForName(classPath).getConstructor().newInstance()
-        registerSource(source.asInstanceOf[Source])
+        val source = Utils.classForName[Source](classPath).getConstructor().newInstance()
+        registerSource(source)
       } catch {
         case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
       }
@@ -195,13 +195,18 @@ private[spark] class MetricsSystem private (
       val classPath = kv._2.getProperty("class")
       if (null != classPath) {
         try {
-          val sink = Utils.classForName(classPath)
-            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-            .newInstance(kv._2, registry, securityMgr)
           if (kv._1 == "servlet") {
-            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+            val servlet = Utils.classForName[MetricsServlet](classPath)
+              .getConstructor(
+                classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+              .newInstance(kv._2, registry, securityMgr)
+            metricsServlet = Some(servlet)
           } else {
-            sinks += sink.asInstanceOf[Sink]
+            val sink = Utils.classForName[Sink](classPath)
+              .getConstructor(
+                classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+              .newInstance(kv._2, registry, securityMgr)
+            sinks += sink
           }
         } catch {
           case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 27f4f94..55e7109 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -20,7 +20,6 @@ package org.apache.spark.network.netty
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
@@ -66,12 +65,7 @@ class NettyBlockRpcServer(
 
       case uploadBlock: UploadBlock =>
         // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
-        val (level: StorageLevel, classTag: ClassTag[_]) = {
-          serializer
-            .newInstance()
-            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
-            .asInstanceOf[(StorageLevel, ClassTag[_])]
-        }
+        val (level, classTag) = deserializeMetadata(uploadBlock.metadata)
         val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
         val blockId = BlockId(uploadBlock.blockId)
         logDebug(s"Receiving replicated block $blockId with level ${level} " +
@@ -87,12 +81,7 @@ class NettyBlockRpcServer(
       responseContext: RpcResponseCallback): StreamCallbackWithID = {
     val message =
       BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
-    val (level: StorageLevel, classTag: ClassTag[_]) = {
-      serializer
-        .newInstance()
-        .deserialize(ByteBuffer.wrap(message.metadata))
-        .asInstanceOf[(StorageLevel, ClassTag[_])]
-    }
+    val (level, classTag) = deserializeMetadata(message.metadata)
     val blockId = BlockId(message.blockId)
     logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +
       s"from ${client.getSocketAddress}")
@@ -101,5 +90,12 @@ class NettyBlockRpcServer(
     blockManager.putBlockDataAsStream(blockId, level, classTag)
   }
 
+  private def deserializeMetadata[T](metadata: Array[Byte]): (StorageLevel, ClassTag[T]) = {
+    serializer
+      .newInstance()
+      .deserialize(ByteBuffer.wrap(metadata))
+      .asInstanceOf[(StorageLevel, ClassTag[T])]
+  }
+
   override def getStreamManager(): StreamManager = streamManager
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7e76731..909f585 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -20,7 +20,6 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark._
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 44bc40b..836d3e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -21,7 +21,6 @@ import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.apache.spark._
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index af11a31..b817eb6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -26,7 +26,6 @@ import scala.annotation.tailrec
 import scala.collection.Map
 import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
-import scala.language.existentials
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.SerializationUtils
@@ -382,7 +381,8 @@ private[spark] class DAGScheduler(
    * locations that are still available from the previous shuffle to avoid unnecessarily
    * regenerating data.
    */
-  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
+  def createShuffleMapStage[K, V, C](
+      shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     checkBarrierStageWithDynamicAllocation(rdd)
     checkBarrierStageWithNumSlots(rdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 54ab8f8..b514c2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -19,8 +19,6 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import scala.language.existentials
-
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{AccumulatorV2, CallSite}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 189e35e..710f5eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -21,13 +21,10 @@ import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.Properties
 
-import scala.language.existentials
-
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.shuffle.ShuffleWriter
 
 /**
  * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
@@ -85,13 +82,15 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime
     } else 0L
     val ser = SparkEnv.get.closureSerializer.newInstance()
-    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
+    val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
       ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
     _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
     _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
+    val rdd = rddAndDep._1
+    val dep = rddAndDep._2
     dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala b/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
index c65c8fd..e616d23 100644
--- a/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
+++ b/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
@@ -21,7 +21,6 @@ import java.net.{InetAddress, ServerSocket, Socket}
 
 import scala.concurrent.Promise
 import scala.concurrent.duration.Duration
-import scala.language.existentials
 import scala.util.Try
 
 import org.apache.spark.SparkEnv
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c426095..3969106 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -164,8 +164,8 @@ class KryoSerializer(conf: SparkConf)
         }
         // Allow the user to register their own classes by setting spark.kryo.registrator.
         userRegistrators
-          .map(Utils.classForName(_, noSparkClassLoader = true).getConstructor().
-            newInstance().asInstanceOf[KryoRegistrator])
+          .map(Utils.classForName[KryoRegistrator](_, noSparkClassLoader = true).
+            getConstructor().newInstance())
           .foreach { reg => reg.registerClasses(kryo) }
       } catch {
         case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index df696a3..6d6ef5a 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.lang.invoke.SerializedLambda
 
 import scala.collection.mutable.{Map, Set, Stack}
-import scala.language.existentials
 
 import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import org.apache.xbean.asm7.Opcodes._
@@ -309,7 +308,9 @@ private[spark] object ClosureCleaner extends Logging {
       var outerPairs: List[(Class[_], AnyRef)] = outerClasses.zip(outerObjects).reverse
       var parent: AnyRef = null
       if (outerPairs.nonEmpty) {
-        val (outermostClass, outermostObject) = outerPairs.head
+        val outermostClass = outerPairs.head._1
+        val outermostObject = outerPairs.head._2
+
         if (isClosure(outermostClass)) {
           logDebug(s" + outermost object is a closure, so we clone it: ${outermostClass}")
         } else if (outermostClass.getName.startsWith("$line")) {
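
The change above swaps a tuple pattern for `_1`/`_2` accessors. A minimal standalone sketch of that form follows; `OuterPairs`, `pairs` and `describe` are hypothetical names for illustration, not Spark code. The commit message attributes the original warning to how the compiler types a destructured tuple containing `Class[_]`. The accessor form reads each element at its declared type.

```scala
object OuterPairs {
  // Hypothetical data shaped like ClosureCleaner's outerPairs.
  val pairs: List[(Class[_], AnyRef)] =
    List((classOf[String], "outer"), (classOf[Integer], Integer.valueOf(1)))

  def describe(ps: List[(Class[_], AnyRef)]): String = {
    // Read each element separately instead of `val (cls, obj) = ps.head`.
    val cls = ps.head._1
    val obj = ps.head._2
    s"${cls.getSimpleName}:$obj"
  }

  def main(args: Array[String]): Unit =
    println(describe(pairs))
}
```

Calling `ps.head` twice is harmless here because `head` on an immutable `List` is cheap and has no side effects.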
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7c8648d..bed5086 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -197,14 +197,15 @@ private[spark] object Utils extends Logging {
    * Preferred alternative to Class.forName(className), as well as
    * Class.forName(className, initialize, loader) with current thread's ContextClassLoader.
    */
-  def classForName(
+  def classForName[C](
       className: String,
       initialize: Boolean = true,
-      noSparkClassLoader: Boolean = false): Class[_] = {
+      noSparkClassLoader: Boolean = false): Class[C] = {
     if (!noSparkClassLoader) {
-      Class.forName(className, initialize, getContextOrSparkClassLoader)
+      Class.forName(className, initialize, getContextOrSparkClassLoader).asInstanceOf[Class[C]]
     } else {
-      Class.forName(className, initialize, Thread.currentThread().getContextClassLoader)
+      Class.forName(className, initialize, Thread.currentThread().getContextClassLoader).
+        asInstanceOf[Class[C]]
     }
     // scalastyle:on classforname
   }
@@ -2680,10 +2681,11 @@ private[spark] object Utils extends Logging {
    * other state) and decide they do not need to be added. A log message is printed in that case.
    * Other exceptions are bubbled up.
    */
-  def loadExtensions[T](extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
+  def loadExtensions[T <: AnyRef](
+      extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
     classes.flatMap { name =>
       try {
-        val klass = classForName(name)
+        val klass = classForName[T](name)
         require(extClass.isAssignableFrom(klass),
           s"$name is not a subclass of ${extClass.getName()}.")
 
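
The typed `classForName[C]` pattern introduced in this file can be sketched in isolation as follows. This is a sketch under assumptions, not Spark's implementation: `TypedClassLoading` and `loadClass` are hypothetical names, and only `Class.forName`, `getConstructor` and `newInstance` are real JDK calls.

```scala
object TypedClassLoading {
  // The caller names the expected type C; the unchecked cast is confined
  // to this one helper instead of being repeated at every call site.
  def loadClass[C](className: String): Class[C] =
    Class.forName(className).asInstanceOf[Class[C]]

  def main(args: Array[String]): Unit = {
    // newInstance() on a Constructor[java.lang.StringBuilder] already returns
    // a java.lang.StringBuilder, so no asInstanceOf is needed at the call site.
    val sb: java.lang.StringBuilder =
      loadClass[java.lang.StringBuilder]("java.lang.StringBuilder")
        .getConstructor().newInstance()
    println(sb.append("ok").toString)
  }
}
```

Because of type erasure, the cast to `Class[C]` is not checked at runtime. A wrong type argument is therefore not rejected inside the helper and would surface only later, typically as a `ClassCastException` where the instance is used at the wrong type. This is presumably why `loadExtensions` keeps its explicit `isAssignableFrom` check.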
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index c16e227..6a30a1d 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -21,7 +21,6 @@ import java.lang.ref.WeakReference
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.HashSet
-import scala.language.existentials
 import scala.util.Random
 
 import org.scalatest.BeforeAndAfter
@@ -70,10 +69,11 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
   protected def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
   protected def newBroadcast() = sc.broadcast(1 to 100)
 
-  protected def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
-    def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
+  protected def newRDDWithShuffleDependencies():
+      (RDD[(Int, Int)], Seq[ShuffleDependency[Int, Int, Int]]) = {
+    def getAllDependencies(rdd: RDD[(Int, Int)]): Seq[Dependency[_]] = {
       rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
-        getAllDependencies(dep.rdd)
+        getAllDependencies(dep.rdd.asInstanceOf[RDD[(Int, Int)]])
       }
     }
     val rdd = newShuffleRDD()
@@ -81,7 +81,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
     // Get all the shuffle dependencies
     val shuffleDeps = getAllDependencies(rdd)
       .filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
-      .map(_.asInstanceOf[ShuffleDependency[_, _, _]])
+      .map(_.asInstanceOf[ShuffleDependency[Int, Int, Int]])
     (rdd, shuffleDeps)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 8f212f6..6651e38 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
 import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -206,8 +206,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     Utils.withContextClassLoader(loader) {
       sc = new SparkContext("local", "test")
-      val objs = sc.makeRDD(1 to 3).map { x =>
-        Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
+      val objs = sc.makeRDD(1 to 3).map { _ =>
+        Utils.classForName[AnyRef](className, noSparkClassLoader = true).
+          getConstructor().newInstance()
       }
       val outputDir = new File(tempDir, "output").getAbsolutePath
       objs.saveAsObjectFile(outputDir)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 76be693..577d77e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 import scala.concurrent.duration.{Duration, SECONDS}
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.scalactic.TripleEquals
@@ -52,7 +51,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
   var taskScheduler: TestTaskScheduler = null
   var scheduler: DAGScheduler = null
   var backend: T = _
-  // Even though the tests aren't doing much, occassionally we see flakiness from pauses over
+  // Even though the tests aren't doing much, occasionally we see flakiness from pauses over
   // a second (probably from GC?) so we leave a long timeout in here
   val duration = Duration(10, SECONDS)
 
@@ -297,11 +296,11 @@ private[spark] abstract class MockBackend(
    * Test backends should call this to get a task that has been assigned to them by the scheduler.
    * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
    */
-  def beginTask(): (TaskDescription, Task[_]) = {
+  def beginTask[T](): (TaskDescription, Task[T]) = {
     synchronized {
       val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
       runningTasks += toRun._1.taskId
-      toRun
+      toRun.asInstanceOf[(TaskDescription, Task[T])]
     }
   }
 
@@ -589,7 +588,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
           backend.taskSuccess(taskDescription, 4321 + partition)
       }
     }
-    withBackend(runBackend _) {
+    withBackend(() => runBackend()) {
       val jobFuture = submit(d, (0 until 30).toArray)
       awaitJobTermination(jobFuture, duration)
     }
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 8f38ee3..597e0b9 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -134,7 +134,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
 
     try {
       sc.makeRDD(1 to 5, 2).mapPartitions { x =>
-        Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
+        Utils.classForName[AnyRef](className, noSparkClassLoader = true).
+          getConstructor().newInstance()
         Seq().iterator
       }.count()
     }
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
index cba4b40..da12e51 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.kafka010
 
-import scala.language.existentials
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index e0825e5..3f9a593 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.Token
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
@@ -47,7 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
     override def getKind: Text = TOKEN_KIND
   }
 
-  private[kafka010] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+  private[kafka010] def obtainToken(sparkConf: SparkConf):
+      (Token[KafkaDelegationTokenIdentifier], Long) = {
     checkProxyUser()
 
     val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index 4ea09ec..5ece5ae 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.graphx.util
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
 import scala.collection.mutable.HashSet
-import scala.language.existentials
 
 import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor}
 import org.apache.xbean.asm7.Opcodes._
@@ -48,7 +47,7 @@ private[graphx] object BytecodeUtils {
           return true
         }
       }
-      return false
+      false
     }
   }
 
@@ -59,7 +58,8 @@ private[graphx] object BytecodeUtils {
     var stack = List[(Class[_], String)]((cls, method))
 
     while (stack.nonEmpty) {
-      val (c, m) = stack.head
+      val c = stack.head._1
+      val m = stack.head._2
       stack = stack.tail
       seen.add((c, m))
       val finder = new MethodInvocationFinder(c.getName, m)
@@ -72,7 +72,7 @@ private[graphx] object BytecodeUtils {
         }
       }
     }
-    return false
+    false
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index c0f2366..7790de0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1290,7 +1290,7 @@ private[ml] class MultiClassSummarizer extends Serializable {
    * @param weight The weight of this instances.
    * @return This MultilabelSummarizer
    */
-  def add(label: Double, weight: Double = 1.0): this.type = {
+  def add(label: Double, weight: Double = 1.0): MultiClassSummarizer = {
     require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
 
     if (weight == 0.0) return this
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index f2e6012..94f40c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ml.feature
 
-import scala.language.existentials
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 57e23d5..e6e9bdf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.ml.feature
 import java.util.NoSuchElementException
 
 import scala.collection.mutable
-import scala.language.existentials
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Since
@@ -131,14 +130,14 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
           throw new SparkException(s"VectorAssembler does not support the $otherType type")
       }
     }
-    val featureAttributes = featureAttributesMap.flatten[Attribute].toArray
-    val lengths = featureAttributesMap.map(a => a.length).toArray
+    val featureAttributes = featureAttributesMap.flatten[Attribute]
+    val lengths = featureAttributesMap.map(a => a.length)
     val metadata = new AttributeGroup($(outputCol), featureAttributes).toMetadata()
-    val (filteredDataset, keepInvalid) = $(handleInvalid) match {
-      case VectorAssembler.SKIP_INVALID => (dataset.na.drop($(inputCols)), false)
-      case VectorAssembler.KEEP_INVALID => (dataset, true)
-      case VectorAssembler.ERROR_INVALID => (dataset, false)
+    val filteredDataset = $(handleInvalid) match {
+      case VectorAssembler.SKIP_INVALID => dataset.na.drop($(inputCols))
+      case VectorAssembler.KEEP_INVALID | VectorAssembler.ERROR_INVALID => dataset
     }
+    val keepInvalid = $(handleInvalid) == VectorAssembler.KEEP_INVALID
     // Data transformation.
     val assembleFunc = udf { r: Row =>
       VectorAssembler.assemble(lengths, keepInvalid)(r.toSeq: _*)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
index 1fae1dc..faa8589 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.ml.image
 
-import scala.language.existentials
 import scala.util.Random
 
 import org.apache.commons.io.FilenameUtils
@@ -103,7 +102,7 @@ private object SamplePathFilter {
       // scalastyle:off hadoopconfiguration
       val hadoopConf = spark.sparkContext.hadoopConfiguration
       // scalastyle:on hadoopconfiguration
-      val old = Option(hadoopConf.getClass(flagName, null))
+      val old = hadoopConf.getClass(flagName, null)
       hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
       hadoopConf.setLong(SamplePathFilter.seedParam, seed)
       hadoopConf.setClass(flagName, classOf[SamplePathFilter], classOf[PathFilter])
@@ -111,8 +110,8 @@ private object SamplePathFilter {
         hadoopConf.unset(SamplePathFilter.ratioParam)
         hadoopConf.unset(SamplePathFilter.seedParam)
         old match {
-          case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
-          case None => hadoopConf.unset(flagName)
+          case null => hadoopConf.unset(flagName)
+          case v => hadoopConf.setClass(flagName, v, classOf[PathFilter])
         }
       }
     } else {
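
The save-and-restore logic in this hunk now matches on `null` directly rather than wrapping the old value in `Option`. The same shape can be sketched with a plain map. This is a sketch under assumptions: `RestoreSetting`, `withTemporarySetting` and the `mutable.Map` are hypothetical stand-ins for the Hadoop `Configuration`, not Spark or Hadoop API.

```scala
import scala.collection.mutable

object RestoreSetting {
  // Hypothetical helper: set `key` to `value` while `body` runs, then restore.
  def withTemporarySetting[A](
      conf: mutable.Map[String, String],
      key: String,
      value: String)(body: => A): A = {
    // getOrElse(key, null) mimics a Java getter that returns null when unset.
    val old: String = conf.getOrElse(key, null)
    conf(key) = value
    try body finally {
      old match {
        case null => conf.remove(key) // nothing was set before: unset again
        case v => conf(key) = v       // put the previous value back
      }
    }
  }
}
```

The `case null` branch has to come first, since the variable pattern `case v` matches every value, including `null`.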
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 50ef433..4fd3ed5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -847,7 +847,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     }
 
     /** Adds an observation. */
-    def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
+    def add(a: Array[Float], b: Double, c: Double = 1.0): NormalEquation = {
       require(c >= 0.0)
       require(a.length == k)
       copyToDouble(a)
@@ -859,7 +859,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     }
 
     /** Merges another normal equation object. */
-    def merge(other: NormalEquation): this.type = {
+    def merge(other: NormalEquation): NormalEquation = {
       require(other.k == k)
       blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1)
       blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 438a616..3e1bbba 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import net.razorvine.pickle._
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
index b0fa287..6ae8e20 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
@@ -33,7 +33,7 @@ private[fpm] class FPTree[T] extends Serializable {
   private val summaries: mutable.Map[T, Summary[T]] = mutable.Map.empty
 
   /** Adds a transaction with count. */
-  def add(t: Iterable[T], count: Long = 1L): this.type = {
+  def add(t: Iterable[T], count: Long = 1L): FPTree[T] = {
     require(count > 0)
     var curr = root
     curr.count += count
@@ -53,7 +53,7 @@ private[fpm] class FPTree[T] extends Serializable {
   }
 
   /** Merges another FP-Tree. */
-  def merge(other: FPTree[T]): this.type = {
+  def merge(other: FPTree[T]): FPTree[T] = {
     other.transactions.foreach { case (t, c) =>
       add(t, c)
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 334f92b..6dea4b1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.ml.classification
 
 import scala.collection.JavaConverters._
-import scala.language.existentials
 import scala.util.Random
 import scala.util.control.Breaks._
 
@@ -31,7 +30,7 @@ import org.apache.spark.ml.optim.aggregator.LogisticAggregator
 import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.{col, lit, rand}
 import org.apache.spark.sql.types.LongType
 
@@ -40,11 +39,11 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   import testImplicits._
 
   private val seed = 42
-  @transient var smallBinaryDataset: Dataset[_] = _
-  @transient var smallMultinomialDataset: Dataset[_] = _
-  @transient var binaryDataset: Dataset[_] = _
-  @transient var multinomialDataset: Dataset[_] = _
-  @transient var multinomialDatasetWithZeroVar: Dataset[_] = _
+  @transient var smallBinaryDataset: DataFrame = _
+  @transient var smallMultinomialDataset: DataFrame = _
+  @transient var binaryDataset: DataFrame = _
+  @transient var multinomialDataset: DataFrame = _
+  @transient var multinomialDatasetWithZeroVar: DataFrame = _
   private val eps: Double = 1e-5
 
   override def beforeAll(): Unit = {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 5708097..12122c0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.ml.clustering
 
-import scala.language.existentials
-
 import org.apache.spark.SparkException
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.clustering.DistanceMeasure
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.DataFrame
 
 
 class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
@@ -33,9 +31,8 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
   import testImplicits._
 
   final val k = 5
-  @transient var dataset: Dataset[_] = _
-
-  @transient var sparseDataset: Dataset[_] = _
+  @transient var dataset: DataFrame = _
+  @transient var sparseDataset: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -191,7 +188,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("BisectingKMeans with Array input") {
-    def trainAndComputeCost(dataset: Dataset[_]): Double = {
+    def trainAndComputeCost(dataset: DataFrame): Double = {
       val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
       model.computeCost(dataset)
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index 11fdd3a..133536f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.ml.clustering
 
-import scala.language.existentials
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.stat.distribution.MultivariateGaussian
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 
 class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
@@ -35,11 +33,11 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
 
   final val k = 5
   private val seed = 538009335
-  @transient var dataset: Dataset[_] = _
-  @transient var denseDataset: Dataset[_] = _
-  @transient var sparseDataset: Dataset[_] = _
-  @transient var decompositionDataset: Dataset[_] = _
-  @transient var rDataset: Dataset[_] = _
+  @transient var dataset: DataFrame = _
+  @transient var denseDataset: DataFrame = _
+  @transient var sparseDataset: DataFrame = _
+  @transient var decompositionDataset: DataFrame = _
+  @transient var rDataset: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 5d439a2..e3c82fa 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.ml.clustering
 
-import scala.language.existentials
 import scala.util.Random
 
 import org.dmg.pmml.PMML
@@ -40,7 +39,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes
   import testImplicits._
 
   final val k = 5
-  @transient var dataset: Dataset[_] = _
+  @transient var dataset: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index bbd5408..d089822 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ml.clustering
 
-import scala.language.existentials
-
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -64,7 +62,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
 
   val k: Int = 5
   val vocabSize: Int = 30
-  @transient var dataset: Dataset[_] = _
+  @transient var dataset: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -329,8 +327,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
       (model.logLikelihood(dataset), model.logPerplexity(dataset))
     }
 
-    val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
-    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
+    val (_, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
     val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
     val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
     // TODO: need to compare the results once we fix the seed issue for LDA (SPARK-22210)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index e2d7756..1d65faa 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.DataFrame
 
 
 class ClusteringEvaluatorSuite
@@ -32,10 +32,10 @@ class ClusteringEvaluatorSuite
 
   import testImplicits._
 
-  @transient var irisDataset: Dataset[_] = _
-  @transient var newIrisDataset: Dataset[_] = _
-  @transient var newIrisDatasetD: Dataset[_] = _
-  @transient var newIrisDatasetF: Dataset[_] = _
+  @transient var irisDataset: DataFrame = _
+  @transient var newIrisDataset: DataFrame = _
+  @transient var newIrisDatasetD: DataFrame = _
+  @transient var newIrisDatasetF: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 2fc9754..6d0321c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -23,7 +23,6 @@ import java.util.Random
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, WrappedArray}
-import scala.language.existentials
 
 import com.github.fommil.netlib.BLAS.{getInstance => blas}
 import org.apache.commons.io.FileUtils
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index 2c73700..ad4ac72 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.ml.feature.{Instance, LabeledPoint}
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared.HasWeightCol
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
 import org.apache.spark.ml.tree.impl.TreeTests
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions._
@@ -255,8 +254,8 @@ object MLTestingUtils extends SparkFunSuite {
    * one having double array "features" column with float precision, and one having float array
    * "features" column.
    */
-  def generateArrayFeatureDataset(dataset: Dataset[_],
-    featuresColName: String = "features"): (Dataset[_], Dataset[_], Dataset[_]) = {
+  def generateArrayFeatureDataset(dataset: DataFrame,
+    featuresColName: String = "features"): (DataFrame, DataFrame, DataFrame) = {
     val toFloatVectorUDF = udf { (features: Vector) =>
       Vectors.dense(features.toArray.map(_.toFloat.toDouble))}
     val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
index dc44c58..20bd2e5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.mllib.fpm
 
-import scala.language.existentials
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.util.Utils
@@ -301,7 +299,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
     val path = tempDir.toURI.toString
     try {
       model3.save(sc, path)
-      val newModel = FPGrowthModel.load(sc, path)
+      val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
       val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
         (itemset.items.toSet, itemset.freq)
       }
@@ -335,7 +333,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
     val path = tempDir.toURI.toString
     try {
       model3.save(sc, path)
-      val newModel = FPGrowthModel.load(sc, path)
+      val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
       val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
         (itemset.items.toSet, itemset.freq)
       }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
index a56d7b3..4e2548d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.mllib.fpm
 
-import scala.language.existentials
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index c2e08d0..a700706 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.mllib.fpm
 
-import scala.language.existentials
-
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.util.Utils
@@ -420,7 +418,9 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val path = tempDir.toURI.toString
     try {
       model.save(sc, path)
-      val newModel = PrefixSpanModel.load(sc, path)
+      // Save/loading this model results in item type "Object" even if in this case
+      // the objects are Integers -- not Int as in the original saved model.
+      val newModel = PrefixSpanModel.load(sc, path).asInstanceOf[PrefixSpanModel[AnyRef]]
       val originalSet = model.freqSequences.collect().map { x =>
         (x.sequence.map(_.toSet).toSeq, x.freq)
       }.toSet
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index deb53cf..d725ef5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
-import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.{Failure, Success, Try}
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index de3223e..95fad41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,7 +23,6 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index 2b73b96..80999ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 import java.lang.{Boolean => JBool}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.language.{existentials, implicitConversions}
+import scala.language.implicitConversions
 
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.{BooleanType, DataType}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 8182730..a6a48b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -21,7 +21,6 @@ import java.lang.reflect.{Method, Modifier}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Builder
-import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.util.Try
 
@@ -30,7 +29,6 @@ import org.apache.spark.serializer._
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
-import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 1d44cc3..74a9f13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import scala.language.existentials
-
 import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Encoder, Row}
@@ -29,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 
 object CatalystSerde {
   def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
@@ -245,12 +242,12 @@ case class TypedFilter(
   }
 
   def typedCondition(input: Expression): Expression = {
-    val (funcClass, methodName) = func match {
-      case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
+    val funcMethod = func match {
+      case _: FilterFunction[_] => (classOf[FilterFunction[_]], "call")
       case _ => FunctionUtils.getFunctionOneName(BooleanType, input.dataType)
     }
-    val funcObj = Literal.create(func, ObjectType(funcClass))
-    Invoke(funcObj, methodName, BooleanType, input :: Nil)
+    val funcObj = Literal.create(func, ObjectType(funcMethod._1))
+    Invoke(funcObj, funcMethod._2, BooleanType, input :: Nil)
   }
 }
 
@@ -266,9 +263,9 @@ object FunctionUtils {
     }
   }
 
-  def getFunctionOneName(outputDT: DataType, inputDT: DataType): (Class[_], String) = {
-    // load "scala.Function1" using Java API to avoid requirements of type parameters
-    Utils.classForName("scala.Function1") -> {
+  def getFunctionOneName(outputDT: DataType, inputDT: DataType):
+      (Class[scala.Function1[_, _]], String) = {
+    classOf[scala.Function1[_, _]] -> {
       // if a pair of an argument and return types is one of specific types
       // whose specialized method (apply$mc..$sp) is generated by scalac,
       // Catalyst generated a direct method call to the specialized method.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 0989af2..5df2af9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -48,11 +48,10 @@ object ArrayBasedMapData {
    * @param valueConverter This function is applied over all the values of the input map to
    *                       obtain the output map's values
    */
-  def apply(
-      javaMap: JavaMap[_, _],
+  def apply[K, V](
+      javaMap: JavaMap[K, V],
       keyConverter: (Any) => Any,
       valueConverter: (Any) => Any): ArrayBasedMapData = {
-    import scala.language.existentials
 
     val keys: Array[Any] = new Array[Any](javaMap.size())
     val values: Array[Any] = new Array[Any](javaMap.size())
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index c58f7a2..d26a73a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -180,7 +180,7 @@ object DataType {
     ("pyClass", _),
     ("sqlType", _),
     ("type", JString("udt"))) =>
-      Utils.classForName(udtClass).getConstructor().newInstance().asInstanceOf[UserDefinedType[_]]
+      Utils.classForName[UserDefinedType[_]](udtClass).getConstructor().newInstance()
 
     // Python UDT
     case JSortedObject(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
index 6756b20..e79c0a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.types
 
-import scala.language.existentials
-
 import org.apache.spark.annotation.Evolving
 
 @Evolving
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 436675b..8520300 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
-import scala.language.existentials
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.Random
@@ -316,7 +315,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("SPARK-23587: MapObjects should support interpreted execution") {
-    def testMapObjects(collection: Any, collectionCls: Class[_], inputType: DataType): Unit = {
+    def testMapObjects[T](collection: Any, collectionCls: Class[T], inputType: DataType): Unit = {
       val function = (lambda: Expression) => Add(lambda, Literal(1))
       val elementType = IntegerType
       val expected = Seq(2, 3, 4)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 83425de..f0ef6e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -635,7 +635,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
   private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
 
     try {
-      val clazz = Utils.classForName(className)
+      val clazz = Utils.classForName[AnyRef](className)
       val udfInterfaces = clazz.getGenericInterfaces
         .filter(_.isInstanceOf[ParameterizedType])
         .map(_.asInstanceOf[ParameterizedType])
@@ -699,7 +699,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    */
   private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
     try {
-      val clazz = Utils.classForName(className)
+      val clazz = Utils.classForName[AnyRef](className)
       if (!classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
         throw new AnalysisException(s"class $className doesn't implement interface UserDefinedAggregateFunction")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index b757529..a2def6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.aggregate
 
-import scala.language.existentials
-
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 08650c8..52f30ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
 
 import scala.collection.JavaConverters._
-import scala.language.{existentials, implicitConversions}
+import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SparkPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 81cc958..38c634e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -21,7 +21,6 @@ import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
-import scala.language.existentials
 
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -50,8 +49,9 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     assert(planned.size === 1)
   }
 
-  def assertJoin(pair: (String, Class[_])): Any = {
-    val (sqlString, c) = pair
+  def assertJoin(pair: (String, Class[_ <: BinaryExecNode])): Any = {
+    val sqlString = pair._1
+    val c = pair._2
     val df = sql(sqlString)
     val physical = df.queryExecution.sparkPlan
     val operators = physical.collect {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 09793bd..edbc249 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
-import scala.language.existentials
-
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
@@ -186,12 +184,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
                 assert(typeName === PrimitiveTypeName.INT96)
                 val oneBlockMeta = oneFooter.getBlocks().get(0)
                 val oneBlockColumnMeta = oneBlockMeta.getColumns().get(0)
-                val columnStats = oneBlockColumnMeta.getStatistics
                 // This is the important assert.  Column stats are written, but they are ignored
                 // when the data is read back as mentioned above, b/c int96 is unsigned.  This
                 // assert makes sure this holds even if we change parquet versions (if eg. there
                 // were ever statistics even on unsigned columns).
-                assert(!columnStats.hasNonNullValue)
+                assert(!oneBlockColumnMeta.getStatistics.hasNonNullValue)
               }
 
               // These queries should return the entire dataset with the conversion applied,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index a538b94..daac207 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 
 import java.util.Locale
 
-import scala.language.existentials
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index c1eaf94..309591d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.sources
 
-import scala.language.existentials
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.SQLConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ad11719..3f9925e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -94,7 +94,7 @@ class HadoopTableReader(
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
       hiveTable,
-      Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
+      Utils.classForName[Deserializer](tableDesc.getSerdeClassName),
       filterOpt = None)
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 640cca0..92a1120 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -922,11 +922,10 @@ private[hive] object HiveClientImpl {
   }
 
   private def toInputFormat(name: String) =
-    Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+    Utils.classForName[org.apache.hadoop.mapred.InputFormat[_, _]](name)
 
   private def toOutputFormat(name: String) =
-    Utils.classForName(name)
-      .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+    Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name)
 
   /**
    * Converts the native table metadata representation format CatalogTable to Hive's Table.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 1825af6..24a67f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.language.existentials
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.plan.TableDesc
@@ -30,7 +28,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.hive.client.HiveClientImpl
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 905cb52..e12f663 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -407,8 +407,8 @@ case class HiveScriptIOSchema (
       columnTypes: Seq[DataType],
       serdeProps: Seq[(String, String)]): AbstractSerDe = {
 
-    val serde = Utils.classForName(serdeClassName).getConstructor().
-      newInstance().asInstanceOf[AbstractSerDe]
+    val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
+      newInstance()
 
     val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
 
@@ -428,8 +428,8 @@ case class HiveScriptIOSchema (
       inputStream: InputStream,
       conf: Configuration): Option[RecordReader] = {
     recordReaderClass.map { klass =>
-      val instance = Utils.classForName(klass).getConstructor().
-        newInstance().asInstanceOf[RecordReader]
+      val instance = Utils.classForName[RecordReader](klass).getConstructor().
+        newInstance()
       val props = new Properties()
       // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
       // See https://github.com/scala/bug/issues/10418
@@ -441,8 +441,8 @@ case class HiveScriptIOSchema (
 
   def recordWriter(outputStream: OutputStream, conf: Configuration): Option[RecordWriter] = {
     recordWriterClass.map { klass =>
-      val instance = Utils.classForName(klass).getConstructor().
-        newInstance().asInstanceOf[RecordWriter]
+      val instance = Utils.classForName[RecordWriter](klass).getConstructor().
+        newInstance()
       instance.initialize(outputStream, conf)
       instance
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 5e97a05..a907fca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.net.URI
 
-import scala.language.existentials
-
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
 import org.apache.parquet.hadoop.ParquetFileReader
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index eaedac1..b20ef03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.hive.execution
 
-import scala.language.existentials
-
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.StaticSQLConf._
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index fe0f875..88bfc88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -46,7 +46,7 @@ class WindowedDStream[T: ClassTag](
 
   def windowDuration: Duration = _windowDuration
 
-  override def dependencies: List[DStream[_]] = List(parent)
+  override def dependencies: List[DStream[T]] = List(parent)
 
   override def slideDuration: Duration = _slideDuration
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
index 8c3a797..a0b49e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
 
 /** Trait representing a received block */
 private[streaming] sealed trait ReceivedBlock
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f8ddabd..eb70232 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.receiver
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.language.existentials
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4f3fa05..7091598 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -51,11 +51,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val clockClass = ssc.sc.conf.get(
       "spark.streaming.clock", "org.apache.spark.util.SystemClock")
     try {
-      Utils.classForName(clockClass).getConstructor().newInstance().asInstanceOf[Clock]
+      Utils.classForName[Clock](clockClass).getConstructor().newInstance()
     } catch {
       case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
         val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
-        Utils.classForName(newClockClass).getConstructor().newInstance().asInstanceOf[Clock]
+        Utils.classForName[Clock](newClockClass).getConstructor().newInstance()
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 829254b..551d376 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.ExecutionContext
-import scala.language.existentials
 import scala.util.{Failure, Success}
 
 import org.apache.spark._
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 7542e2f..b0a4c98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -132,8 +132,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
     }
     val wal = classNameOption.map { className =>
       try {
-        instantiateClass(
-          Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
+        instantiateClass(Utils.classForName[WriteAheadLog](className), sparkConf)
       } catch {
         case NonFatal(e) =>
           throw new SparkException(s"Could not create a write ahead log of class $className", e)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 0a764f6..287a43a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable
-import scala.language.existentials
 import scala.reflect.ClassTag
 
 import org.scalatest.concurrent.Eventually.eventually
@@ -656,12 +655,12 @@ class BasicOperationsSuite extends TestSuiteBase {
 
     runCleanupTest(
         conf,
-        operation _,
+        operation,
         numExpectedOutput = cleanupTestInput.size / 2,
         rememberDuration = Seconds(3)) { operatedStream =>
       eventually(eventuallyTimeout) {
-        val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
-        val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+        val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[Int]]
+        val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[Int]]
         val mappedStream = windowedStream1.dependencies.head
 
         // Checkpoint remember durations

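Two patterns recur throughout the diff above: call sites that cast the result of a class lookup are replaced by a type parameter on the lookup itself, and tuple destructuring over a wildcard-typed pair is replaced by direct field access. The sketch below illustrates both in isolation. It is a minimal illustration under stated assumptions, not Spark's code: the object name ExistentialFreeSketch, the describe method, and this classForName signature are hypothetical and only modeled on what the call sites in the diff suggest.

```scala
object ExistentialFreeSketch {

  // Hypothetical stand-in for a typed class-loading helper. It is NOT Spark's
  // Utils.classForName; the signature here is an assumption for illustration.
  // The cast is unchecked because of erasure: a wrong C is only detected later,
  // when a loaded instance is actually used as a C.
  def classForName[C](className: String): Class[C] =
    Class.forName(className).asInstanceOf[Class[C]]

  // Reads the tuple fields directly instead of destructuring with
  // `val (label, cls) = pair`, mirroring the JoinSuite change in the diff.
  def describe(pair: (String, Class[_ <: Number])): String = {
    val label = pair._1
    val cls = pair._2
    s"$label -> ${cls.getSimpleName}"
  }

  def main(args: Array[String]): Unit = {
    // No asInstanceOf at the call site: newInstance() is already typed as C.
    val sb: java.lang.StringBuilder =
      classForName[java.lang.StringBuilder]("java.lang.StringBuilder")
        .getConstructor().newInstance()
    sb.append("ok")
    println(sb.toString)
    println(describe(("boxed int", classOf[java.lang.Integer])))
  }
}
```

The type parameter does not make the lookup any safer than the casts it replaces; it only moves the single unchecked cast into one helper so that callers read more cleanly.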
