Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/29 16:02:20 UTC
[spark] branch master updated:
[SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a17d26 [SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
8a17d26 is described below
commit 8a17d26784c53fb50b6373b566aab71135c8956f
Author: Sean Owen <se...@databricks.com>
AuthorDate: Mon Apr 29 11:02:01 2019 -0500
[SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials
## What changes were proposed in this pull request?
I want to get rid of as much use of `scala.language.existentials` as possible for 3.0. It's a complicated language feature that generates warnings unless this value is imported. It might even be on the way out of Scala: https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785
For Spark, it comes up mostly where the code plays fast and loose with generic types, not in the advanced situations usually cited when this feature is explained. For example, it comes up where a function returns something like `(String, Class[_])`. Scala doesn't like matching this against another instance of `(String, Class[_])`, because doing so requires inferring the existence of some type that satisfies both. That seems obvious if the generic type is a wildcard, but, n [...]
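A minimal sketch (hypothetical code, not from this commit) of the kind of pattern that triggers the existential-types feature warning:

```scala
// Hypothetical illustration of the warning described above. Binding a
// typed pattern that contains a wildcard forces the Scala 2 compiler to
// infer an existential type.
object ExistentialExample {
  def classAndName(): (String, Class[_]) = ("text", classOf[String])

  def main(args: Array[String]): Unit = {
    // A binding like this can warn with something like
    // "inferred existential type ... forSome { ... }"
    // unless scala.language.existentials is imported.
    val (name: String, clazz: Class[_]) = classAndName()
    assert(name == "text" && clazz == classOf[String])
  }
}
```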
This is a large PR, and it only gets rid of _most_ instances of `scala.language.existentials`. The change should be all compile-time and shouldn't affect APIs or logic.
Many of the changes simply touch up sloppiness about generic types, making the known correct value explicit in the code.
Some fixes involve being more explicit about the existence of generic types in methods. For instance, `def foo(arg: Class[_])` seems innocent enough but should really be declared `def foo[T](arg: Class[T])` to let Scala select and fix a single type when evaluating calls to `foo`.
For somewhat surprising reasons, this comes up in places where code evaluates a tuple of things involving a generic type, but it is fine if the two parts of the tuple are evaluated separately.
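A sketch of the signature fix described above, using the hypothetical `foo` from the text:

```scala
// Hypothetical illustration; `foo` here is the placeholder name used in
// the commit message, not a real Spark method.
object ClassParamExample {
  // Before: the wildcard leaves Scala to invent an anonymous existential
  // type at each call site.
  def fooOld(arg: Class[_]): String = arg.getName

  // After: a named type parameter lets Scala select and fix a single type
  // T per call, avoiding existential inference entirely.
  def fooNew[T](arg: Class[T]): String = arg.getName

  def main(args: Array[String]): Unit = {
    assert(fooOld(classOf[Int]) == fooNew(classOf[Int]))
  }
}
```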
One key change was altering `Utils.classForName(...): Class[_]` to the more correct `Utils.classForName[T](...): Class[T]`. This caused a number of small but positive changes to callers that otherwise had to cast the result.
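Roughly, the caller-side effect of that change looks like this (simplified sketch; the real `Utils.classForName` in the diff also takes `initialize` and classloader parameters):

```scala
// Simplified stand-ins for the before/after shapes of Utils.classForName.
object ClassForNameSketch {
  // Old shape: returns Class[_], so every caller must cast.
  def classForNameOld(name: String): Class[_] = Class.forName(name)

  // New shape: a type parameter absorbs the cast once, inside the utility.
  def classForName[C](name: String): Class[C] =
    Class.forName(name).asInstanceOf[Class[C]]

  def main(args: Array[String]): Unit = {
    val before = classForNameOld("java.lang.String").asInstanceOf[Class[String]]
    val after = classForName[String]("java.lang.String")
    assert(before == after)
  }
}
```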
In several tests, `Dataset[_]` was used where `DataFrame` seems to be the clear intent.
Finally, in a few cases in MLlib, the return type `this.type` was used where there are no subclasses of the class that uses it. This really isn't needed and causes issues for Scala reasoning about the return type. These are just changed to be concrete classes as return types.
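A sketch of that `this.type` pattern (hypothetical class; the actual MLlib classes are in the diff):

```scala
// Hypothetical illustration of the return-type change. When a class has
// no subclasses, `this.type` buys nothing and complicates how Scala
// reasons about the return type.
class SettingsOld {
  private var verbose = false
  def setVerbose(v: Boolean): this.type = { verbose = v; this }
}

class SettingsNew {
  private var verbose = false
  // Concrete class as the return type, as this commit does in MLlib.
  def setVerbose(v: Boolean): SettingsNew = { verbose = v; this }
}
```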
After this change, we have only a few classes that still import `scala.language.existentials` (because modifying them would require extensive rewrites to fix) and no build warnings.
## How was this patch tested?
Existing tests.
Closes #24431 from srowen/SPARK-27536.
Authored-by: Sean Owen <se...@databricks.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../apache/spark/api/python/PythonHadoopUtil.scala | 13 +++--
.../org/apache/spark/api/python/PythonRDD.scala | 56 +++++++++++-----------
.../org/apache/spark/api/r/RBackendHandler.scala | 2 -
.../spark/deploy/history/HistoryServer.scala | 4 +-
.../spark/internal/io/FileCommitProtocol.scala | 2 +-
.../org/apache/spark/io/CompressionCodec.scala | 5 +-
.../org/apache/spark/metrics/MetricsSystem.scala | 19 +++++---
.../spark/network/netty/NettyBlockRpcServer.scala | 22 ++++-----
.../scala/org/apache/spark/rdd/CoGroupedRDD.scala | 1 -
.../scala/org/apache/spark/rdd/CoalescedRDD.scala | 1 -
.../org/apache/spark/scheduler/DAGScheduler.scala | 4 +-
.../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 -
.../apache/spark/scheduler/ShuffleMapTask.scala | 7 ++-
.../apache/spark/security/SocketAuthServer.scala | 1 -
.../apache/spark/serializer/KryoSerializer.scala | 4 +-
.../org/apache/spark/util/ClosureCleaner.scala | 5 +-
.../main/scala/org/apache/spark/util/Utils.scala | 14 +++---
.../org/apache/spark/ContextCleanerSuite.scala | 10 ++--
.../test/scala/org/apache/spark/FileSuite.scala | 7 +--
.../scheduler/SchedulerIntegrationSuite.scala | 9 ++--
.../spark/util/MutableURLClassLoaderSuite.scala | 3 +-
.../kafka010/KafkaDelegationTokenProvider.scala | 1 -
.../org/apache/spark/kafka010/KafkaTokenUtil.scala | 5 +-
.../apache/spark/graphx/util/BytecodeUtils.scala | 8 ++--
.../ml/classification/LogisticRegression.scala | 2 +-
.../apache/spark/ml/feature/StringIndexer.scala | 2 -
.../apache/spark/ml/feature/VectorAssembler.scala | 13 +++--
.../org/apache/spark/ml/image/HadoopUtils.scala | 7 ++-
.../org/apache/spark/ml/recommendation/ALS.scala | 4 +-
.../spark/mllib/api/python/PythonMLLibAPI.scala | 1 -
.../scala/org/apache/spark/mllib/fpm/FPTree.scala | 4 +-
.../classification/LogisticRegressionSuite.scala | 13 +++--
.../spark/ml/clustering/BisectingKMeansSuite.scala | 11 ++---
.../spark/ml/clustering/GaussianMixtureSuite.scala | 14 +++---
.../apache/spark/ml/clustering/KMeansSuite.scala | 3 +-
.../org/apache/spark/ml/clustering/LDASuite.scala | 7 +--
.../ml/evaluation/ClusteringEvaluatorSuite.scala | 10 ++--
.../apache/spark/ml/recommendation/ALSSuite.scala | 1 -
.../org/apache/spark/ml/util/MLTestingUtils.scala | 5 +-
.../org/apache/spark/mllib/fpm/FPGrowthSuite.scala | 6 +--
.../org/apache/spark/mllib/fpm/FPTreeSuite.scala | 2 -
.../apache/spark/mllib/fpm/PrefixSpanSuite.scala | 6 +--
.../sql/catalyst/analysis/FunctionRegistry.scala | 1 -
.../expressions/codegen/CodeGenerator.scala | 1 -
.../catalyst/expressions/codegen/javaCode.scala | 2 +-
.../sql/catalyst/expressions/objects/objects.scala | 2 -
.../spark/sql/catalyst/plans/logical/object.scala | 17 +++----
.../sql/catalyst/util/ArrayBasedMapData.scala | 5 +-
.../org/apache/spark/sql/types/DataType.scala | 2 +-
.../org/apache/spark/sql/types/ObjectType.scala | 2 -
.../expressions/ObjectExpressionsSuite.scala | 3 +-
.../org/apache/spark/sql/UDFRegistration.scala | 4 +-
.../aggregate/TypedAggregateExpression.scala | 2 -
.../sql/execution/datasources/DataSource.scala | 3 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 6 +--
.../parquet/ParquetInteroperabilitySuite.scala | 5 +-
.../spark/sql/sources/FilteredScanSuite.scala | 2 -
.../apache/spark/sql/sources/PrunedScanSuite.scala | 2 -
.../org/apache/spark/sql/hive/TableReader.scala | 2 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 5 +-
.../hive/execution/InsertIntoHiveDirCommand.scala | 3 --
.../hive/execution/ScriptTransformationExec.scala | 12 ++---
.../spark/sql/hive/execution/HiveDDLSuite.scala | 2 -
.../sql/hive/execution/Hive_2_1_DDLSuite.scala | 3 --
.../spark/streaming/dstream/WindowedDStream.scala | 2 +-
.../spark/streaming/receiver/ReceivedBlock.scala | 1 -
.../streaming/receiver/ReceivedBlockHandler.scala | 1 -
.../spark/streaming/scheduler/JobGenerator.scala | 4 +-
.../streaming/scheduler/ReceiverTracker.scala | 1 -
.../spark/streaming/util/WriteAheadLogUtils.scala | 3 +-
.../spark/streaming/BasicOperationsSuite.scala | 7 ++-
71 files changed, 180 insertions(+), 241 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index 2ab8add..a4817b3 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -33,18 +33,17 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
-trait Converter[T, + U] extends Serializable {
+trait Converter[-T, +U] extends Serializable {
def convert(obj: T): U
}
private[python] object Converter extends Logging {
- def getInstance(converterClass: Option[String],
- defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
+ def getInstance[T, U](converterClass: Option[String],
+ defaultConverter: Converter[_ >: T, _ <: U]): Converter[T, U] = {
converterClass.map { cc =>
Try {
- val c = Utils.classForName(cc).getConstructor().
- newInstance().asInstanceOf[Converter[Any, Any]]
+ val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
logInfo(s"Loaded converter: $cc")
c
} match {
@@ -177,8 +176,8 @@ private[python] object PythonHadoopUtil {
* [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
- keyConverter: Converter[Any, Any],
- valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
+ keyConverter: Converter[K, Any],
+ valueConverter: Converter[V, Any]): RDD[(Any, Any)] = {
rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5b492b1..182c383 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -24,10 +24,6 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.concurrent.Promise
-import scala.concurrent.duration.Duration
-import scala.language.existentials
-import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
@@ -228,8 +224,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int): JavaRDD[Array[Byte]] = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
- val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val kc = Utils.classForName[K](keyClass)
+ val vc = Utils.classForName[V](valueClass)
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -296,9 +292,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration): RDD[(K, V)] = {
- val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
- val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
- val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName[K](keyClass)
+ val vc = Utils.classForName[V](valueClass)
+ val fc = Utils.classForName[F](inputFormatClass)
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -365,9 +361,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
- val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
- val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName[K](keyClass)
+ val vc = Utils.classForName[V](valueClass)
+ val fc = Utils.classForName[F](inputFormatClass)
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@@ -425,29 +421,33 @@ private[spark] object PythonRDD extends Logging {
PythonHadoopUtil.mergeConfs(baseConf, conf)
}
- private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
- valueConverterClass: String = null): (Class[_], Class[_]) = {
+ private def inferKeyValueTypes[K, V, KK, VV](rdd: RDD[(K, V)], keyConverterClass: String = null,
+ valueConverterClass: String = null): (Class[_ <: KK], Class[_ <: VV]) = {
// Peek at an element to figure out key/value types. Since Writables are not serializable,
// we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
// and then convert locally.
val (key, value) = rdd.first()
- val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
- new JavaToWritableConverter)
+ val (kc, vc) = getKeyValueConverters[K, V, KK, VV](
+ keyConverterClass, valueConverterClass, new JavaToWritableConverter)
(kc.convert(key).getClass, vc.convert(value).getClass)
}
- private def getKeyValueTypes(keyClass: String, valueClass: String):
- Option[(Class[_], Class[_])] = {
+ private def getKeyValueTypes[K, V](keyClass: String, valueClass: String):
+ Option[(Class[K], Class[V])] = {
for {
k <- Option(keyClass)
v <- Option(valueClass)
} yield (Utils.classForName(k), Utils.classForName(v))
}
- private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
- defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
- val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
- val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
+ private def getKeyValueConverters[K, V, KK, VV](
+ keyConverterClass: String,
+ valueConverterClass: String,
+ defaultConverter: Converter[_, _]): (Converter[K, KK], Converter[V, VV]) = {
+ val keyConverter = Converter.getInstance(Option(keyConverterClass),
+ defaultConverter.asInstanceOf[Converter[K, KK]])
+ val valueConverter = Converter.getInstance(Option(valueConverterClass),
+ defaultConverter.asInstanceOf[Converter[V, VV]])
(keyConverter, valueConverter)
}
@@ -459,7 +459,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
- val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
+ val (kc, vc) = getKeyValueConverters[K, V, Any, Any](keyConverterClass, valueConverterClass,
defaultConverter)
PythonHadoopUtil.convertRDD(rdd, kc, vc)
}
@@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
* [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
* `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
*/
- def saveAsSequenceFile[K, V, C <: CompressionCodec](
+ def saveAsSequenceFile[C <: CompressionCodec](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@@ -489,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
* `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
* this RDD.
*/
- def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
+ def saveAsHadoopFile[F <: OutputFormat[_, _], C <: CompressionCodec](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@@ -507,7 +507,7 @@ private[spark] object PythonRDD extends Logging {
val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName[F](outputFormatClass)
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
}
@@ -520,7 +520,7 @@ private[spark] object PythonRDD extends Logging {
* `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
* this RDD.
*/
- def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
+ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@@ -548,7 +548,7 @@ private[spark] object PythonRDD extends Logging {
* (mapred vs. mapreduce). Keys/values are converted for output using either user specified
* converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
*/
- def saveAsHadoopDataset[K, V](
+ def saveAsHadoopDataset(
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
confAsMap: java.util.HashMap[String, String],
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index aaa432d..f2f81b1 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -20,8 +20,6 @@ package org.apache.spark.api.r
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.concurrent.TimeUnit
-import scala.language.existentials
-
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.channel.ChannelHandler.Sharable
import io.netty.handler.timeout.ReadTimeoutException
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 7c9ce14..7df36c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -34,7 +34,6 @@ import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
-import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
/**
@@ -274,10 +273,9 @@ object HistoryServer extends Logging {
val providerName = conf.get(History.PROVIDER)
.getOrElse(classOf[FsHistoryProvider].getName())
- val provider = Utils.classForName(providerName)
+ val provider = Utils.classForName[ApplicationHistoryProvider](providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
- .asInstanceOf[ApplicationHistoryProvider]
val port = conf.get(History.HISTORY_SERVER_UI_PORT)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index e6e9c9e..8540938 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -149,7 +149,7 @@ object FileCommitProtocol extends Logging {
logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" dynamic=$dynamicPartitionOverwrite")
- val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
+ val clazz = Utils.classForName[FileCommitProtocol](className)
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 288c0d1..065f05e 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -77,8 +77,9 @@ private[spark] object CompressionCodec {
val codecClass =
shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
val codec = try {
- val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
- Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+ val ctor =
+ Utils.classForName[CompressionCodec](codecClass).getConstructor(classOf[SparkConf])
+ Some(ctor.newInstance(conf))
} catch {
case _: ClassNotFoundException | _: IllegalArgumentException => None
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 8dad42b..c96640a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -179,8 +179,8 @@ private[spark] class MetricsSystem private (
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
- val source = Utils.classForName(classPath).getConstructor().newInstance()
- registerSource(source.asInstanceOf[Source])
+ val source = Utils.classForName[Source](classPath).getConstructor().newInstance()
+ registerSource(source)
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
@@ -195,13 +195,18 @@ private[spark] class MetricsSystem private (
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
- val sink = Utils.classForName(classPath)
- .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
- .newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
- metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+ val servlet = Utils.classForName[MetricsServlet](classPath)
+ .getConstructor(
+ classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+ .newInstance(kv._2, registry, securityMgr)
+ metricsServlet = Some(servlet)
} else {
- sinks += sink.asInstanceOf[Sink]
+ val sink = Utils.classForName[Sink](classPath)
+ .getConstructor(
+ classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+ .newInstance(kv._2, registry, securityMgr)
+ sinks += sink
}
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index 27f4f94..55e7109 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -20,7 +20,6 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
-import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
@@ -66,12 +65,7 @@ class NettyBlockRpcServer(
case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
- val (level: StorageLevel, classTag: ClassTag[_]) = {
- serializer
- .newInstance()
- .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
- .asInstanceOf[(StorageLevel, ClassTag[_])]
- }
+ val (level, classTag) = deserializeMetadata(uploadBlock.metadata)
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
logDebug(s"Receiving replicated block $blockId with level ${level} " +
@@ -87,12 +81,7 @@ class NettyBlockRpcServer(
responseContext: RpcResponseCallback): StreamCallbackWithID = {
val message =
BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
- val (level: StorageLevel, classTag: ClassTag[_]) = {
- serializer
- .newInstance()
- .deserialize(ByteBuffer.wrap(message.metadata))
- .asInstanceOf[(StorageLevel, ClassTag[_])]
- }
+ val (level, classTag) = deserializeMetadata(message.metadata)
val blockId = BlockId(message.blockId)
logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +
s"from ${client.getSocketAddress}")
@@ -101,5 +90,12 @@ class NettyBlockRpcServer(
blockManager.putBlockDataAsStream(blockId, level, classTag)
}
+ private def deserializeMetadata[T](metadata: Array[Byte]): (StorageLevel, ClassTag[T]) = {
+ serializer
+ .newInstance()
+ .deserialize(ByteBuffer.wrap(metadata))
+ .asInstanceOf[(StorageLevel, ClassTag[T])]
+ }
+
override def getStreamManager(): StreamManager = streamManager
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7e76731..909f585 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -20,7 +20,6 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark._
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 44bc40b..836d3e2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -21,7 +21,6 @@ import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark._
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index af11a31..b817eb6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -26,7 +26,6 @@ import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
-import scala.language.existentials
import scala.util.control.NonFatal
import org.apache.commons.lang3.SerializationUtils
@@ -382,7 +381,8 @@ private[spark] class DAGScheduler(
* locations that are still available from the previous shuffle to avoid unnecessarily
* regenerating data.
*/
- def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
+ def createShuffleMapStage[K, V, C](
+ shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 54ab8f8..b514c2e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -19,8 +19,6 @@ package org.apache.spark.scheduler
import java.util.Properties
-import scala.language.existentials
-
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, CallSite}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 189e35e..710f5eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -21,13 +21,10 @@ import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties
-import scala.language.existentials
-
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
-import org.apache.spark.shuffle.ShuffleWriter
/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
@@ -85,13 +82,15 @@ private[spark] class ShuffleMapTask(
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
- val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
+ val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
+ val rdd = rddAndDep._1
+ val dep = rddAndDep._2
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
}
diff --git a/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala b/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
index c65c8fd..e616d23 100644
--- a/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
+++ b/core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
@@ -21,7 +21,6 @@ import java.net.{InetAddress, ServerSocket, Socket}
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
-import scala.language.existentials
import scala.util.Try
import org.apache.spark.SparkEnv
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c426095..3969106 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -164,8 +164,8 @@ class KryoSerializer(conf: SparkConf)
}
// Allow the user to register their own classes by setting spark.kryo.registrator.
userRegistrators
- .map(Utils.classForName(_, noSparkClassLoader = true).getConstructor().
- newInstance().asInstanceOf[KryoRegistrator])
+ .map(Utils.classForName[KryoRegistrator](_, noSparkClassLoader = true).
+ getConstructor().newInstance())
.foreach { reg => reg.registerClasses(kryo) }
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index df696a3..6d6ef5a 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.lang.invoke.SerializedLambda
import scala.collection.mutable.{Map, Set, Stack}
-import scala.language.existentials
import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm7.Opcodes._
@@ -309,7 +308,9 @@ private[spark] object ClosureCleaner extends Logging {
var outerPairs: List[(Class[_], AnyRef)] = outerClasses.zip(outerObjects).reverse
var parent: AnyRef = null
if (outerPairs.nonEmpty) {
- val (outermostClass, outermostObject) = outerPairs.head
+ val outermostClass = outerPairs.head._1
+ val outermostObject = outerPairs.head._2
+
if (isClosure(outermostClass)) {
logDebug(s" + outermost object is a closure, so we clone it: ${outermostClass}")
} else if (outermostClass.getName.startsWith("$line")) {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7c8648d..bed5086 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -197,14 +197,15 @@ private[spark] object Utils extends Logging {
* Preferred alternative to Class.forName(className), as well as
* Class.forName(className, initialize, loader) with current thread's ContextClassLoader.
*/
- def classForName(
+ def classForName[C](
className: String,
initialize: Boolean = true,
- noSparkClassLoader: Boolean = false): Class[_] = {
+ noSparkClassLoader: Boolean = false): Class[C] = {
if (!noSparkClassLoader) {
- Class.forName(className, initialize, getContextOrSparkClassLoader)
+ Class.forName(className, initialize, getContextOrSparkClassLoader).asInstanceOf[Class[C]]
} else {
- Class.forName(className, initialize, Thread.currentThread().getContextClassLoader)
+ Class.forName(className, initialize, Thread.currentThread().getContextClassLoader).
+ asInstanceOf[Class[C]]
}
// scalastyle:on classforname
}
@@ -2680,10 +2681,11 @@ private[spark] object Utils extends Logging {
* other state) and decide they do not need to be added. A log message is printed in that case.
* Other exceptions are bubbled up.
*/
- def loadExtensions[T](extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
+ def loadExtensions[T <: AnyRef](
+ extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
classes.flatMap { name =>
try {
- val klass = classForName(name)
+ val klass = classForName[T](name)
require(extClass.isAssignableFrom(klass),
s"$name is not a subclass of ${extClass.getName()}.")
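The `classForName[C]` change above moves the type assertion to the call site: the caller names the expected class, so results no longer surface as `Class[_]` everywhere downstream. A hedged, self-contained sketch of the same pattern (the `Reflect` object is hypothetical, standing in for `Utils`; the cast is unchecked, exactly as in the patch):

```scala
// Sketch of the generic classForName pattern: the type parameter lets the
// caller state what class it expects, at the cost of an unchecked cast.
object Reflect {
  def classForName[C](className: String): Class[C] =
    Class.forName(className).asInstanceOf[Class[C]]
}

// The result is typed Class[java.lang.StringBuilder], so reflective
// construction yields a StringBuilder with no further casting.
val sbClass = Reflect.classForName[java.lang.StringBuilder]("java.lang.StringBuilder")
val sb = sbClass.getConstructor().newInstance()
```

Note the cast is only as safe as the caller's claim; `loadExtensions` above guards it with `extClass.isAssignableFrom(klass)`.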
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index c16e227..6a30a1d 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -21,7 +21,6 @@ import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import scala.collection.mutable.HashSet
-import scala.language.existentials
import scala.util.Random
import org.scalatest.BeforeAndAfter
@@ -70,10 +69,11 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
protected def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
protected def newBroadcast() = sc.broadcast(1 to 100)
- protected def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
- def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
+ protected def newRDDWithShuffleDependencies():
+ (RDD[(Int, Int)], Seq[ShuffleDependency[Int, Int, Int]]) = {
+ def getAllDependencies(rdd: RDD[(Int, Int)]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
- getAllDependencies(dep.rdd)
+ getAllDependencies(dep.rdd.asInstanceOf[RDD[(Int, Int)]])
}
}
val rdd = newShuffleRDD()
@@ -81,7 +81,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
.filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
- .map(_.asInstanceOf[ShuffleDependency[_, _, _]])
+ .map(_.asInstanceOf[ShuffleDependency[Int, Int, Int]])
(rdd, shuffleDeps)
}
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 8f212f6..6651e38 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.internal.config._
-import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
+import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -206,8 +206,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
Utils.withContextClassLoader(loader) {
sc = new SparkContext("local", "test")
- val objs = sc.makeRDD(1 to 3).map { x =>
- Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
+ val objs = sc.makeRDD(1 to 3).map { _ =>
+ Utils.classForName[AnyRef](className, noSparkClassLoader = true).
+ getConstructor().newInstance()
}
val outputDir = new File(tempDir, "output").getAbsolutePath
objs.saveAsObjectFile(outputDir)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 76be693..577d77e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, SECONDS}
-import scala.language.existentials
import scala.reflect.ClassTag
import org.scalactic.TripleEquals
@@ -52,7 +51,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
var taskScheduler: TestTaskScheduler = null
var scheduler: DAGScheduler = null
var backend: T = _
- // Even though the tests aren't doing much, occassionally we see flakiness from pauses over
+ // Even though the tests aren't doing much, occasionally we see flakiness from pauses over
// a second (probably from GC?) so we leave a long timeout in here
val duration = Duration(10, SECONDS)
@@ -297,11 +296,11 @@ private[spark] abstract class MockBackend(
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
*/
- def beginTask(): (TaskDescription, Task[_]) = {
+ def beginTask[T](): (TaskDescription, Task[T]) = {
synchronized {
val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
runningTasks += toRun._1.taskId
- toRun
+ toRun.asInstanceOf[(TaskDescription, Task[T])]
}
}
@@ -589,7 +588,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskSuccess(taskDescription, 4321 + partition)
}
}
- withBackend(runBackend _) {
+ withBackend(() => runBackend()) {
val jobFuture = submit(d, (0 until 30).toArray)
awaitJobTermination(jobFuture, duration)
}
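The `withBackend(runBackend _)` to `withBackend(() => runBackend())` change above swaps eta-expansion shorthand for an explicit function literal. Both produce the same `() => Unit`-style value; a plain-Scala illustration (assumption: no Spark involved):

```scala
// Two spellings of lifting a method to a function value; the explicit
// literal used in the patch leaves nothing for the compiler to infer.
def runBackend(): Int = 42
val f1: () => Int = runBackend _        // eta-expansion shorthand
val f2: () => Int = () => runBackend()  // explicit literal, as in the patch
```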
diff --git a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
index 8f38ee3..597e0b9 100644
--- a/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/MutableURLClassLoaderSuite.scala
@@ -134,7 +134,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
try {
sc.makeRDD(1 to 5, 2).mapPartitions { x =>
- Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
+ Utils.classForName[AnyRef](className, noSparkClassLoader = true).
+ getConstructor().newInstance()
Seq().iterator
}.count()
}
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
index cba4b40..da12e51 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala
@@ -17,7 +17,6 @@
package org.apache.spark.kafka010
-import scala.language.existentials
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index e0825e5..3f9a593 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
@@ -47,7 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
override def getKind: Text = TOKEN_KIND
}
- private[kafka010] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+ private[kafka010] def obtainToken(sparkConf: SparkConf):
+ (Token[KafkaDelegationTokenIdentifier], Long) = {
checkProxyUser()
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
index 4ea09ec..5ece5ae 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/BytecodeUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.graphx.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.HashSet
-import scala.language.existentials
import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor}
import org.apache.xbean.asm7.Opcodes._
@@ -48,7 +47,7 @@ private[graphx] object BytecodeUtils {
return true
}
}
- return false
+ false
}
}
@@ -59,7 +58,8 @@ private[graphx] object BytecodeUtils {
var stack = List[(Class[_], String)]((cls, method))
while (stack.nonEmpty) {
- val (c, m) = stack.head
+ val c = stack.head._1
+ val m = stack.head._2
stack = stack.tail
seen.add((c, m))
val finder = new MethodInvocationFinder(c.getName, m)
@@ -72,7 +72,7 @@ private[graphx] object BytecodeUtils {
}
}
}
- return false
+ false
}
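The BytecodeUtils hunks above drop explicit `return` on final expressions and, as in ClosureCleaner, unpack a `(Class[_], String)` tuple via `._1`/`._2`. A short sketch of the `return` idiom (hypothetical helper, not the Spark method):

```scala
// In Scala the last expression of a method body is its result, so the
// trailing `return false` is redundant; an early `return` is still used
// to bail out of the loop.
def containsNegative(xs: List[Int]): Boolean = {
  var rest = xs
  while (rest.nonEmpty) {
    if (rest.head < 0) return true // early exit
    rest = rest.tail
  }
  false // last expression is the result; no `return` needed
}
```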
/**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index c0f2366..7790de0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -1290,7 +1290,7 @@ private[ml] class MultiClassSummarizer extends Serializable {
* @param weight The weight of this instances.
* @return This MultilabelSummarizer
*/
- def add(label: Double, weight: Double = 1.0): this.type = {
+ def add(label: Double, weight: Double = 1.0): MultiClassSummarizer = {
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this
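The change above (and the matching ones in `NormalEquation` and `FPTree` below) widens the return type from the singleton `this.type` to the concrete class. Chained builder-style calls still work; a hedged sketch with a hypothetical summarizer, not Spark's:

```scala
// Returning the concrete class instead of this.type: method chaining is
// preserved, but the signature no longer involves a singleton type that
// interacts badly with existential inference at call sites.
class Summarizer {
  private var total = 0.0
  def add(x: Double): Summarizer = { total += x; this }
  def sum: Double = total
}

val s = new Summarizer().add(1.0).add(2.5)
```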
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index f2e6012..94f40c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
-import scala.language.existentials
-
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
index 57e23d5..e6e9bdf 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala
@@ -20,7 +20,6 @@ package org.apache.spark.ml.feature
import java.util.NoSuchElementException
import scala.collection.mutable
-import scala.language.existentials
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
@@ -131,14 +130,14 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
throw new SparkException(s"VectorAssembler does not support the $otherType type")
}
}
- val featureAttributes = featureAttributesMap.flatten[Attribute].toArray
- val lengths = featureAttributesMap.map(a => a.length).toArray
+ val featureAttributes = featureAttributesMap.flatten[Attribute]
+ val lengths = featureAttributesMap.map(a => a.length)
val metadata = new AttributeGroup($(outputCol), featureAttributes).toMetadata()
- val (filteredDataset, keepInvalid) = $(handleInvalid) match {
- case VectorAssembler.SKIP_INVALID => (dataset.na.drop($(inputCols)), false)
- case VectorAssembler.KEEP_INVALID => (dataset, true)
- case VectorAssembler.ERROR_INVALID => (dataset, false)
+ val filteredDataset = $(handleInvalid) match {
+ case VectorAssembler.SKIP_INVALID => dataset.na.drop($(inputCols))
+ case VectorAssembler.KEEP_INVALID | VectorAssembler.ERROR_INVALID => dataset
}
+ val keepInvalid = $(handleInvalid) == VectorAssembler.KEEP_INVALID
// Data transformation.
val assembleFunc = udf { r: Row =>
VectorAssembler.assemble(lengths, keepInvalid)(r.toSeq: _*)
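The VectorAssembler refactor above untangles a match that returned a `(Dataset, Boolean)` tuple: the dataset choice becomes one match, and `keepInvalid` becomes a simple equality check. A self-contained sketch of the same shape (the `HandleInvalid` hierarchy and `plan` helper are hypothetical stand-ins):

```scala
// Sketch: compute each value independently instead of threading both
// through a tuple-valued match.
sealed trait HandleInvalid
case object Skip extends HandleInvalid
case object Keep extends HandleInvalid
case object Error extends HandleInvalid

def plan(mode: HandleInvalid, rows: List[Option[Int]]): (List[Option[Int]], Boolean) = {
  val filtered = mode match {
    case Skip => rows.filter(_.isDefined) // drop invalid rows
    case Keep | Error => rows             // pass everything through
  }
  val keepInvalid = mode == Keep
  (filtered, keepInvalid)
}
```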
diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
index 1fae1dc..faa8589 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala
@@ -17,7 +17,6 @@
package org.apache.spark.ml.image
-import scala.language.existentials
import scala.util.Random
import org.apache.commons.io.FilenameUtils
@@ -103,7 +102,7 @@ private object SamplePathFilter {
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
- val old = Option(hadoopConf.getClass(flagName, null))
+ val old = hadoopConf.getClass(flagName, null)
hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
hadoopConf.setLong(SamplePathFilter.seedParam, seed)
hadoopConf.setClass(flagName, classOf[SamplePathFilter], classOf[PathFilter])
@@ -111,8 +110,8 @@ private object SamplePathFilter {
hadoopConf.unset(SamplePathFilter.ratioParam)
hadoopConf.unset(SamplePathFilter.seedParam)
old match {
- case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
- case None => hadoopConf.unset(flagName)
+ case null => hadoopConf.unset(flagName)
+ case v => hadoopConf.setClass(flagName, v, classOf[PathFilter])
}
}
} else {
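The HadoopUtils change above stops wrapping `getClass`'s possibly-null result in an `Option` and instead matches on `null` directly when restoring the old value. A sketch with a plain mutable map standing in for the Hadoop `Configuration` (an assumption purely for illustration):

```scala
// Sketch: restore a previous setting, where `null` means "was unset".
val conf = scala.collection.mutable.Map[String, String]()

def restore(flag: String, old: String): Unit = old match {
  case null => conf.remove(flag)   // nothing to restore: clear the flag
  case v    => conf(flag) = v      // put the previous value back
}

conf("filter") = "temporary"
restore("filter", null)              // previous value was unset
val cleared = !conf.contains("filter")
restore("filter", "SamplePathFilter") // previous value existed
```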
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 50ef433..4fd3ed5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -847,7 +847,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}
/** Adds an observation. */
- def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
+ def add(a: Array[Float], b: Double, c: Double = 1.0): NormalEquation = {
require(c >= 0.0)
require(a.length == k)
copyToDouble(a)
@@ -859,7 +859,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}
/** Merges another normal equation object. */
- def merge(other: NormalEquation): this.type = {
+ def merge(other: NormalEquation): NormalEquation = {
require(other.k == k)
blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1)
blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 438a616..3e1bbba 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
-import scala.language.existentials
import scala.reflect.ClassTag
import net.razorvine.pickle._
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
index b0fa287..6ae8e20 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPTree.scala
@@ -33,7 +33,7 @@ private[fpm] class FPTree[T] extends Serializable {
private val summaries: mutable.Map[T, Summary[T]] = mutable.Map.empty
/** Adds a transaction with count. */
- def add(t: Iterable[T], count: Long = 1L): this.type = {
+ def add(t: Iterable[T], count: Long = 1L): FPTree[T] = {
require(count > 0)
var curr = root
curr.count += count
@@ -53,7 +53,7 @@ private[fpm] class FPTree[T] extends Serializable {
}
/** Merges another FP-Tree. */
- def merge(other: FPTree[T]): this.type = {
+ def merge(other: FPTree[T]): FPTree[T] = {
other.transactions.foreach { case (t, c) =>
add(t, c)
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 334f92b..6dea4b1 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.ml.classification
import scala.collection.JavaConverters._
-import scala.language.existentials
import scala.util.Random
import scala.util.control.Breaks._
@@ -31,7 +30,7 @@ import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, lit, rand}
import org.apache.spark.sql.types.LongType
@@ -40,11 +39,11 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
private val seed = 42
- @transient var smallBinaryDataset: Dataset[_] = _
- @transient var smallMultinomialDataset: Dataset[_] = _
- @transient var binaryDataset: Dataset[_] = _
- @transient var multinomialDataset: Dataset[_] = _
- @transient var multinomialDatasetWithZeroVar: Dataset[_] = _
+ @transient var smallBinaryDataset: DataFrame = _
+ @transient var smallMultinomialDataset: DataFrame = _
+ @transient var binaryDataset: DataFrame = _
+ @transient var multinomialDataset: DataFrame = _
+ @transient var multinomialDatasetWithZeroVar: DataFrame = _
private val eps: Double = 1e-5
override def beforeAll(): Unit = {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 5708097..12122c0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -17,15 +17,13 @@
package org.apache.spark.ml.clustering
-import scala.language.existentials
-
import org.apache.spark.SparkException
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.clustering.DistanceMeasure
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.DataFrame
class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
@@ -33,9 +31,8 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
final val k = 5
- @transient var dataset: Dataset[_] = _
-
- @transient var sparseDataset: Dataset[_] = _
+ @transient var dataset: DataFrame = _
+ @transient var sparseDataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
@@ -191,7 +188,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
}
test("BisectingKMeans with Array input") {
- def trainAndComputeCost(dataset: Dataset[_]): Double = {
+ def trainAndComputeCost(dataset: DataFrame): Double = {
val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
model.computeCost(dataset)
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index 11fdd3a..133536f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -17,15 +17,13 @@
package org.apache.spark.ml.clustering
-import scala.language.existentials
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
@@ -35,11 +33,11 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
final val k = 5
private val seed = 538009335
- @transient var dataset: Dataset[_] = _
- @transient var denseDataset: Dataset[_] = _
- @transient var sparseDataset: Dataset[_] = _
- @transient var decompositionDataset: Dataset[_] = _
- @transient var rDataset: Dataset[_] = _
+ @transient var dataset: DataFrame = _
+ @transient var denseDataset: DataFrame = _
+ @transient var sparseDataset: DataFrame = _
+ @transient var decompositionDataset: DataFrame = _
+ @transient var rDataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 5d439a2..e3c82fa 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.ml.clustering
-import scala.language.existentials
import scala.util.Random
import org.dmg.pmml.PMML
@@ -40,7 +39,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes
import testImplicits._
final val k = 5
- @transient var dataset: Dataset[_] = _
+ @transient var dataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index bbd5408..d089822 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.ml.clustering
-import scala.language.existentials
-
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.linalg.{Vector, Vectors}
@@ -64,7 +62,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
val k: Int = 5
val vocabSize: Int = 30
- @transient var dataset: Dataset[_] = _
+ @transient var dataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
@@ -329,8 +327,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
(model.logLikelihood(dataset), model.logPerplexity(dataset))
}
- val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
- val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
+ val (_, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
// TODO: need to compare the results once we fix the seed issue for LDA (SPARK-22210)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index e2d7756..1d65faa 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.DataFrame
class ClusteringEvaluatorSuite
@@ -32,10 +32,10 @@ class ClusteringEvaluatorSuite
import testImplicits._
- @transient var irisDataset: Dataset[_] = _
- @transient var newIrisDataset: Dataset[_] = _
- @transient var newIrisDatasetD: Dataset[_] = _
- @transient var newIrisDatasetF: Dataset[_] = _
+ @transient var irisDataset: DataFrame = _
+ @transient var newIrisDataset: DataFrame = _
+ @transient var newIrisDatasetD: DataFrame = _
+ @transient var newIrisDatasetF: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 2fc9754..6d0321c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -23,7 +23,6 @@ import java.util.Random
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, WrappedArray}
-import scala.language.existentials
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.commons.io.FileUtils
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index 2c73700..ad4ac72 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasWeightCol
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.ml.tree.impl.TreeTests
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
@@ -255,8 +254,8 @@ object MLTestingUtils extends SparkFunSuite {
* one having double array "features" column with float precision, and one having float array
* "features" column.
*/
- def generateArrayFeatureDataset(dataset: Dataset[_],
- featuresColName: String = "features"): (Dataset[_], Dataset[_], Dataset[_]) = {
+ def generateArrayFeatureDataset(dataset: DataFrame,
+ featuresColName: String = "features"): (DataFrame, DataFrame, DataFrame) = {
val toFloatVectorUDF = udf { (features: Vector) =>
Vectors.dense(features.toArray.map(_.toFloat.toDouble))}
val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
index dc44c58..20bd2e5 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPGrowthSuite.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.mllib.fpm
-import scala.language.existentials
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.util.Utils
@@ -301,7 +299,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model3.save(sc, path)
- val newModel = FPGrowthModel.load(sc, path)
+ val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
(itemset.items.toSet, itemset.freq)
}
@@ -335,7 +333,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model3.save(sc, path)
- val newModel = FPGrowthModel.load(sc, path)
+ val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
(itemset.items.toSet, itemset.freq)
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
index a56d7b3..4e2548d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/FPTreeSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.mllib.fpm
-import scala.language.existentials
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index c2e08d0..a700706 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -16,8 +16,6 @@
*/
package org.apache.spark.mllib.fpm
-import scala.language.existentials
-
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.util.Utils
@@ -420,7 +418,9 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model.save(sc, path)
- val newModel = PrefixSpanModel.load(sc, path)
+ // Save/loading this model results in item type "Object" even if in this case
+ // the objects are Integers -- not Int as in the original saved model.
+ val newModel = PrefixSpanModel.load(sc, path).asInstanceOf[PrefixSpanModel[AnyRef]]
val originalSet = model.freqSequences.collect().map { x =>
(x.sequence.map(_.toSet).toSeq, x.freq)
}.toSet
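The cast in the PrefixSpan test above works because generic items loaded without a type tag surface as boxed objects at runtime. A minimal illustration of the boxing involved (plain Scala collections, not the actual save/load path):

```scala
// Ints stored under an Any/AnyRef element type are boxed to
// java.lang.Integer, which is why the loaded model's item type is
// "Object" rather than Int and the test casts to AnyRef.
val saved: Seq[Any] = Seq(1, 2, 3)           // Ints boxed on the way in
val loaded = saved.asInstanceOf[Seq[AnyRef]] // elements: java.lang.Integer
```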
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index deb53cf..d725ef5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -21,7 +21,6 @@ import java.util.Locale
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
-import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index de3223e..95fad41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,7 +23,6 @@ import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
import scala.util.control.NonFatal
import com.google.common.cache.{CacheBuilder, CacheLoader}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index 2b73b96..80999ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import java.lang.{Boolean => JBool}
import scala.collection.mutable.ArrayBuffer
-import scala.language.{existentials, implicitConversions}
+import scala.language.implicitConversions
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{BooleanType, DataType}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 8182730..a6a48b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -21,7 +21,6 @@ import java.lang.reflect.{Method, Modifier}
import scala.collection.JavaConverters._
import scala.collection.mutable.Builder
-import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
@@ -30,7 +29,6 @@ import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
-import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 1d44cc3..74a9f13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.plans.logical
-import scala.language.existentials
-
import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Encoder, Row}
@@ -29,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
object CatalystSerde {
def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
@@ -245,12 +242,12 @@ case class TypedFilter(
}
def typedCondition(input: Expression): Expression = {
- val (funcClass, methodName) = func match {
- case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
+ val funcMethod = func match {
+ case _: FilterFunction[_] => (classOf[FilterFunction[_]], "call")
case _ => FunctionUtils.getFunctionOneName(BooleanType, input.dataType)
}
- val funcObj = Literal.create(func, ObjectType(funcClass))
- Invoke(funcObj, methodName, BooleanType, input :: Nil)
+ val funcObj = Literal.create(func, ObjectType(funcMethod._1))
+ Invoke(funcObj, funcMethod._2, BooleanType, input :: Nil)
}
}
@@ -266,9 +263,9 @@ object FunctionUtils {
}
}
- def getFunctionOneName(outputDT: DataType, inputDT: DataType): (Class[_], String) = {
- // load "scala.Function1" using Java API to avoid requirements of type parameters
- Utils.classForName("scala.Function1") -> {
+ def getFunctionOneName(outputDT: DataType, inputDT: DataType):
+ (Class[scala.Function1[_, _]], String) = {
+ classOf[scala.Function1[_, _]] -> {
// if a pair of an argument and return types is one of specific types
// whose specialized method (apply$mc..$sp) is generated by scalac,
// Catalyst generated a direct method call to the specialized method.
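The FunctionUtils change above is the recurring pattern in this PR: a bare `(Class[_], String)` return type makes Scala infer a fresh existential at every call site, while naming a concrete (if wildcarded) class pins the type down at the declaration. A minimal self-contained sketch of the idea (not the Spark code itself; the `specialized` flag and method-name strings here are illustrative):

```scala
// Sketch of the FunctionUtils fix: return Class[Function1[_, _]] rather than
// Class[_], so callers no longer need `import scala.language.existentials`.
object FunctionOneNameSketch {
  // "apply", or a specialized variant such as "apply$mcII$sp" that scalac
  // generates for primitive Int => Int functions.
  def getFunctionOneName(specialized: Boolean): (Class[Function1[_, _]], String) =
    classOf[Function1[_, _]] -> (if (specialized) "apply$mcII$sp" else "apply")
}
```

Because the class in the tuple is now statically `Class[Function1[_, _]]`, destructuring the result compiles cleanly without the language import.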
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 0989af2..5df2af9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -48,11 +48,10 @@ object ArrayBasedMapData {
* @param valueConverter This function is applied over all the values of the input map to
* obtain the output map's values
*/
- def apply(
- javaMap: JavaMap[_, _],
+ def apply[K, V](
+ javaMap: JavaMap[K, V],
keyConverter: (Any) => Any,
valueConverter: (Any) => Any): ArrayBasedMapData = {
- import scala.language.existentials
val keys: Array[Any] = new Array[Any](javaMap.size())
val values: Array[Any] = new Array[Any](javaMap.size())
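The ArrayBasedMapData change shows the other common fix: naming the key and value types as `[K, V]` at the method boundary, instead of taking `JavaMap[_, _]`, removes the local `import scala.language.existentials` the wildcard iteration previously required. A runnable stand-in (toy helper, not the Spark method):

```scala
import java.util.{HashMap => JHashMap, Map => JavaMap}

// Sketch: with named type parameters [K, V], entry.getKey/getValue have
// proper types and no existential is inferred while filling the arrays.
object MapConvertSketch {
  def convert[K, V](
      javaMap: JavaMap[K, V],
      keyConverter: Any => Any,
      valueConverter: Any => Any): (Array[Any], Array[Any]) = {
    val keys = new Array[Any](javaMap.size())
    val values = new Array[Any](javaMap.size())
    var i = 0
    val it = javaMap.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      keys(i) = keyConverter(entry.getKey)
      values(i) = valueConverter(entry.getValue)
      i += 1
    }
    (keys, values)
  }
}
```

Call sites are unchanged: type inference supplies `K` and `V` from the map argument.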
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
index c58f7a2..d26a73a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -180,7 +180,7 @@ object DataType {
("pyClass", _),
("sqlType", _),
("type", JString("udt"))) =>
- Utils.classForName(udtClass).getConstructor().newInstance().asInstanceOf[UserDefinedType[_]]
+ Utils.classForName[UserDefinedType[_]](udtClass).getConstructor().newInstance()
// Python UDT
case JSortedObject(
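This hunk is the first of many that replace `Utils.classForName(name).asInstanceOf[Class[...]]` with a type-parameterized `Utils.classForName[T](name)`. The mechanism can be sketched with a minimal stand-in (this is not Spark's actual helper, and its `bound` parameter is an invention for the sketch; Spark's version casts internally):

```scala
// Stand-in for the idea behind Utils.classForName[C]: do one checked
// narrowing inside the helper so call sites drop their asInstanceOf casts
// (and the existential types those casts drag in).
object ClassForNameSketch {
  def classForName[C](className: String, bound: Class[C]): Class[_ <: C] =
    Class.forName(className).asSubclass(bound)
}
```

For example, `classForName("java.util.ArrayList", classOf[java.util.List[_]]).getConstructor().newInstance()` is statically a `java.util.List[_]`, with no cast at the call site.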
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
index 6756b20..e79c0a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.types
-import scala.language.existentials
-
import org.apache.spark.annotation.Evolving
@Evolving
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index 436675b..8520300 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
-import scala.language.existentials
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Random
@@ -316,7 +315,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("SPARK-23587: MapObjects should support interpreted execution") {
- def testMapObjects(collection: Any, collectionCls: Class[_], inputType: DataType): Unit = {
+ def testMapObjects[T](collection: Any, collectionCls: Class[T], inputType: DataType): Unit = {
val function = (lambda: Expression) => Add(lambda, Literal(1))
val elementType = IntegerType
val expected = Seq(2, 3, 4)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 83425de..f0ef6e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -635,7 +635,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
try {
- val clazz = Utils.classForName(className)
+ val clazz = Utils.classForName[AnyRef](className)
val udfInterfaces = clazz.getGenericInterfaces
.filter(_.isInstanceOf[ParameterizedType])
.map(_.asInstanceOf[ParameterizedType])
@@ -699,7 +699,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
*/
private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
try {
- val clazz = Utils.classForName(className)
+ val clazz = Utils.classForName[AnyRef](className)
if (!classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
throw new AnalysisException(s"class $className doesn't implement interface UserDefinedAggregateFunction")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
index b757529..a2def6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.aggregate
-import scala.language.existentials
-
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 08650c8..52f30ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
import scala.collection.JavaConverters._
-import scala.language.{existentials, implicitConversions}
+import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 81cc958..38c634e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -21,7 +21,6 @@ import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
-import scala.language.existentials
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -50,8 +49,9 @@ class JoinSuite extends QueryTest with SharedSQLContext {
assert(planned.size === 1)
}
- def assertJoin(pair: (String, Class[_])): Any = {
- val (sqlString, c) = pair
+ def assertJoin(pair: (String, Class[_ <: BinaryExecNode])): Any = {
+ val sqlString = pair._1
+ val c = pair._2
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val operators = physical.collect {
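The JoinSuite change illustrates why the bound matters: destructuring a `(String, Class[_])` with `val (sqlString, c) = pair` makes Scala infer an existential for `c`, whereas an upper-bounded `Class[_ <: BinaryExecNode]` plus `._1`/`._2` access does not. A self-contained sketch (toy classes stand in for Spark's physical operators):

```scala
// Toy stand-ins for Spark's exec nodes.
sealed trait ExecNode
class SortMergeJoin extends ExecNode
class BroadcastHashJoin extends ExecNode

object AssertJoinSketch {
  // Bounding the wildcard ties the unknown class to a known supertype,
  // so no `import scala.language.existentials` is needed.
  def operatorName(pair: (String, Class[_ <: ExecNode])): String = {
    val sqlString = pair._1
    val c = pair._2
    s"$sqlString => ${c.getName}"
  }
}
```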
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
index 09793bd..edbc249 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
-import scala.language.existentials
-
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
@@ -186,12 +184,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
assert(typeName === PrimitiveTypeName.INT96)
val oneBlockMeta = oneFooter.getBlocks().get(0)
val oneBlockColumnMeta = oneBlockMeta.getColumns().get(0)
- val columnStats = oneBlockColumnMeta.getStatistics
// This is the important assert. Column stats are written, but they are ignored
// when the data is read back as mentioned above, b/c int96 is unsigned. This
// assert makes sure this holds even if we change parquet versions (if eg. there
// were ever statistics even on unsigned columns).
- assert(!columnStats.hasNonNullValue)
+ assert(!oneBlockColumnMeta.getStatistics.hasNonNullValue)
}
// These queries should return the entire dataset with the conversion applied,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index a538b94..daac207 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
import java.util.Locale
-import scala.language.existentials
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index c1eaf94..309591d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.sources
-import scala.language.existentials
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index ad11719..3f9925e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -94,7 +94,7 @@ class HadoopTableReader(
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
- Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
+ Utils.classForName[Deserializer](tableDesc.getSerdeClassName),
filterOpt = None)
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 640cca0..92a1120 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -922,11 +922,10 @@ private[hive] object HiveClientImpl {
}
private def toInputFormat(name: String) =
- Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
+ Utils.classForName[org.apache.hadoop.mapred.InputFormat[_, _]](name)
private def toOutputFormat(name: String) =
- Utils.classForName(name)
- .asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
+ Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name)
/**
* Converts the native table metadata representation format CatalogTable to Hive's Table.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
index 1825af6..24a67f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive.execution
-import scala.language.existentials
-
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.plan.TableDesc
@@ -30,7 +28,6 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 905cb52..e12f663 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -407,8 +407,8 @@ case class HiveScriptIOSchema (
columnTypes: Seq[DataType],
serdeProps: Seq[(String, String)]): AbstractSerDe = {
- val serde = Utils.classForName(serdeClassName).getConstructor().
- newInstance().asInstanceOf[AbstractSerDe]
+ val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
+ newInstance()
val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
@@ -428,8 +428,8 @@ case class HiveScriptIOSchema (
inputStream: InputStream,
conf: Configuration): Option[RecordReader] = {
recordReaderClass.map { klass =>
- val instance = Utils.classForName(klass).getConstructor().
- newInstance().asInstanceOf[RecordReader]
+ val instance = Utils.classForName[RecordReader](klass).getConstructor().
+ newInstance()
val props = new Properties()
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
@@ -441,8 +441,8 @@ case class HiveScriptIOSchema (
def recordWriter(outputStream: OutputStream, conf: Configuration): Option[RecordWriter] = {
recordWriterClass.map { klass =>
- val instance = Utils.classForName(klass).getConstructor().
- newInstance().asInstanceOf[RecordWriter]
+ val instance = Utils.classForName[RecordWriter](klass).getConstructor().
+ newInstance()
instance.initialize(outputStream, conf)
instance
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 5e97a05..a907fca 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.net.URI
-import scala.language.existentials
-
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.apache.parquet.hadoop.ParquetFileReader
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index eaedac1..b20ef03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -17,15 +17,12 @@
package org.apache.spark.sql.hive.execution
-import scala.language.existentials
-
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf._
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index fe0f875..88bfc88 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -46,7 +46,7 @@ class WindowedDStream[T: ClassTag](
def windowDuration: Duration = _windowDuration
- override def dependencies: List[DStream[_]] = List(parent)
+ override def dependencies: List[DStream[T]] = List(parent)
override def slideDuration: Duration = _slideDuration
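The WindowedDStream change narrows an overridden return type: since the only dependency is the parent stream of the same element type `T`, declaring `dependencies: List[DStream[T]]` is a legal covariant override of the base `List[DStream[_]]` and hands callers the element type back without a cast. A toy model of the same shape (`ToyStream` is not Spark's DStream):

```scala
// Base declares the loose type; the subclass narrows it covariantly,
// which is allowed because List is covariant and ToyStream[T] <: ToyStream[_].
abstract class ToyStream[T] {
  def dependencies: List[ToyStream[_]]
}

class ToySource[T] extends ToyStream[T] {
  override def dependencies: List[ToyStream[_]] = Nil
}

class ToyWindowed[T](val parent: ToyStream[T]) extends ToyStream[T] {
  override def dependencies: List[ToyStream[T]] = List(parent)
}
```

With the narrowed type, `new ToyWindowed(src).dependencies.head` is already a `ToyStream[T]`; the `asInstanceOf[WindowedDStream[Int]]` in BasicOperationsSuite below survives only because the test goes through the abstract `DStream` type.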
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
index 8c3a797..a0b49e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
/** Trait representing a received block */
private[streaming] sealed trait ReceivedBlock
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f8ddabd..eb70232 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
-import scala.language.existentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 4f3fa05..7091598 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -51,11 +51,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.util.SystemClock")
try {
- Utils.classForName(clockClass).getConstructor().newInstance().asInstanceOf[Clock]
+ Utils.classForName[Clock](clockClass).getConstructor().newInstance()
} catch {
case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
- Utils.classForName(newClockClass).getConstructor().newInstance().asInstanceOf[Clock]
+ Utils.classForName[Clock](newClockClass).getConstructor().newInstance()
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 829254b..551d376 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.HashMap
import scala.concurrent.ExecutionContext
-import scala.language.existentials
import scala.util.{Failure, Success}
import org.apache.spark._
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 7542e2f..b0a4c98 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -132,8 +132,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
}
val wal = classNameOption.map { className =>
try {
- instantiateClass(
- Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
+ instantiateClass(Utils.classForName[WriteAheadLog](className), sparkConf)
} catch {
case NonFatal(e) =>
throw new SparkException(s"Could not create a write ahead log of class $className", e)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 0a764f6..287a43a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
-import scala.language.existentials
import scala.reflect.ClassTag
import org.scalatest.concurrent.Eventually.eventually
@@ -656,12 +655,12 @@ class BasicOperationsSuite extends TestSuiteBase {
runCleanupTest(
conf,
- operation _,
+ operation,
numExpectedOutput = cleanupTestInput.size / 2,
rememberDuration = Seconds(3)) { operatedStream =>
eventually(eventuallyTimeout) {
- val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
- val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
+ val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[Int]]
+ val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[Int]]
val mappedStream = windowedStream1.dependencies.head
// Checkpoint remember durations