You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/04/02 14:37:32 UTC
[spark] branch master updated: [SPARK-27323][CORE][SQL][STREAMING]
Use Single-Abstract-Method support in Scala 2.12 to simplify code
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d4420b4 [SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code
d4420b4 is described below
commit d4420b455ab81b86c29fc45a3107e45873c72dc2
Author: Sean Owen <se...@databricks.com>
AuthorDate: Tue Apr 2 07:37:05 2019 -0700
[SPARK-27323][CORE][SQL][STREAMING] Use Single-Abstract-Method support in Scala 2.12 to simplify code
## What changes were proposed in this pull request?
Use Single Abstract Method syntax where possible (and minor related cleanup). Comments below. No logic should change here.
## How was this patch tested?
Existing tests.
Closes #24241 from srowen/SPARK-27323.
Authored-by: Sean Owen <se...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/BarrierCoordinator.scala | 8 +--
.../scala/org/apache/spark/ContextCleaner.scala | 5 +-
.../scala/org/apache/spark/HeartbeatReceiver.scala | 8 +--
.../main/scala/org/apache/spark/SparkConf.scala | 6 +-
.../org/apache/spark/deploy/PythonRunner.scala | 6 +-
.../org/apache/spark/deploy/SparkHadoopUtil.scala | 11 ++-
.../spark/deploy/history/FsHistoryProvider.scala | 44 +++++-------
.../org/apache/spark/deploy/master/Master.scala | 8 +--
.../org/apache/spark/deploy/worker/Worker.scala | 38 ++++-------
.../scala/org/apache/spark/executor/Executor.scala | 11 ++-
.../apache/spark/launcher/LauncherBackend.scala | 9 +--
.../org/apache/spark/metrics/MetricsSystem.scala | 6 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 34 +++++-----
.../apache/spark/scheduler/TaskResultGetter.scala | 40 ++++++-----
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 14 ++--
.../cluster/CoarseGrainedSchedulerBackend.scala | 14 ++--
.../apache/spark/status/AppStatusListener.scala | 15 ++--
.../spark/util/collection/ExternalSorter.scala | 32 ++++-----
.../WritablePartitionedPairCollection.scala | 23 +++----
.../apache/spark/util/logging/DriverLogger.scala | 6 +-
.../scala/org/apache/spark/SparkConfSuite.scala | 7 +-
.../deploy/history/ApplicationCacheSuite.scala | 9 +--
.../deploy/history/FsHistoryProviderSuite.scala | 9 +--
.../spark/deploy/worker/DriverRunnerTest.scala | 25 +++----
.../apache/spark/deploy/worker/WorkerSuite.scala | 19 ++----
.../org/apache/spark/executor/ExecutorSuite.scala | 11 ++-
.../apache/spark/memory/MemoryManagerSuite.scala | 42 +++++-------
.../spark/scheduler/BlacklistTrackerSuite.scala | 23 +++----
.../scheduler/OutputCommitCoordinatorSuite.scala | 46 ++++++-------
.../scheduler/SchedulerIntegrationSuite.scala | 10 +--
.../spark/scheduler/TaskSetManagerSuite.scala | 19 ++----
.../sort/BypassMergeSortShuffleWriterSuite.scala | 67 ++++++++----------
.../sort/IndexShuffleBlockResolverSuite.scala | 7 +-
.../storage/PartiallySerializedBlockSuite.scala | 9 +--
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 79 +++++++++-------------
.../org/apache/spark/util/ThreadUtilsSuite.scala | 14 ++--
.../spark/util/collection/AppendOnlyMapSuite.scala | 12 ++--
.../util/collection/ExternalSorterSuite.scala | 15 ++--
.../apache/spark/util/collection/SorterSuite.scala | 20 ++----
.../collection/unsafe/sort/RadixSortSuite.scala | 13 ++--
.../spark/sql/kafka010/KafkaOffsetReader.scala | 16 ++---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 5 +-
.../kafka010/DirectKafkaStreamSuite.scala | 18 +++--
.../kinesis/KinesisCheckpointerSuite.scala | 9 +--
.../spark/graphx/impl/EdgePartitionBuilder.scala | 14 ++--
.../spark/repl/ExecutorClassLoaderSuite.scala | 13 ++--
.../org/apache/spark/repl/SingletonReplSuite.scala | 4 +-
.../k8s/submit/LoggingPodStatusWatcher.scala | 4 +-
.../k8s/ExecutorPodsSnapshotsStoreImpl.scala | 6 +-
.../k8s/features/KubernetesFeaturesTestUtils.scala | 21 +++---
.../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 15 ++--
.../k8s/ExecutorPodsLifecycleManagerSuite.scala | 14 ++--
.../apache/spark/deploy/yarn/YarnAllocator.scala | 66 +++++++++---------
.../expressions/collectionOperations.scala | 46 ++++++-------
.../apache/spark/sql/catalyst/util/package.scala | 8 +--
.../org/apache/spark/sql/types/BinaryType.scala | 8 +--
.../org/apache/spark/sql/types/DoubleType.scala | 7 +-
.../org/apache/spark/sql/types/FloatType.scala | 7 +-
.../catalog/ExternalCatalogEventSuite.scala | 9 +--
.../spark/sql/execution/SortPrefixUtils.scala | 11 +--
.../apache/spark/sql/execution/command/ddl.scala | 23 +++----
.../execution/datasources/DataSourceStrategy.scala | 36 +++++-----
.../streaming/CheckpointFileManager.scala | 16 ++---
.../streaming/CompactibleFileStreamLog.scala | 26 ++++---
.../streaming/sources/RateStreamProvider.scala | 18 ++---
.../sources/TextSocketMicroBatchStream.scala | 31 ++++-----
.../sources/TextSocketSourceProvider.scala | 16 ++---
.../sql/execution/ui/SQLAppStatusListener.scala | 7 +-
.../apache/spark/sql/internal/SharedState.scala | 8 +--
.../scala/org/apache/spark/sql/DatasetSuite.scala | 7 +-
.../spark/sql/execution/QueryExecutionSuite.scala | 19 ++----
.../sql/execution/benchmark/SortBenchmark.scala | 13 ++--
.../apache/spark/sql/streaming/StreamTest.scala | 7 +-
.../test/DataStreamReaderWriterSuite.scala | 6 +-
.../spark/sql/streaming/util/BlockingSource.scala | 6 +-
.../sql/hive/thriftserver/SparkSQLCLIDriver.scala | 22 +++---
.../streaming/scheduler/ReceiverTracker.scala | 18 ++---
.../streaming/util/BatchedWriteAheadLog.scala | 18 +++--
78 files changed, 543 insertions(+), 849 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index 6439ca5..9f59295 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap
-import java.util.function.{Consumer, Function}
+import java.util.function.Consumer
import scala.collection.mutable.ArrayBuffer
@@ -202,10 +202,8 @@ private[spark] class BarrierCoordinator(
case request @ RequestToSync(numTasks, stageId, stageAttemptId, _, _) =>
// Get or init the ContextBarrierState correspond to the stage attempt.
val barrierId = ContextBarrierId(stageId, stageAttemptId)
- states.computeIfAbsent(barrierId, new Function[ContextBarrierId, ContextBarrierState] {
- override def apply(key: ContextBarrierId): ContextBarrierState =
- new ContextBarrierState(key, numTasks)
- })
+ states.computeIfAbsent(barrierId,
+ (key: ContextBarrierId) => new ContextBarrierState(key, numTasks))
val barrierState = states.get(barrierId)
barrierState.handleRequest(context, request)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 305ec46..a111a60 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -123,9 +123,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
- periodicGCService.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = System.gc()
- }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
+ periodicGCService.scheduleAtFixedRate(() => System.gc(),
+ periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index ec3a184..f7e3103 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -98,11 +98,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
override def onStart(): Unit = {
- timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.ask[Boolean](ExpireDeadHosts))
- }
- }, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
+ timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(
+ () => Utils.tryLogNonFatalError { Option(self).foreach(_.ask[Boolean](ExpireDeadHosts)) },
+ 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 7050396..8499246 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -62,9 +62,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
@transient private lazy val reader: ConfigReader = {
val _reader = new ConfigReader(new SparkConfigProvider(settings))
- _reader.bindEnv(new ConfigProvider {
- override def get(key: String): Option[String] = Option(getenv(key))
- })
+ _reader.bindEnv((key: String) => Option(getenv(key)))
_reader
}
@@ -392,7 +390,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
/** Get an optional value, applying variable substitution. */
private[spark] def getWithSubstitution(key: String): Option[String] = {
- getOption(key).map(reader.substitute(_))
+ getOption(key).map(reader.substitute)
}
/** Get all parameters as a list of pairs */
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index ccb30e2..f5e8cff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -60,11 +60,7 @@ object PythonRunner {
.javaAddress(localhost)
.callbackClient(py4j.GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret)
.build()
- val thread = new Thread(new Runnable() {
- override def run(): Unit = Utils.logUncaughtExceptions {
- gatewayServer.start()
- }
- })
+ val thread = new Thread(() => Utils.logUncaughtExceptions { gatewayServer.start() })
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index a97d072..11420bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
-import java.util.{Arrays, Comparator, Date, Locale}
+import java.util.{Arrays, Date, Locale}
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@@ -270,11 +270,8 @@ private[spark] class SparkHadoopUtil extends Logging {
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
- Arrays.sort(fileStatuses, new Comparator[FileStatus] {
- override def compare(o1: FileStatus, o2: FileStatus): Int = {
- Longs.compare(o1.getModificationTime, o2.getModificationTime)
- }
- })
+ Arrays.sort(fileStatuses, (o1: FileStatus, o2: FileStatus) =>
+ Longs.compare(o1.getModificationTime, o2.getModificationTime))
fileStatuses
} catch {
case NonFatal(e) =>
@@ -465,7 +462,7 @@ private[spark] object SparkHadoopUtil {
// scalastyle:on line.size.limit
def createNonECFile(fs: FileSystem, path: Path): FSDataOutputStream = {
try {
- // Use reflection as this uses apis only avialable in hadoop 3
+ // Use reflection as this uses APIs only available in Hadoop 3
val builderMethod = fs.getClass().getMethod("createFile", classOf[Path])
// the builder api does not resolve relative paths, nor does it create parent dirs, while
// the old api does.
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 56aea0c..98265ff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -186,13 +186,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* Return a runnable that performs the given operation on the event logs.
* This operation is expected to be executed periodically.
*/
- private def getRunner(operateFun: () => Unit): Runnable = {
- new Runnable() {
- override def run(): Unit = Utils.tryOrExit {
- operateFun()
- }
- }
- }
+ private def getRunner(operateFun: () => Unit): Runnable =
+ () => Utils.tryOrExit { operateFun() }
/**
* Fixed size thread pool to fetch and parse log files.
@@ -221,29 +216,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Cannot probe anything while the FS is in safe mode, so spawn a new thread that will wait
// for the FS to leave safe mode before enabling polling. This allows the main history server
// UI to be shown (so that the user can see the HDFS status).
- val initThread = new Thread(new Runnable() {
- override def run(): Unit = {
- try {
- while (isFsInSafeMode()) {
- logInfo("HDFS is still in safe mode. Waiting...")
- val deadline = clock.getTimeMillis() +
- TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
- clock.waitTillTime(deadline)
- }
- startPolling()
- } catch {
- case _: InterruptedException =>
+ val initThread = new Thread(() => {
+ try {
+ while (isFsInSafeMode()) {
+ logInfo("HDFS is still in safe mode. Waiting...")
+ val deadline = clock.getTimeMillis() +
+ TimeUnit.SECONDS.toMillis(SAFEMODE_CHECK_INTERVAL_S)
+ clock.waitTillTime(deadline)
}
+ startPolling()
+ } catch {
+ case _: InterruptedException =>
}
})
initThread.setDaemon(true)
initThread.setName(s"${getClass().getSimpleName()}-init")
initThread.setUncaughtExceptionHandler(errorHandler.getOrElse(
- new Thread.UncaughtExceptionHandler() {
- override def uncaughtException(t: Thread, e: Throwable): Unit = {
- logError("Error initializing FsHistoryProvider.", e)
- System.exit(1)
- }
+ (_: Thread, e: Throwable) => {
+ logError("Error initializing FsHistoryProvider.", e)
+ System.exit(1)
}))
initThread.start()
initThread
@@ -517,9 +508,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val tasks = updated.flatMap { entry =>
try {
- val task: Future[Unit] = replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
- }, Unit)
+ val task: Future[Unit] = replayExecutor.submit(
+ () => mergeApplicationListing(entry, newLastScanTime, true))
Some(task -> entry.getPath)
} catch {
// let the iteration over the updated entries break, since an exception on
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 50e282f..5db9b52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -150,11 +150,9 @@ private[deploy] class Master(
logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
s"Applications UIs are available at $masterWebUiUrl")
}
- checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(CheckForWorkerTimeOut)
- }
- }, 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
+ checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(
+ () => Utils.tryLogNonFatalError { self.send(CheckForWorkerTimeOut) },
+ 0, workerTimeoutMs, TimeUnit.MILLISECONDS)
if (restServerEnabled) {
val port = conf.get(MASTER_REST_SERVER_PORT)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0664b3f..eb2add3 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -325,11 +325,9 @@ private[deploy] class Worker(
if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
registrationRetryTimer.foreach(_.cancel(true))
registrationRetryTimer = Some(
- forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(ReregisterWithMaster)
- }
- }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
+ forwardMessageScheduler.scheduleAtFixedRate(
+ () => Utils.tryLogNonFatalError { self.send(ReregisterWithMaster) },
+ PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
}
@@ -341,7 +339,7 @@ private[deploy] class Worker(
}
/**
- * Cancel last registeration retry, or do nothing if no retry
+ * Cancel last registration retry, or do nothing if no retry
*/
private def cancelLastRegistrationRetry(): Unit = {
if (registerMasterFutures != null) {
@@ -361,11 +359,7 @@ private[deploy] class Worker(
registerMasterFutures = tryRegisterAllMasters()
connectionAttemptCount = 0
registrationRetryTimer = Some(forwardMessageScheduler.scheduleAtFixedRate(
- new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(ReregisterWithMaster))
- }
- },
+ () => Utils.tryLogNonFatalError { Option(self).foreach(_.send(ReregisterWithMaster)) },
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
TimeUnit.SECONDS))
@@ -407,19 +401,15 @@ private[deploy] class Worker(
}
registered = true
changeMaster(masterRef, masterWebUiUrl, masterAddress)
- forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(SendHeartbeat)
- }
- }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+ forwardMessageScheduler.scheduleAtFixedRate(
+ () => Utils.tryLogNonFatalError { self.send(SendHeartbeat) },
+ 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
if (CLEANUP_ENABLED) {
logInfo(
s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
- forwardMessageScheduler.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- self.send(WorkDirCleanup)
- }
- }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+ forwardMessageScheduler.scheduleAtFixedRate(
+ () => Utils.tryLogNonFatalError { self.send(WorkDirCleanup) },
+ CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}
val execs = executors.values.map { e =>
@@ -568,7 +558,7 @@ private[deploy] class Worker(
}
}
- case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
+ case executorStateChanged: ExecutorStateChanged =>
handleExecutorStateChanged(executorStateChanged)
case KillExecutor(masterUrl, appId, execId) =>
@@ -632,7 +622,7 @@ private[deploy] class Worker(
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
if (master.exists(_.address == remoteAddress) ||
- masterAddressToConnect.exists(_ == remoteAddress)) {
+ masterAddressToConnect.contains(remoteAddress)) {
logInfo(s"$remoteAddress Disassociated !")
masterDisconnected()
}
@@ -815,7 +805,7 @@ private[deploy] object Worker extends Logging {
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
- val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
+ val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d597315..cc3cc16 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -89,17 +89,14 @@ private[spark] class Executor(
}
// Start worker thread pool
+ // Use UninterruptibleThread to run tasks so that we can allow running codes without being
+ // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
+ // will hang forever if some methods are interrupted.
private val threadPool = {
val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Executor task launch worker-%d")
- .setThreadFactory(new ThreadFactory {
- override def newThread(r: Runnable): Thread =
- // Use UninterruptibleThread to run tasks so that we can allow running codes without being
- // interrupted by `Thread.interrupt()`. Some issues, such as KAFKA-1894, HADOOP-10622,
- // will hang forever if some methods are interrupted.
- new UninterruptibleThread(r, "unused") // thread name will be set by ThreadFactoryBuilder
- })
+ .setThreadFactory((r: Runnable) => new UninterruptibleThread(r, "unused"))
.build()
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index 1b049b7..77bbbd9 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -44,7 +44,7 @@ private[spark] abstract class LauncherBackend {
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
- if (port != None && secret != None) {
+ if (port.isDefined && secret.isDefined) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
connection.send(new Hello(secret.get, SPARK_VERSION))
@@ -94,11 +94,8 @@ private[spark] abstract class LauncherBackend {
protected def onDisconnected() : Unit = { }
private def fireStopRequest(): Unit = {
- val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
- override def run(): Unit = Utils.tryLogNonFatalError {
- onStopRequest()
- }
- })
+ val thread = LauncherBackend.threadFactory.newThread(
+ () => Utils.tryLogNonFatalError { onStopRequest() })
thread.start()
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 89dd74e..8dad42b 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{SecurityManager, SparkConf}
@@ -168,9 +168,7 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
- registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
- })
+ registry.removeMatching((name: String, _: Metric) => name.startsWith(regName))
}
private def registerSources() {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9177c1b..d967d38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,6 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
-import java.util.function.BiFunction
import scala.annotation.tailrec
import scala.collection.Map
@@ -370,9 +369,10 @@ private[spark] class DAGScheduler(
* 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
*/
private def checkBarrierStageWithRDDChainPattern(rdd: RDD[_], numTasksInStage: Int): Unit = {
- val predicate: RDD[_] => Boolean = (r =>
- r.getNumPartitions == numTasksInStage && r.dependencies.filter(_.rdd.isBarrier()).size <= 1)
- if (rdd.isBarrier() && !traverseParentRDDsWithinStage(rdd, predicate)) {
+ if (rdd.isBarrier() &&
+ !traverseParentRDDsWithinStage(rdd, (r: RDD[_]) =>
+ r.getNumPartitions == numTasksInStage &&
+ r.dependencies.count(_.rdd.isBarrier()) <= 1)) {
throw new BarrierJobUnsupportedRDDChainException
}
}
@@ -692,7 +692,7 @@ private[spark] class DAGScheduler(
}
val jobId = nextJobId.getAndIncrement()
- if (partitions.size == 0) {
+ if (partitions.isEmpty) {
val time = clock.getTimeMillis()
listenerBus.post(
SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
@@ -702,9 +702,9 @@ private[spark] class DAGScheduler(
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
- assert(partitions.size > 0)
+ assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
+ val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
@@ -767,9 +767,8 @@ private[spark] class DAGScheduler(
callSite: CallSite,
timeout: Long,
properties: Properties): PartialResult[R] = {
- val partitions = (0 until rdd.partitions.length).toArray
val jobId = nextJobId.getAndIncrement()
- if (partitions.isEmpty) {
+ if (rdd.partitions.isEmpty) {
// Return immediately if the job is running 0 tasks
val time = clock.getTimeMillis()
listenerBus.post(SparkListenerJobStart(jobId, time, Seq[StageInfo](), properties))
@@ -779,7 +778,8 @@ private[spark] class DAGScheduler(
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
eventProcessLoop.post(JobSubmitted(
- jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
+ jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
+ SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
}
@@ -812,7 +812,9 @@ private[spark] class DAGScheduler(
// This makes it easier to avoid race conditions between the user code and the map output
// tracker that might result if we told the user the stage had finished, but then they queries
// the map output tracker and some node failures had caused the output statistics to be lost.
- val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r))
+ val waiter = new JobWaiter[MapOutputStatistics](
+ this, jobId, 1,
+ (_: Int, r: MapOutputStatistics) => callback(r))
eventProcessLoop.post(MapStageSubmitted(
jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
waiter
@@ -870,7 +872,7 @@ private[spark] class DAGScheduler(
* the last fetch failure.
*/
private[scheduler] def resubmitFailedStages() {
- if (failedStages.size > 0) {
+ if (failedStages.nonEmpty) {
// Failed stages may be removed by job cancellation, so failed might be empty even if
// the ResubmitFailedStages event has been scheduled.
logInfo("Resubmitting failed stages")
@@ -982,9 +984,7 @@ private[spark] class DAGScheduler(
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
- new BiFunction[Int, Int, Int] {
- override def apply(key: Int, value: Int): Int = value + 1
- })
+ (_: Int, value: Int) => value + 1)
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
@@ -1227,7 +1227,7 @@ private[spark] class DAGScheduler(
return
}
- if (tasks.size > 0) {
+ if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
@@ -1942,7 +1942,7 @@ private[spark] class DAGScheduler(
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit = {
- val error = new SparkException(failureReason, exception.getOrElse(null))
+ val error = new SparkException(failureReason, exception.orNull)
var ableToCancelStages = true
// Cancel all independent, running stages.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index a284f79..c6dedaa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -80,7 +80,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
logDebug("Fetching indirect task result for TID %s".format(tid))
scheduler.handleTaskGettingResult(taskSetManager, tid)
val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
- if (!serializedTaskResult.isDefined) {
+ if (serializedTaskResult.isEmpty) {
/* We won't be able to get the task result if the machine that ran the task failed
* between when the task ended and when we tried to fetch the result, or if the
* block manager had to flush the result. */
@@ -128,27 +128,25 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
serializedData: ByteBuffer) {
var reason : TaskFailedReason = UnknownReason
try {
- getTaskResultExecutor.execute(new Runnable {
- override def run(): Unit = Utils.logUncaughtExceptions {
- val loader = Utils.getContextOrSparkClassLoader
- try {
- if (serializedData != null && serializedData.limit() > 0) {
- reason = serializer.get().deserialize[TaskFailedReason](
- serializedData, loader)
- }
- } catch {
- case cnd: ClassNotFoundException =>
- // Log an error but keep going here -- the task failed, so not catastrophic
- // if we can't deserialize the reason.
- logError(
- "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
- case ex: Exception => // No-op
- } finally {
- // If there's an error while deserializing the TaskEndReason, this Runnable
- // will die. Still tell the scheduler about the task failure, to avoid a hang
- // where the scheduler thinks the task is still running.
- scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
+ getTaskResultExecutor.execute(() => Utils.logUncaughtExceptions {
+ val loader = Utils.getContextOrSparkClassLoader
+ try {
+ if (serializedData != null && serializedData.limit() > 0) {
+ reason = serializer.get().deserialize[TaskFailedReason](
+ serializedData, loader)
}
+ } catch {
+ case _: ClassNotFoundException =>
+ // Log an error but keep going here -- the task failed, so not catastrophic
+ // if we can't deserialize the reason.
+ logError(
+ "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
+ case _: Exception => // No-op
+ } finally {
+ // If there's an error while deserializing the TaskEndReason, this Runnable
+ // will die. Still tell the scheduler about the task failure, to avoid a hang
+ // where the scheduler thinks the task is still running.
+ scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
}
})
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1ef3566..bffa1ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -53,7 +53,7 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
* we are holding a lock on ourselves. This class is called from many threads, notably:
* * The DAGScheduler Event Loop
* * The RPCHandler threads, responding to status updates from Executors
- * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accomodate delay
+ * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
* scheduling
* * task-result-getter threads
*/
@@ -194,11 +194,9 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.get(SPECULATION_ENABLED)) {
logInfo("Starting speculative execution thread")
- speculationScheduler.scheduleWithFixedDelay(new Runnable {
- override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
- checkSpeculatableTasks()
- }
- }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
+ speculationScheduler.scheduleWithFixedDelay(
+ () => Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() },
+ SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
@@ -373,7 +371,7 @@ private[spark] class TaskSchedulerImpl(
}
}
}
- return launchedTask
+ launchedTask
}
/**
@@ -527,7 +525,7 @@ private[spark] class TaskSchedulerImpl(
// TODO SPARK-24823 Cancel a job that contains barrier stage(s) if the barrier tasks don't get
// launched within a configured time.
- if (tasks.size > 0) {
+ if (tasks.nonEmpty) {
hasLaunchedTask = true
}
return tasks
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 808ef08..4830d0e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import javax.annotation.concurrent.GuardedBy
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.Future
import org.apache.hadoop.security.UserGroupInformation
@@ -133,10 +133,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Periodically revive offers to allow delay scheduling to work
val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
- reviveThread.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- Option(self).foreach(_.send(ReviveOffers))
- }
+ reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
+ Option(self).foreach(_.send(ReviveOffers))
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
@@ -268,7 +266,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
- if (!taskDescs.isEmpty) {
+ if (taskDescs.nonEmpty) {
launchTasks(taskDescs)
}
}
@@ -296,7 +294,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
Seq.empty
}
}
- if (!taskDescs.isEmpty) {
+ if (taskDescs.nonEmpty) {
launchTasks(taskDescs)
}
}
@@ -669,7 +667,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
val killExecutors: Boolean => Future[Boolean] =
- if (!executorsToKill.isEmpty) {
+ if (executorsToKill.nonEmpty) {
_ => doKillExecutors(executorsToKill)
} else {
_ => Future.successful(false)
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 71e0390..a3e8242 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -19,7 +19,6 @@ package org.apache.spark.status
import java.util.Date
import java.util.concurrent.ConcurrentHashMap
-import java.util.function.Function
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
@@ -840,11 +839,11 @@ private[spark] class AppStatusListener(
// check if there is a new peak value for any of the executor level memory metrics,
// while reading from the log. SparkListenerStageExecutorMetrics are only processed
// when reading logs.
- liveExecutors.get(executorMetrics.execId)
- .orElse(deadExecutors.get(executorMetrics.execId)).map { exec =>
- if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
- update(exec, now)
- }
+ liveExecutors.get(executorMetrics.execId).orElse(
+ deadExecutors.get(executorMetrics.execId)).foreach { exec =>
+ if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
+ update(exec, now)
+ }
}
}
@@ -1048,9 +1047,7 @@ private[spark] class AppStatusListener(
private def getOrCreateStage(info: StageInfo): LiveStage = {
val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
- new Function[(Int, Int), LiveStage]() {
- override def apply(key: (Int, Int)): LiveStage = new LiveStage()
- })
+ (_: (Int, Int)) => new LiveStage())
stage.info = info
stage
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 4806c13..3f3b7d2 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -143,12 +143,10 @@ private[spark] class ExternalSorter[K, V, C](
// user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
// non-equal keys also have this, so we need to do a later pass to find truly equal keys).
// Note that we ignore this if no aggregator and no ordering are given.
- private val keyComparator: Comparator[K] = ordering.getOrElse(new Comparator[K] {
- override def compare(a: K, b: K): Int = {
- val h1 = if (a == null) 0 else a.hashCode()
- val h2 = if (b == null) 0 else b.hashCode()
- if (h1 < h2) -1 else if (h1 == h2) 0 else 1
- }
+ private val keyComparator: Comparator[K] = ordering.getOrElse((a: K, b: K) => {
+ val h1 = if (a == null) 0 else a.hashCode()
+ val h2 = if (b == null) 0 else b.hashCode()
+ if (h1 < h2) -1 else if (h1 == h2) 0 else 1
})
private def comparator: Option[Comparator[K]] = {
@@ -363,17 +361,15 @@ private[spark] class ExternalSorter[K, V, C](
* Merge-sort a sequence of (K, C) iterators using a given a comparator for the keys.
*/
private def mergeSort(iterators: Seq[Iterator[Product2[K, C]]], comparator: Comparator[K])
- : Iterator[Product2[K, C]] =
- {
+ : Iterator[Product2[K, C]] = {
val bufferedIters = iterators.filter(_.hasNext).map(_.buffered)
type Iter = BufferedIterator[Product2[K, C]]
- val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] {
- // Use the reverse order because PriorityQueue dequeues the max
- override def compare(x: Iter, y: Iter): Int = comparator.compare(y.head._1, x.head._1)
- })
+ // Use the reverse order (compare(y,x)) because PriorityQueue dequeues the max
+ val heap = new mutable.PriorityQueue[Iter]()(
+ (x: Iter, y: Iter) => comparator.compare(y.head._1, x.head._1))
heap.enqueue(bufferedIters: _*) // Will contain only the iterators with hasNext = true
new Iterator[Product2[K, C]] {
- override def hasNext: Boolean = !heap.isEmpty
+ override def hasNext: Boolean = heap.nonEmpty
override def next(): Product2[K, C] = {
if (!hasNext) {
@@ -400,13 +396,12 @@ private[spark] class ExternalSorter[K, V, C](
mergeCombiners: (C, C) => C,
comparator: Comparator[K],
totalOrder: Boolean)
- : Iterator[Product2[K, C]] =
- {
+ : Iterator[Product2[K, C]] = {
if (!totalOrder) {
// We only have a partial ordering, e.g. comparing the keys by hash code, which means that
// multiple distinct keys might be treated as equal by the ordering. To deal with this, we
// need to read all keys considered equal by the ordering at once and compare them.
- new Iterator[Iterator[Product2[K, C]]] {
+ val it = new Iterator[Iterator[Product2[K, C]]] {
val sorted = mergeSort(iterators, comparator).buffered
// Buffers reused across elements to decrease memory allocation
@@ -446,7 +441,8 @@ private[spark] class ExternalSorter[K, V, C](
// equal by the partial order; we flatten this below to get a flat iterator of (K, C).
keys.iterator.zip(combiners.iterator)
}
- }.flatMap(i => i)
+ }
+ it.flatten
} else {
// We have a total ordering, so the objects with the same key are sequential.
new Iterator[Product2[K, C]] {
@@ -650,7 +646,7 @@ private[spark] class ExternalSorter[K, V, C](
if (spills.isEmpty) {
// Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
// we don't even need to sort by anything other than partition ID
- if (!ordering.isDefined) {
+ if (ordering.isEmpty) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
index 5232c2b..dd7f68f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/WritablePartitionedPairCollection.scala
@@ -68,27 +68,20 @@ private[spark] object WritablePartitionedPairCollection {
/**
* A comparator for (Int, K) pairs that orders them by only their partition ID.
*/
- def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
- override def compare(a: (Int, K), b: (Int, K)): Int = {
- a._1 - b._1
- }
- }
+ def partitionComparator[K]: Comparator[(Int, K)] = (a: (Int, K), b: (Int, K)) => a._1 - b._1
/**
* A comparator for (Int, K) pairs that orders them both by their partition ID and a key ordering.
*/
- def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] = {
- new Comparator[(Int, K)] {
- override def compare(a: (Int, K), b: (Int, K)): Int = {
- val partitionDiff = a._1 - b._1
- if (partitionDiff != 0) {
- partitionDiff
- } else {
- keyComparator.compare(a._2, b._2)
- }
+ def partitionKeyComparator[K](keyComparator: Comparator[K]): Comparator[(Int, K)] =
+ (a: (Int, K), b: (Int, K)) => {
+ val partitionDiff = a._1 - b._1
+ if (partitionDiff != 0) {
+ partitionDiff
+ } else {
+ keyComparator.compare(a._2, b._2)
}
}
- }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
index bea18a3..c454043 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
@@ -39,7 +39,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging {
private val DEFAULT_LAYOUT = "%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n"
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
- private var localLogFile: String = FileUtils.getFile(
+ private val localLogFile: String = FileUtils.getFile(
Utils.getLocalDir(conf),
DriverLogger.DRIVER_LOG_DIR,
DriverLogger.DRIVER_LOG_FILE).getAbsolutePath()
@@ -163,9 +163,7 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging {
def closeWriter(): Unit = {
try {
- threadpool.execute(new Runnable() {
- override def run(): Unit = DfsAsyncWriter.this.close()
- })
+ threadpool.execute(() => DfsAsyncWriter.this.close())
threadpool.shutdown()
threadpool.awaitTermination(1, TimeUnit.MINUTES)
} catch {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 1123191..4b2dd9a 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -164,10 +164,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("Thread safeness - SPARK-5425") {
val executor = Executors.newSingleThreadScheduledExecutor()
- val sf = executor.scheduleAtFixedRate(new Runnable {
- override def run(): Unit =
- System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString)
- }, 0, 1, TimeUnit.MILLISECONDS)
+ executor.scheduleAtFixedRate(
+ () => System.setProperty("spark.5425." + Random.nextInt(), Random.nextInt().toString),
+ 0, 1, TimeUnit.MILLISECONDS)
try {
val t0 = System.nanoTime()
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 0402d94..1148446 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -27,7 +27,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.Matchers
import org.scalatest.mockito.MockitoSugar
@@ -374,11 +373,9 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
when(request.getRequestURI()).thenReturn("http://localhost:18080/history/local-123/jobs/job/")
when(request.getQueryString()).thenReturn("id=2")
val resp = mock[HttpServletResponse]
- when(resp.encodeRedirectURL(any())).thenAnswer(new Answer[String]() {
- override def answer(invocationOnMock: InvocationOnMock): String = {
- invocationOnMock.getArguments()(0).asInstanceOf[String]
- }
- })
+ when(resp.encodeRedirectURL(any())).thenAnswer { (invocationOnMock: InvocationOnMock) =>
+ invocationOnMock.getArguments()(0).asInstanceOf[String]
+ }
filter.doFilter(request, resp, null)
verify(resp).sendRedirect("http://localhost:18080/history/local-123/jobs/job/?id=2")
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index bce1733..d183170 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
import org.apache.hadoop.security.AccessControlException
import org.json4s.jackson.JsonMethods._
-import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.BeforeAndAfter
@@ -49,7 +48,7 @@ import org.apache.spark.io._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.security.GroupMappingServiceProvider
-import org.apache.spark.status.{AppStatusStore, ExecutorSummaryWrapper}
+import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
import org.apache.spark.util.logging.DriverLogger
@@ -1122,11 +1121,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
SparkListenerApplicationEnd(5L))
val mockedFs = spy(provider.fs)
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
- argThat(new ArgumentMatcher[Path]() {
- override def matches(path: Path): Boolean = {
- path.asInstanceOf[Path].getName.toLowerCase(Locale.ROOT) == "accessdenied"
- }
- }))
+ argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied"))
val mockedProvider = spy(provider)
when(mockedProvider.fs).thenReturn(mockedFs)
updateAndCheck(mockedProvider) { list =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 1deac43..c3b580e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -24,7 +24,6 @@ import scala.concurrent.duration._
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -57,11 +56,9 @@ class DriverRunnerTest extends SparkFunSuite {
superviseRetry: Boolean) = {
val runner = createDriverRunner()
runner.setSleeper(mock(classOf[Sleeper]))
- doAnswer(new Answer[Int] {
- def answer(invocation: InvocationOnMock): Int = {
- runner.runCommandWithRetry(processBuilder, p => (), supervise = superviseRetry)
- }
- }).when(runner).prepareAndRunDriver()
+ doAnswer { (_: InvocationOnMock) =>
+ runner.runCommandWithRetry(processBuilder, p => (), supervise = superviseRetry)
+ }.when(runner).prepareAndRunDriver()
runner
}
@@ -120,11 +117,9 @@ class DriverRunnerTest extends SparkFunSuite {
runner.setSleeper(sleeper)
val (processBuilder, process) = createProcessBuilderAndProcess()
- when(process.waitFor()).thenAnswer(new Answer[Int] {
- def answer(invocation: InvocationOnMock): Int = {
- runner.kill()
- -1
- }
+ when(process.waitFor()).thenAnswer((_: InvocationOnMock) => {
+ runner.kill()
+ -1
})
runner.runCommandWithRetry(processBuilder, p => (), supervise = true)
@@ -169,11 +164,9 @@ class DriverRunnerTest extends SparkFunSuite {
val (processBuilder, process) = createProcessBuilderAndProcess()
val runner = createTestableDriverRunner(processBuilder, superviseRetry = true)
- when(process.waitFor()).thenAnswer(new Answer[Int] {
- def answer(invocation: InvocationOnMock): Int = {
- runner.kill()
- -1
- }
+ when(process.waitFor()).thenAnswer((_: InvocationOnMock) => {
+ runner.kill()
+ -1
})
runner.start()
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 168694c..5e8b363 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -28,7 +28,6 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -233,11 +232,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
val conf = new SparkConf().set(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT, value)
val cleanupCalled = new AtomicBoolean(false)
- when(shuffleService.executorRemoved(any[String], any[String])).thenAnswer(new Answer[Unit] {
- override def answer(invocations: InvocationOnMock): Unit = {
- cleanupCalled.set(true)
- }
- })
+ when(shuffleService.executorRemoved(any[String], any[String])).thenAnswer(
+ (_: InvocationOnMock) => cleanupCalled.set(true))
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
@@ -269,11 +265,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
val appId = "app1"
val execId = "exec1"
val cleanupCalled = new AtomicBoolean(false)
- when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
- override def answer(invocations: InvocationOnMock): Unit = {
- cleanupCalled.set(true)
- }
- })
+ when(shuffleService.applicationRemoved(any[String])).thenAnswer(
+ (_: InvocationOnMock) => cleanupCalled.set(true))
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
@@ -289,8 +282,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
- assert(!executorDir.exists() == true)
+ eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
+ assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 558cd36..63a72e2 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -279,13 +279,10 @@ class ExecutorSuite extends SparkFunSuite
val heartbeats = ArrayBuffer[Heartbeat]()
val mockReceiver = mock[RpcEndpointRef]
when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
- .thenAnswer(new Answer[HeartbeatResponse] {
- override def answer(invocation: InvocationOnMock): HeartbeatResponse = {
- val args = invocation.getArguments()
- val mock = invocation.getMock
- heartbeats += args(0).asInstanceOf[Heartbeat]
- HeartbeatResponse(false)
- }
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val args = invocation.getArguments()
+ heartbeats += args(0).asInstanceOf[Heartbeat]
+ HeartbeatResponse(false)
})
val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
receiverRef.setAccessible(true)
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 1d5360a..4a8ba0a 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -84,11 +84,8 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
*/
protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
- when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
- override def answer(invocation: InvocationOnMock): Long = {
- throw new RuntimeException("bad memory store!")
- }
- })
+ when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(
+ (_: InvocationOnMock) => throw new RuntimeException("bad memory store!"))
mm.setMemoryStore(ms)
ms
}
@@ -106,27 +103,24 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
* records the number of bytes this is called with. This variable is expected to be cleared
* by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
*/
- private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = {
- new Answer[Long] {
- override def answer(invocation: InvocationOnMock): Long = {
- val args = invocation.getArguments
- val numBytesToFree = args(1).asInstanceOf[Long]
- assert(numBytesToFree > 0)
- require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
- "bad test: evictBlocksToFreeSpace() variable was not reset")
- evictBlocksToFreeSpaceCalled.set(numBytesToFree)
- if (numBytesToFree <= mm.storageMemoryUsed) {
- // We can evict enough blocks to fulfill the request for space
- mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
- evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
- numBytesToFree
- } else {
- // No blocks were evicted because eviction would not free enough space.
- 0L
- }
+ private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] =
+ (invocation: InvocationOnMock) => {
+ val args = invocation.getArguments
+ val numBytesToFree = args(1).asInstanceOf[Long]
+ assert(numBytesToFree > 0)
+ require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
+ "bad test: evictBlocksToFreeSpace() variable was not reset")
+ evictBlocksToFreeSpaceCalled.set(numBytesToFree)
+ if (numBytesToFree <= mm.storageMemoryUsed) {
+ // We can evict enough blocks to fulfill the request for space
+ mm.releaseStorageMemory(numBytesToFree, mm.tungstenMemoryMode)
+ evictedBlocks += Tuple2(null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))
+ numBytesToFree
+ } else {
+ // No blocks were evicted because eviction would not free enough space.
+ 0L
}
}
- }
/**
* Assert that [[MemoryStore.evictBlocksToFreeSpace]] is called with the given parameters.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 0adfb07..93a88cc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{never, verify, when}
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar
@@ -480,17 +479,16 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
- when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+ when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer { (_: InvocationOnMock) =>
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
// on a particular host.
- override def answer(invocation: InvocationOnMock): Boolean = {
- if (blacklist.nodeBlacklist.contains("hostA") == false) {
- throw new IllegalStateException("hostA should be on the blacklist")
- }
+ if (blacklist.nodeBlacklist.contains("hostA")) {
true
+ } else {
+ throw new IllegalStateException("hostA should be on the blacklist")
}
- })
+ }
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
// Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
@@ -552,17 +550,16 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
val allocationClientMock = mock[ExecutorAllocationClient]
when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
- when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+ when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer { (_: InvocationOnMock) =>
// To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
// is updated before we ask the executor allocation client to kill all the executors
// on a particular host.
- override def answer(invocation: InvocationOnMock): Boolean = {
- if (blacklist.nodeBlacklist.contains("hostA") == false) {
- throw new IllegalStateException("hostA should be on the blacklist")
- }
+ if (blacklist.nodeBlacklist.contains("hostA")) {
true
+ } else {
+ throw new IllegalStateException("hostA should be on the blacklist")
}
- })
+ }
conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a560013..2891dd6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.TaskType
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{doAnswer, spy, times, verify}
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.apache.spark._
@@ -98,34 +97,29 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// Use Mockito.spy() to maintain the default infrastructure everywhere else
val mockTaskScheduler = spy(sc.taskScheduler.asInstanceOf[TaskSchedulerImpl])
- doAnswer(new Answer[Unit]() {
- override def answer(invoke: InvocationOnMock): Unit = {
- // Submit the tasks, then force the task scheduler to dequeue the
- // speculated task
- invoke.callRealMethod()
- mockTaskScheduler.backend.reviveOffers()
- }
- }).when(mockTaskScheduler).submitTasks(any())
-
- doAnswer(new Answer[TaskSetManager]() {
- override def answer(invoke: InvocationOnMock): TaskSetManager = {
- val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
- new TaskSetManager(mockTaskScheduler, taskSet, 4) {
- var hasDequeuedSpeculatedTask = false
- override def dequeueSpeculativeTask(
- execId: String,
- host: String,
- locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
- if (!hasDequeuedSpeculatedTask) {
- hasDequeuedSpeculatedTask = true
- Some((0, TaskLocality.PROCESS_LOCAL))
- } else {
- None
- }
+ doAnswer { (invoke: InvocationOnMock) =>
+ // Submit the tasks, then force the task scheduler to dequeue the
+ // speculated task
+ invoke.callRealMethod()
+ mockTaskScheduler.backend.reviveOffers()
+ }.when(mockTaskScheduler).submitTasks(any())
+
+ doAnswer { (invoke: InvocationOnMock) =>
+ val taskSet = invoke.getArguments()(0).asInstanceOf[TaskSet]
+ new TaskSetManager(mockTaskScheduler, taskSet, 4) {
+ private var hasDequeuedSpeculatedTask = false
+ override def dequeueSpeculativeTask(execId: String,
+ host: String,
+ locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] = {
+ if (hasDequeuedSpeculatedTask) {
+ None
+ } else {
+ hasDequeuedSpeculatedTask = true
+ Some((0, TaskLocality.PROCESS_LOCAL))
}
}
}
- }).when(mockTaskScheduler).createTaskSetManager(any(), any())
+ }.when(mockTaskScheduler).createTaskSetManager(any(), any())
sc.taskScheduler = mockTaskScheduler
val dagSchedulerWithMockTaskScheduler = new DAGScheduler(sc, mockTaskScheduler)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index aa6db8d..43d6ec1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -356,13 +356,9 @@ private[spark] abstract class MockBackend(
assignedTasksWaitingToRun.nonEmpty
}
- override def start(): Unit = {
- reviveThread.scheduleAtFixedRate(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- reviveOffers()
- }
- }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
- }
+ override def start(): Unit =
+ reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { reviveOffers() },
+ 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
override def stop(): Unit = {
reviveThread.shutdown()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 32a2bdb..ad03194 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -25,14 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, spy, times, verify, when}
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, ManualClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, ManualClock}
class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
extends DAGScheduler(sc) {
@@ -1190,11 +1189,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any())).thenAnswer(
- new Answer[Unit] {
- override def answer(invocationOnMock: InvocationOnMock): Unit = {
- assert(manager.isZombie)
- }
- })
+ (invocationOnMock: InvocationOnMock) => assert(manager.isZombie))
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
// this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
@@ -1317,12 +1312,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
// Assert the task has been black listed on the executor it was last executed on.
when(taskSetManagerSpy.addPendingTask(anyInt())).thenAnswer(
- new Answer[Unit] {
- override def answer(invocationOnMock: InvocationOnMock): Unit = {
- val task: Int = invocationOnMock.getArgument(0)
- assert(taskSetManager.taskSetBlacklistHelperOpt.get.
- isExecutorBlacklistedForTask(exec, task))
- }
+ (invocationOnMock: InvocationOnMock) => {
+ val task: Int = invocationOnMock.getArgument(0)
+ assert(taskSetManager.taskSetBlacklistHelperOpt.get.
+ isExecutorBlacklistedForTask(exec, task))
}
)
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 72a1a4f..fc1422d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -28,7 +28,6 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
import org.apache.spark._
@@ -69,16 +68,14 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
when(dependency.serializer).thenReturn(new JavaSerializer(conf))
when(taskContext.taskMetrics()).thenReturn(taskMetrics)
when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)
- doAnswer(new Answer[Void] {
- def answer(invocationOnMock: InvocationOnMock): Void = {
- val tmp: File = invocationOnMock.getArguments()(3).asInstanceOf[File]
- if (tmp != null) {
- outputFile.delete
- tmp.renameTo(outputFile)
- }
- null
+ doAnswer { (invocationOnMock: InvocationOnMock) =>
+ val tmp = invocationOnMock.getArguments()(3).asInstanceOf[File]
+ if (tmp != null) {
+ outputFile.delete
+ tmp.renameTo(outputFile)
}
- }).when(blockResolver)
+ null
+ }.when(blockResolver)
.writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]), any(classOf[File]))
when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
when(blockManager.getDiskWriter(
@@ -87,37 +84,29 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
any[SerializerInstance],
anyInt(),
any[ShuffleWriteMetrics]
- )).thenAnswer(new Answer[DiskBlockObjectWriter] {
- override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
- val args = invocation.getArguments
- val manager = new SerializerManager(new JavaSerializer(conf), conf)
- new DiskBlockObjectWriter(
- args(1).asInstanceOf[File],
- manager,
- args(2).asInstanceOf[SerializerInstance],
- args(3).asInstanceOf[Int],
- syncWrites = false,
- args(4).asInstanceOf[ShuffleWriteMetrics],
- blockId = args(0).asInstanceOf[BlockId]
- )
- }
+ )).thenAnswer((invocation: InvocationOnMock) => {
+ val args = invocation.getArguments
+ val manager = new SerializerManager(new JavaSerializer(conf), conf)
+ new DiskBlockObjectWriter(
+ args(1).asInstanceOf[File],
+ manager,
+ args(2).asInstanceOf[SerializerInstance],
+ args(3).asInstanceOf[Int],
+ syncWrites = false,
+ args(4).asInstanceOf[ShuffleWriteMetrics],
+ blockId = args(0).asInstanceOf[BlockId]
+ )
})
- when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
- new Answer[(TempShuffleBlockId, File)] {
- override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = {
- val blockId = new TempShuffleBlockId(UUID.randomUUID)
- val file = new File(tempDir, blockId.name)
- blockIdToFileMap.put(blockId, file)
- temporaryFilesCreated += file
- (blockId, file)
- }
- })
- when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
- new Answer[File] {
- override def answer(invocation: InvocationOnMock): File = {
- blockIdToFileMap(invocation.getArguments.head.asInstanceOf[BlockId])
- }
+ when(diskBlockManager.createTempShuffleBlock()).thenAnswer((_: InvocationOnMock) => {
+ val blockId = new TempShuffleBlockId(UUID.randomUUID)
+ val file = new File(tempDir, blockId.name)
+ blockIdToFileMap.put(blockId, file)
+ temporaryFilesCreated += file
+ (blockId, file)
})
+ when(diskBlockManager.getFile(any[BlockId])).thenAnswer { (invocation: InvocationOnMock) =>
+ blockIdToFileMap(invocation.getArguments.head.asInstanceOf[BlockId])
+ }
}
override def afterEach(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 0154d0b..27bb06b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -48,11 +47,7 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
when(diskBlockManager.getFile(any[BlockId])).thenAnswer(
- new Answer[File] {
- override def answer(invocation: InvocationOnMock): File = {
- new File(tempDir, invocation.getArguments.head.toString)
- }
- })
+ (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
}
override def afterEach(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
index 5351053..3dbc1c4 100644
--- a/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/PartiallySerializedBlockSuite.scala
@@ -24,7 +24,6 @@ import scala.reflect.ClassTag
import org.mockito.Mockito
import org.mockito.Mockito.atLeastOnce
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext, TaskContextImpl}
@@ -59,11 +58,9 @@ class PartiallySerializedBlockSuite
val bbos: ChunkedByteBufferOutputStream = {
val spy = Mockito.spy(new ChunkedByteBufferOutputStream(128, ByteBuffer.allocate))
- Mockito.doAnswer(new Answer[ChunkedByteBuffer] {
- override def answer(invocationOnMock: InvocationOnMock): ChunkedByteBuffer = {
- Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
- }
- }).when(spy).toChunkedByteBuffer
+ Mockito.doAnswer { (invocationOnMock: InvocationOnMock) =>
+ Mockito.spy(invocationOnMock.callRealMethod().asInstanceOf[ChunkedByteBuffer])
+ }.when(spy).toChunkedByteBuffer
spy
}
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index a1c298a..3ab2f0b 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -28,7 +28,6 @@ import scala.concurrent.Future
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{mock, times, verify, when}
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.PrivateMethodTester
import org.apache.spark.{SparkFunSuite, TaskContext}
@@ -50,9 +49,8 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
/** Creates a mock [[BlockTransferService]] that returns data from the given map. */
private def createMockTransfer(data: Map[BlockId, ManagedBuffer]): BlockTransferService = {
val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer(
+ (invocation: InvocationOnMock) => {
val blocks = invocation.getArguments()(3).asInstanceOf[Array[String]]
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
@@ -63,8 +61,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
listener.onBlockFetchFailure(blockId, new BlockNotFoundException(blockId))
}
}
- }
- })
+ })
transfer
}
@@ -168,8 +165,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ .thenAnswer((invocation: InvocationOnMock) => {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
Future {
// Return the first two blocks, and wait till task completion before returning the 3rd one
@@ -181,8 +177,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
listener.onBlockFetchSuccess(
ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
}
- }
- })
+ })
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
@@ -237,20 +232,18 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- Future {
- // Return the first two blocks, and wait till task completion before returning the last
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0)))
- sem.acquire()
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
- }
- }
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+ Future {
+ // Return the first two blocks, and wait till task completion before returning the last
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0)))
+ sem.acquire()
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
+ }
})
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
@@ -298,8 +291,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ .thenAnswer((invocation: InvocationOnMock) => {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
Future {
// Return the first block, and then fail.
@@ -311,8 +303,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 2, 0).toString, new BlockNotFoundException("blah"))
sem.release()
}
- }
- })
+ })
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
@@ -389,8 +380,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ .thenAnswer((invocation: InvocationOnMock) => {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
Future {
// Return the first block, and then fail.
@@ -402,8 +392,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
sem.release()
}
- }
- })
+ })
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
@@ -431,8 +420,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
assert(id1 === ShuffleBlockId(0, 0, 0))
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ .thenAnswer((invocation: InvocationOnMock) => {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
Future {
// Return the first block, and then fail.
@@ -440,8 +428,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
sem.release()
}
- }
- })
+ })
// The next block is corrupt local block (the second one is corrupt and retried)
intercept[FetchFailedException] { iterator.next() }
@@ -588,8 +575,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
+ .thenAnswer((invocation: InvocationOnMock) => {
val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
Future {
// Return the first block, and then fail.
@@ -601,8 +587,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 2, 0).toString, mockCorruptBuffer())
sem.release()
}
- }
- })
+ })
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
@@ -654,14 +639,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
val transfer = mock(classOf[BlockTransferService])
var tempFileManager: DownloadFileManager = null
when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
- val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- tempFileManager = invocation.getArguments()(5).asInstanceOf[DownloadFileManager]
- Future {
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
- }
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+ tempFileManager = invocation.getArguments()(5).asInstanceOf[DownloadFileManager]
+ Future {
+ listener.onBlockFetchSuccess(
+ ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
}
})
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index c181553..aa3f062 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -71,11 +71,9 @@ class ThreadUtilsSuite extends SparkFunSuite {
keepAliveSeconds = 2)
try {
for (_ <- 1 to maxThreadNumber) {
- cachedThreadPool.execute(new Runnable {
- override def run(): Unit = {
- startThreadsLatch.countDown()
- latch.await(10, TimeUnit.SECONDS)
- }
+ cachedThreadPool.execute(() => {
+ startThreadsLatch.countDown()
+ latch.await(10, TimeUnit.SECONDS)
})
}
startThreadsLatch.await(10, TimeUnit.SECONDS)
@@ -84,11 +82,7 @@ class ThreadUtilsSuite extends SparkFunSuite {
// Submit a new task and it should be put into the queue since the thread number reaches the
// limitation
- cachedThreadPool.execute(new Runnable {
- override def run(): Unit = {
- latch.await(10, TimeUnit.SECONDS)
- }
- })
+ cachedThreadPool.execute(() => latch.await(10, TimeUnit.SECONDS))
assert(cachedThreadPool.getActiveCount === maxThreadNumber)
assert(cachedThreadPool.getQueue.size === 1)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
index b28489a..f205770 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/AppendOnlyMapSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.util.collection
-import java.util.Comparator
-
import scala.collection.mutable.HashSet
import org.apache.spark.SparkFunSuite
@@ -170,12 +168,10 @@ class AppendOnlyMapSuite extends SparkFunSuite {
case e: IllegalStateException => fail()
}
- val it = map.destructiveSortedIterator(new Comparator[String] {
- def compare(key1: String, key2: String): Int = {
- val x = if (key1 != null) key1.toInt else Int.MinValue
- val y = if (key2 != null) key2.toInt else Int.MinValue
- x.compareTo(y)
- }
+ val it = map.destructiveSortedIterator((key1: String, key2: String) => {
+ val x = if (key1 != null) key1.toInt else Int.MinValue
+ val y = if (key2 != null) key2.toInt else Int.MinValue
+ x.compareTo(y)
})
// Should be sorted by key
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bbc0b33..2bad56d 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.util.collection
-import java.util.Comparator
-
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -111,14 +109,9 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val tmp = new Array[Long](size/2)
val tmpBuf = new LongArray(MemoryBlock.fromLongArray(tmp))
- new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(
- buf, 0, size, new Comparator[RecordPointerAndKeyPrefix] {
- override def compare(
- r1: RecordPointerAndKeyPrefix,
- r2: RecordPointerAndKeyPrefix): Int = {
- PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix)
- }
- })
+ new Sorter(new UnsafeSortDataFormat(tmpBuf)).sort(buf, 0, size,
+ (r1: RecordPointerAndKeyPrefix, r2: RecordPointerAndKeyPrefix) =>
+ PrefixComparators.LONG.compare(r1.keyPrefix, r2.keyPrefix))
}
test("spilling with hash collisions") {
@@ -135,7 +128,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
buffer2: ArrayBuffer[String]): ArrayBuffer[String] = buffer1 ++= buffer2
val agg = new Aggregator[String, String, ArrayBuffer[String]](
- createCombiner _, mergeValue _, mergeCombiners _)
+ createCombiner, mergeValue, mergeCombiners)
val sorter = new ExternalSorter[String, String, ArrayBuffer[String]](
context, Some(agg), None, None)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
index 35a369e..acd0b0e 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.util.collection
import java.lang.{Float => JFloat}
-import java.util.{Arrays, Comparator}
+import java.util.Arrays
import java.util.concurrent.TimeUnit
import org.apache.spark.SparkFunSuite
@@ -219,10 +219,8 @@ class SorterSuite extends SparkFunSuite with Logging {
System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements)
}
runExperiment("Tuple-sort using Arrays.sort()")({
- Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
- override def compare(x: AnyRef, y: AnyRef): Int =
- x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1)
- })
+ Arrays.sort(kvTupleArray, (x: AnyRef, y: AnyRef) =>
+ x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1))
}, prepareKvTupleArray)
// Test our Sorter where each element alternates between Float and Integer, non-primitive
@@ -245,9 +243,7 @@ class SorterSuite extends SparkFunSuite with Logging {
val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
runExperiment("KV-sort using Sorter")({
- sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] {
- override def compare(x: JFloat, y: JFloat): Int = x.compareTo(y)
- })
+ sorter.sort(keyValueArray, 0, numElements, (x: JFloat, y: JFloat) => x.compareTo(y))
}, prepareKeyValueArray)
}
@@ -280,11 +276,9 @@ class SorterSuite extends SparkFunSuite with Logging {
System.arraycopy(intObjects, 0, intObjectArray, 0, numElements)
}
- runExperiment("Java Arrays.sort() on non-primitive int array")({
- Arrays.sort(intObjectArray, new Comparator[Integer] {
- override def compare(x: Integer, y: Integer): Int = x.compareTo(y)
- })
- }, prepareIntObjectArray)
+ runExperiment("Java Arrays.sort() on non-primitive int array")(
+ Arrays.sort(intObjectArray, (x: Integer, y: Integer) => x.compareTo(y)),
+ prepareIntObjectArray)
val intPrimitiveArray = new Array[Int](numElements)
val prepareIntPrimitiveArray = () => {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index d570630..a3c006b 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -69,9 +69,8 @@ class RadixSortSuite extends SparkFunSuite with Logging {
override def sortDescending = false
override def sortSigned = false
override def nullsFirst = true
- override def compare(a: Long, b: Long): Int = {
- return PrefixComparators.BINARY.compare(a & 0xffffff0000L, b & 0xffffff0000L)
- }
+ override def compare(a: Long, b: Long): Int =
+ PrefixComparators.BINARY.compare(a & 0xffffff0000L, b & 0xffffff0000L)
},
2, 4, false, false, true))
@@ -112,11 +111,9 @@ class RadixSortSuite extends SparkFunSuite with Logging {
private def referenceKeyPrefixSort(buf: LongArray, lo: Long, hi: Long, refCmp: PrefixComparator) {
val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
- buf, Ints.checkedCast(lo), Ints.checkedCast(hi), new Comparator[RecordPointerAndKeyPrefix] {
- override def compare(
- r1: RecordPointerAndKeyPrefix,
- r2: RecordPointerAndKeyPrefix): Int = refCmp.compare(r1.keyPrefix, r2.keyPrefix)
- })
+ buf, Ints.checkedCast(lo), Ints.checkedCast(hi),
+ (r1: RecordPointerAndKeyPrefix, r2: RecordPointerAndKeyPrefix) =>
+ refCmp.compare(r1.keyPrefix, r2.keyPrefix))
}
private def fuzzTest(name: String)(testFn: Long => Unit): Unit = {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 14bc6ba..c64b070 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.concurrent.{Executors, ThreadFactory}
+import java.util.concurrent.Executors
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -52,16 +52,14 @@ private[kafka010] class KafkaOffsetReader(
/**
* Used to ensure execute fetch operations execute in an UninterruptibleThread
*/
- val kafkaReaderThread = Executors.newSingleThreadExecutor(new ThreadFactory {
- override def newThread(r: Runnable): Thread = {
- val t = new UninterruptibleThread("Kafka Offset Reader") {
- override def run(): Unit = {
- r.run()
- }
+ val kafkaReaderThread = Executors.newSingleThreadExecutor((r: Runnable) => {
+ val t = new UninterruptibleThread("Kafka Offset Reader") {
+ override def run(): Unit = {
+ r.run()
}
- t.setDaemon(true)
- t
}
+ t.setDaemon(true)
+ t
})
val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a8eff6b..f7a2032 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -361,9 +361,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def capabilities(): ju.Set[TableCapability] = Collections.emptySet()
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
- override def build(): Scan = new KafkaScan(options)
- }
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+ () => new KafkaScan(options)
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder {
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 93d0d2f..e042ae0 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -53,7 +53,7 @@ class DirectKafkaStreamSuite
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
// Set a timeout of 10 seconds that's going to be used to fetch topics/partitions from kafka.
- // Othewise the poll timeout defaults to 2 minutes and causes test cases to run longer.
+ // Otherwise the poll timeout defaults to 2 minutes and causes test cases to run longer.
.set("spark.streaming.kafka.consumer.poll.ms", "10000")
private var ssc: StreamingContext = _
@@ -61,13 +61,13 @@ class DirectKafkaStreamSuite
private var kafkaTestUtils: KafkaTestUtils = _
- override def beforeAll {
+ override def beforeAll() {
super.beforeAll()
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
}
- override def afterAll {
+ override def afterAll() {
try {
if (kafkaTestUtils != null) {
kafkaTestUtils.teardown()
@@ -454,13 +454,11 @@ class DirectKafkaStreamSuite
val data = rdd.map(_.value).collect()
collectedData.addAll(Arrays.asList(data: _*))
kafkaStream.asInstanceOf[CanCommitOffsets]
- .commitAsync(offsets, new OffsetCommitCallback() {
- def onComplete(m: JMap[TopicPartition, OffsetAndMetadata], e: Exception) {
- if (null != e) {
- logError("commit failed", e)
- } else {
- committed.putAll(m)
- }
+ .commitAsync(offsets, (m: JMap[TopicPartition, OffsetAndMetadata], e: Exception) => {
+ if (null != e) {
+ logError("commit failed", e)
+ } else {
+ committed.putAll(m)
}
})
}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index bd31b7d..9ea7bfc 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -27,7 +27,6 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorC
import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar
@@ -124,11 +123,9 @@ class KinesisCheckpointerSuite extends TestSuiteBase
test("if checkpointing is going on, wait until finished before removing and checkpointing") {
when(receiverMock.getLatestSeqNumToCheckpoint(shardId))
.thenReturn(someSeqNum).thenReturn(someOtherSeqNum)
- when(checkpointerMock.checkpoint(anyString)).thenAnswer(new Answer[Unit] {
- override def answer(invocations: InvocationOnMock): Unit = {
- clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
- }
- })
+ when(checkpointerMock.checkpoint(anyString)).thenAnswer { (_: InvocationOnMock) =>
+ clock.waitTillTime(clock.getTimeMillis() + checkpointInterval.milliseconds / 2)
+ }
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(checkpointInterval.milliseconds)
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index da3db3c..27c08c8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -130,16 +130,14 @@ private[impl] case class EdgeWithLocalIds[@specialized ED](
private[impl] object EdgeWithLocalIds {
implicit def lexicographicOrdering[ED]: Ordering[EdgeWithLocalIds[ED]] =
- new Ordering[EdgeWithLocalIds[ED]] {
- override def compare(a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]): Int = {
- if (a.srcId == b.srcId) {
- if (a.dstId == b.dstId) 0
- else if (a.dstId < b.dstId) -1
- else 1
- } else if (a.srcId < b.srcId) -1
+ (a: EdgeWithLocalIds[ED], b: EdgeWithLocalIds[ED]) =>
+ if (a.srcId == b.srcId) {
+ if (a.dstId == b.dstId) 0
+ else if (a.dstId < b.dstId) -1
else 1
}
- }
+ else if (a.srcId < b.srcId) -1
+ else 1
private[graphx] def edgeArraySortDataFormat[ED] = {
new SortDataFormat[EdgeWithLocalIds[ED], Array[EdgeWithLocalIds[ED]]] {
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 4752495..0276f2d 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.repl
import java.io.File
import java.net.{URI, URL, URLClassLoader}
-import java.nio.channels.{FileChannel, ReadableByteChannel}
+import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Paths, StandardOpenOption}
import java.util
@@ -33,7 +33,6 @@ import com.google.common.io.Files
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfterAll
import org.scalatest.mockito.MockitoSugar
@@ -191,12 +190,10 @@ class ExecutorClassLoaderSuite
val env = mock[SparkEnv]
val rpcEnv = mock[RpcEnv]
when(env.rpcEnv).thenReturn(rpcEnv)
- when(rpcEnv.openChannel(anyString())).thenAnswer(new Answer[ReadableByteChannel]() {
- override def answer(invocation: InvocationOnMock): ReadableByteChannel = {
- val uri = new URI(invocation.getArguments()(0).asInstanceOf[String])
- val path = Paths.get(tempDir1.getAbsolutePath(), uri.getPath().stripPrefix("/"))
- FileChannel.open(path, StandardOpenOption.READ)
- }
+ when(rpcEnv.openChannel(anyString())).thenAnswer((invocation: InvocationOnMock) => {
+ val uri = new URI(invocation.getArguments()(0).asInstanceOf[String])
+ val path = Paths.get(tempDir1.getAbsolutePath(), uri.getPath().stripPrefix("/"))
+ FileChannel.open(path, StandardOpenOption.READ)
})
val classLoader = new ExecutorClassLoader(new SparkConf(), env, "spark://localhost:1234",
diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
index 039fc62..7e3d0d9 100644
--- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala
@@ -52,9 +52,7 @@ class SingletonReplSuite extends SparkFunSuite {
Main.sparkSession = null
// Starts a new thread to run the REPL interpreter, so that we won't block.
- thread = new Thread(new Runnable {
- override def run(): Unit = Main.doMain(Array("-classpath", classpath), interp)
- })
+ thread = new Thread(() => Main.doMain(Array("-classpath", classpath), interp))
thread.setDaemon(true)
thread.start()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index a2430c0..f16d1f3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -47,9 +47,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
// start timer for periodic logging
private val scheduler =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
- private val logRunnable: Runnable = new Runnable {
- override def run() = logShortStatus()
- }
+ private val logRunnable: Runnable = () => logShortStatus()
private var pod = Option.empty[Pod]
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 5583b46..010d93f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -68,7 +68,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
}
subscribers += newSubscriber
pollingTasks += subscribersExecutor.scheduleWithFixedDelay(
- toRunnable(() => callSubscriber(newSubscriber)),
+ () => callSubscriber(newSubscriber),
0L,
processBatchIntervalMillis,
TimeUnit.MILLISECONDS)
@@ -103,10 +103,6 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
}
}
- private def toRunnable[T](runnable: () => Unit): Runnable = new Runnable {
- override def run(): Unit = runnable()
- }
-
private case class SnapshotsSubscriber(
snapshotsBuffer: BlockingQueue[ExecutorPodsSnapshot],
onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index 95de7d9..b0604ea 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -23,7 +23,6 @@ import io.fabric8.kubernetes.api.model.{Container, HasMetadata, PodBuilder, Secr
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
-import org.mockito.stubbing.Answer
import org.apache.spark.deploy.k8s.SparkPod
@@ -38,16 +37,14 @@ object KubernetesFeaturesTestUtils {
when(mockStep.getAdditionalPodSystemProperties())
.thenReturn(Map(stepType -> stepType))
when(mockStep.configurePod(any(classOf[SparkPod])))
- .thenAnswer(new Answer[SparkPod]() {
- override def answer(invocation: InvocationOnMock): SparkPod = {
- val originalPod: SparkPod = invocation.getArgument(0)
- val configuredPod = new PodBuilder(originalPod.pod)
- .editOrNewMetadata()
- .addToLabels(stepType, stepType)
- .endMetadata()
- .build()
- SparkPod(configuredPod, originalPod.container)
- }
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val originalPod: SparkPod = invocation.getArgument(0)
+ val configuredPod = new PodBuilder(originalPod.pod)
+ .editOrNewMetadata()
+ .addToLabels(stepType, stepType)
+ .endMetadata()
+ .build()
+ SparkPod(configuredPod, originalPod.container)
})
mockStep
}
@@ -67,6 +64,6 @@ object KubernetesFeaturesTestUtils {
def filter[T: ClassTag](list: Seq[HasMetadata]): Seq[T] = {
val desired = implicitly[ClassTag[T]].runtimeClass
- list.filter(_.getClass() == desired).map(_.asInstanceOf[T]).toSeq
+ list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 55d9adc..5862f64 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster.k8s
import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{ArgumentMatcher, Matchers, Mock, MockitoAnnotations}
+import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
@@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
@@ -153,12 +153,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
verify(podOperations).create(podWithAttachedContainerForId(2))
}
- private def executorPodAnswer(): Answer[SparkPod] = {
- new Answer[SparkPod] {
- override def answer(invocation: InvocationOnMock): SparkPod = {
- val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
- executorPodWithId(k8sConf.executorId.toInt)
- }
- }
+ private def executorPodAnswer(): Answer[SparkPod] =
+ (invocation: InvocationOnMock) => {
+ val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+ executorPodWithId(k8sConf.executorId.toInt)
}
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index b20ed47..9920f4d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -26,7 +26,6 @@ import org.mockito.Mockito.{mock, never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
-import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -125,13 +124,10 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
""".stripMargin
}
- private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] = {
- new Answer[PodResource[Pod, DoneablePod]] {
- override def answer(invocation: InvocationOnMock): PodResource[Pod, DoneablePod] = {
- val podName: String = invocation.getArgument(0)
- namedExecutorPods.getOrElseUpdate(
- podName, mock(classOf[PodResource[Pod, DoneablePod]]))
- }
+ private def namedPodsAnswer(): Answer[PodResource[Pod, DoneablePod]] =
+ (invocation: InvocationOnMock) => {
+ val podName: String = invocation.getArgument(0)
+ namedExecutorPods.getOrElseUpdate(
+ podName, mock(classOf[PodResource[Pod, DoneablePod]]))
}
- }
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index f9bdddc..5d939cf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -192,11 +192,9 @@ private[yarn] class YarnAllocator(
* A sequence of pending container requests at the given location that have not yet been
* fulfilled.
*/
- private def getPendingAtLocation(location: String): Seq[ContainerRequest] = {
+ private def getPendingAtLocation(location: String): Seq[ContainerRequest] =
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).asScala
.flatMap(_.asScala)
- .toSeq
- }
/**
* Request as many executors from the ResourceManager as needed to reach the desired total. If
@@ -384,7 +382,7 @@ private[yarn] class YarnAllocator(
def stop(): Unit = {
// Forcefully shut down the launcher pool, in case this is being called in the middle of
// container allocation. This will prevent queued executors from being started - and
- // potentially interrupt active ExecutorRunnable instaces too.
+ // potentially interrupt active ExecutorRunnable instances too.
launcherPool.shutdownNow()
}
@@ -467,7 +465,7 @@ private[yarn] class YarnAllocator(
remainingAfterOffRackMatches)
}
- if (!remainingAfterOffRackMatches.isEmpty) {
+ if (remainingAfterOffRackMatches.nonEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
@@ -550,35 +548,33 @@ private[yarn] class YarnAllocator(
if (runningExecutors.size() < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
- launcherPool.execute(new Runnable {
- override def run(): Unit = {
- try {
- new ExecutorRunnable(
- Some(container),
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores,
- appAttemptId.getApplicationId.toString,
- securityMgr,
- localResources
- ).run()
- updateInternalState()
- } catch {
- case e: Throwable =>
- numExecutorsStarting.decrementAndGet()
- if (NonFatal(e)) {
- logError(s"Failed to launch executor $executorId on container $containerId", e)
- // Assigned container should be released immediately
- // to avoid unnecessary resource occupation.
- amClient.releaseAssignedContainer(containerId)
- } else {
- throw e
- }
- }
+ launcherPool.execute(() => {
+ try {
+ new ExecutorRunnable(
+ Some(container),
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr,
+ localResources
+ ).run()
+ updateInternalState()
+ } catch {
+ case e: Throwable =>
+ numExecutorsStarting.decrementAndGet()
+ if (NonFatal(e)) {
+ logError(s"Failed to launch executor $executorId on container $containerId", e)
+ // Assigned container should be released immediately
+ // to avoid unnecessary resource occupation.
+ amClient.releaseAssignedContainer(containerId)
+ } else {
+ throw e
+ }
}
})
} else {
@@ -776,7 +772,7 @@ private[yarn] class YarnAllocator(
}
}
- (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
+ (localityMatched, localityUnMatched, localityFree)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index f7955bc..2a7488a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
@@ -273,7 +272,7 @@ case class ArraysZip(children: Seq[Expression]) extends Expression with ExpectsI
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- if (children.length == 0) {
+ if (children.isEmpty) {
emptyInputGenCode(ev)
} else {
nonEmptyInputGenCode(ctx, ev)
@@ -718,17 +717,15 @@ trait ArraySortLike extends ExpectsInputTypes {
case _ @ ArrayType(s: StructType, _) => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
}
- new Comparator[Any]() {
- override def compare(o1: Any, o2: Any): Int = {
- if (o1 == null && o2 == null) {
- 0
- } else if (o1 == null) {
- nullOrder
- } else if (o2 == null) {
- -nullOrder
- } else {
- ordering.compare(o1, o2)
- }
+ (o1: Any, o2: Any) => {
+ if (o1 == null && o2 == null) {
+ 0
+ } else if (o1 == null) {
+ nullOrder
+ } else if (o2 == null) {
+ -nullOrder
+ } else {
+ ordering.compare(o1, o2)
}
}
}
@@ -740,17 +737,15 @@ trait ArraySortLike extends ExpectsInputTypes {
case _ @ ArrayType(s: StructType, _) => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
}
- new Comparator[Any]() {
- override def compare(o1: Any, o2: Any): Int = {
- if (o1 == null && o2 == null) {
- 0
- } else if (o1 == null) {
- -nullOrder
- } else if (o2 == null) {
- nullOrder
- } else {
- ordering.compare(o2, o1)
- }
+ (o1: Any, o2: Any) => {
+ if (o1 == null && o2 == null) {
+ 0
+ } else if (o1 == null) {
+ -nullOrder
+ } else if (o2 == null) {
+ nullOrder
+ } else {
+ ordering.compare(o2, o1)
}
}
}
@@ -769,7 +764,6 @@ trait ArraySortLike extends ExpectsInputTypes {
}
def sortCodegen(ctx: CodegenContext, ev: ExprCode, base: String, order: String): String = {
- val arrayData = classOf[ArrayData].getName
val genericArrayData = classOf[GenericArrayData].getName
val unsafeArrayData = classOf[UnsafeArrayData].getName
val array = ctx.freshName("array")
@@ -2784,7 +2778,7 @@ case class ArrayRepeat(left: Expression, right: Expression)
} else {
if (count.asInstanceOf[Int] > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
throw new RuntimeException(s"Unsuccessful try to create array with $count elements " +
- s"due to exceeding the array size limit ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
+ s"due to exceeding the array size limit ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
}
val element = left.eval(input)
new GenericArrayData(Array.fill(count.asInstanceOf[Int])(element))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index a5dbc75..12e8d02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -35,12 +35,8 @@ package object util extends Logging {
val origErr = System.err
val origOut = System.out
try {
- System.setErr(new PrintStream(new OutputStream {
- def write(b: Int) = {}
- }))
- System.setOut(new PrintStream(new OutputStream {
- def write(b: Int) = {}
- }))
+ System.setErr(new PrintStream((_: Int) => {}))
+ System.setOut(new PrintStream((_: Int) => {}))
f
} finally {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
index cc8b3e6..dddf874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.types
-import scala.math.Ordering
import scala.reflect.runtime.universe.typeTag
import org.apache.spark.annotation.Stable
@@ -37,11 +36,8 @@ class BinaryType private() extends AtomicType {
@transient private[sql] lazy val tag = typeTag[InternalType]
- private[sql] val ordering = new Ordering[InternalType] {
- def compare(x: Array[Byte], y: Array[Byte]): Int = {
- TypeUtils.compareBinary(x, y)
- }
- }
+ private[sql] val ordering =
+ (x: Array[Byte], y: Array[Byte]) => TypeUtils.compareBinary(x, y)
/**
* The default size of a value of the BinaryType is 100 bytes.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
index afd3353..7a48202 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.types
-import scala.math.{Fractional, Numeric, Ordering}
+import scala.math.{Fractional, Numeric}
import scala.math.Numeric.DoubleAsIfIntegral
import scala.reflect.runtime.universe.typeTag
@@ -38,9 +38,8 @@ class DoubleType private() extends FractionalType {
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] val numeric = implicitly[Numeric[Double]]
private[sql] val fractional = implicitly[Fractional[Double]]
- private[sql] val ordering = new Ordering[Double] {
- override def compare(x: Double, y: Double): Int = Utils.nanSafeCompareDoubles(x, y)
- }
+ private[sql] val ordering =
+ (x: Double, y: Double) => Utils.nanSafeCompareDoubles(x, y)
private[sql] val asIntegral = DoubleAsIfIntegral
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
index 6d98987..652edb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.types
-import scala.math.{Fractional, Numeric, Ordering}
+import scala.math.{Fractional, Numeric}
import scala.math.Numeric.FloatAsIfIntegral
import scala.reflect.runtime.universe.typeTag
@@ -38,9 +38,8 @@ class FloatType private() extends FractionalType {
@transient private[sql] lazy val tag = typeTag[InternalType]
private[sql] val numeric = implicitly[Numeric[Float]]
private[sql] val fractional = implicitly[Fractional[Float]]
- private[sql] val ordering = new Ordering[Float] {
- override def compare(x: Float, y: Float): Int = Utils.nanSafeCompareFloats(x, y)
- }
+ private[sql] val ordering =
+ (x: Float, y: Float) => Utils.nanSafeCompareFloats(x, y)
private[sql] val asIntegral = FloatAsIfIntegral
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
index 2fcaeca..366188c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogEventSuite.scala
@@ -38,11 +38,7 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
f: (ExternalCatalog, Seq[ExternalCatalogEvent] => Unit) => Unit): Unit = test(name) {
val catalog = new ExternalCatalogWithListener(newCatalog)
val recorder = mutable.Buffer.empty[ExternalCatalogEvent]
- catalog.addListener(new ExternalCatalogEventListener {
- override def onEvent(event: ExternalCatalogEvent): Unit = {
- recorder += event
- }
- })
+ catalog.addListener((event: ExternalCatalogEvent) => recorder += event)
f(catalog, (expected: Seq[ExternalCatalogEvent]) => {
val actual = recorder.clone()
recorder.clear()
@@ -174,9 +170,6 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
className = "",
resources = Seq.empty)
- val newIdentifier = functionDefinition.identifier.copy(funcName = "fn4")
- val renamedFunctionDefinition = functionDefinition.copy(identifier = newIdentifier)
-
catalog.createDatabase(dbDefinition, ignoreIfExists = false)
checkEvents(CreateDatabasePreEvent("db5") :: CreateDatabaseEvent("db5") :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index c6665d2..2bd5cad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -112,9 +112,7 @@ object SortPrefixUtils {
val field = schema.head
getPrefixComparator(SortOrder(BoundReference(0, field.dataType, field.nullable), Ascending))
} else {
- new PrefixComparator {
- override def compare(prefix1: Long, prefix2: Long): Int = 0
- }
+ (_: Long, _: Long) => 0
}
}
@@ -164,12 +162,7 @@ object SortPrefixUtils {
}
}
} else {
- new UnsafeExternalRowSorter.PrefixComputer {
- override def computePrefix(row: InternalRow):
- UnsafeExternalRowSorter.PrefixComputer.Prefix = {
- emptyPrefix
- }
- }
+ _: InternalRow => emptyPrefix
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 235801a..1d3cc88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -72,7 +72,7 @@ case class CreateDatabaseCommand(
CatalogDatabase(
databaseName,
comment.getOrElse(""),
- path.map(CatalogUtils.stringToURI(_)).getOrElse(catalog.getDefaultDBPath(databaseName)),
+ path.map(CatalogUtils.stringToURI).getOrElse(catalog.getDefaultDBPath(databaseName)),
props),
ifNotExists)
Seq.empty[Row]
@@ -352,9 +352,8 @@ case class AlterTableChangeColumnCommand(
}
// Add the comment to a column, if comment is empty, return the original column.
- private def addComment(column: StructField, comment: Option[String]): StructField = {
- comment.map(column.withComment(_)).getOrElse(column)
- }
+ private def addComment(column: StructField, comment: Option[String]): StructField =
+ comment.map(column.withComment).getOrElse(column)
// Compare a [[StructField]] to another, return true if they have the same column
// name(by resolver) and dataType.
@@ -584,14 +583,12 @@ case class AlterTableRecoverPartitionsCommand(
// It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
val jobConf = new JobConf(hadoopConf, this.getClass)
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
- new PathFilter {
- override def accept(path: Path): Boolean = {
- val name = path.getName
- if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
- pathFilter == null || pathFilter.accept(path)
- } else {
- false
- }
+ path: Path => {
+ val name = path.getName
+ if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
+ pathFilter == null || pathFilter.accept(path)
+ } else {
+ false
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 4c69927..c907ac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.datasources
import java.util.Locale
-import java.util.concurrent.Callable
import org.apache.hadoop.fs.Path
@@ -222,23 +221,20 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
val catalog = sparkSession.sessionState.catalog
- catalog.getCachedPlan(qualifiedTableName, new Callable[LogicalPlan]() {
- override def call(): LogicalPlan = {
- val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
- val dataSource =
- DataSource(
- sparkSession,
- // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
- // inferred at runtime. We should still support it.
- userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- className = table.provider.get,
- options = table.storage.properties ++ pathOption,
- catalogTable = Some(table))
-
- LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
- }
+ catalog.getCachedPlan(qualifiedTableName, () => {
+ val pathOption = table.storage.locationUri.map("path" -> CatalogUtils.URIToString(_))
+ val dataSource =
+ DataSource(
+ sparkSession,
+ // In older version(prior to 2.1) of Spark, the table schema can be empty and should be
+ // inferred at runtime. We should still support it.
+ userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
+ className = table.provider.get,
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(table))
+ LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
})
}
@@ -484,8 +480,8 @@ object DataSourceStrategy {
// Because we only convert In to InSet in Optimizer when there are more than certain
// items. So it is possible we still get an In expression here that needs to be pushed
// down.
- case expressions.In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
- val hSet = list.map(e => e.eval(EmptyRow))
+ case expressions.In(a: Attribute, list) if list.forall(_.isInstanceOf[Literal]) =>
+ val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(a.dataType)
Some(sources.In(a.name, hSet.toArray.map(toScala)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index b3e4240..fe6362d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -69,7 +69,7 @@ trait CheckpointFileManager {
/** List all the files in a path. */
def list(path: Path): Array[FileStatus] = {
- list(path, new PathFilter { override def accept(path: Path): Boolean = true })
+ list(path, (_: Path) => true)
}
/** Make directory at the give path and all its parent directories as needed. */
@@ -103,7 +103,7 @@ object CheckpointFileManager extends Logging {
* @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
* overwrite the file if it already exists. It should not throw
* any exception if the file exists. However, if false, then the
- * implementation must not overwrite if the file alraedy exists and
+ * implementation must not overwrite if the file already exists and
* must throw `FileAlreadyExistsException` in that case.
*/
def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit
@@ -236,14 +236,12 @@ class FileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration
fs.open(path)
}
- override def exists(path: Path): Boolean = {
- try
- return fs.getFileStatus(path) != null
- catch {
- case e: FileNotFoundException =>
- return false
+ override def exists(path: Path): Boolean =
+ try {
+ fs.getFileStatus(path) != null
+ } catch {
+ case _: FileNotFoundException => false
}
- }
override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
if (!overwriteIfPossible && fs.exists(dstPath)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index 77bc0ba..9e9bced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets.UTF_8
import scala.io.{Source => IOSource}
import scala.reflect.ClassTag
-import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
@@ -169,13 +169,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
- val allLogs = validBatches.map { id =>
+ val allLogs = validBatches.flatMap { id =>
super.get(id).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
s"(compactInterval: $compactInterval)")
}
- }.flatten ++ logs
+ } ++ logs
// Return false as there is another writer.
super.add(batchId, compactLogs(allLogs).toArray)
}
@@ -192,13 +192,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
if (latestId >= 0) {
try {
val logs =
- getAllValidBatches(latestId, compactInterval).map { id =>
+ getAllValidBatches(latestId, compactInterval).flatMap { id =>
super.get(id).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist " +
s"(latestId: $latestId, compactInterval: $compactInterval)")
}
- }.flatten
+ }
return compactLogs(logs).toArray
} catch {
case e: IOException =>
@@ -240,15 +240,13 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
s"min compaction batch id to delete = $minCompactionBatchId")
val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
- fileManager.list(metadataPath, new PathFilter {
- override def accept(path: Path): Boolean = {
- try {
- val batchId = getBatchIdFromFileName(path.getName)
- batchId < minCompactionBatchId
- } catch {
- case _: NumberFormatException =>
- false
- }
+ fileManager.list(metadataPath, (path: Path) => {
+ try {
+ val batchId = getBatchIdFromFileName(path.getName)
+ batchId < minCompactionBatchId
+ } catch {
+ case _: NumberFormatException =>
+ false
}
}).foreach { f =>
if (f.getModificationTime <= expiredTime) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
index 08aea75..a652eeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala
@@ -89,19 +89,15 @@ class RateStreamTable(
override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
- override def build(): Scan = new Scan {
- override def readSchema(): StructType = RateStreamProvider.SCHEMA
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
+ override def readSchema(): StructType = RateStreamProvider.SCHEMA
- override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- new RateStreamMicroBatchStream(
- rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation)
- }
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream =
+ new RateStreamMicroBatchStream(
+ rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation)
- override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
- new RateStreamContinuousStream(rowsPerSecond, numPartitions)
- }
- }
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream =
+ new RateStreamContinuousStream(rowsPerSecond, numPartitions)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
index 9168d46..dab64e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala
@@ -130,27 +130,24 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
slices.map(TextSocketInputPartition)
}
- override def createReaderFactory(): PartitionReaderFactory = {
- new PartitionReaderFactory {
- override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
- val slice = partition.asInstanceOf[TextSocketInputPartition].slice
- new PartitionReader[InternalRow] {
- private var currentIdx = -1
-
- override def next(): Boolean = {
- currentIdx += 1
- currentIdx < slice.size
- }
-
- override def get(): InternalRow = {
- InternalRow(slice(currentIdx)._1, slice(currentIdx)._2)
- }
+ override def createReaderFactory(): PartitionReaderFactory =
+ (partition: InputPartition) => {
+ val slice = partition.asInstanceOf[TextSocketInputPartition].slice
+ new PartitionReader[InternalRow] {
+ private var currentIdx = -1
+
+ override def next(): Boolean = {
+ currentIdx += 1
+ currentIdx < slice.size
+ }
- override def close(): Unit = {}
+ override def get(): InternalRow = {
+ InternalRow(slice(currentIdx)._1, slice(currentIdx)._2)
}
+
+ override def close(): Unit = {}
}
}
- }
override def commit(end: Offset): Unit = synchronized {
val newOffset = LongOffset.convert(end).getOrElse(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
index c0292ac..a0452cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala
@@ -81,17 +81,15 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest
override def capabilities(): util.Set[TableCapability] = Collections.emptySet()
- override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
- override def build(): Scan = new Scan {
- override def readSchema(): StructType = schema()
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new Scan {
+ override def readSchema(): StructType = schema()
- override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
- new TextSocketMicroBatchStream(host, port, numPartitions)
- }
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+ new TextSocketMicroBatchStream(host, port, numPartitions)
+ }
- override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
- new TextSocketContinuousStream(host, port, numPartitions, options)
- }
+ override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
+ new TextSocketContinuousStream(host, port, numPartitions, options)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index daf7f2c..e496de1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.ui
import java.util.{Date, NoSuchElementException}
import java.util.concurrent.ConcurrentHashMap
-import java.util.function.Function
import scala.collection.JavaConverters._
@@ -196,7 +195,7 @@ class SQLAppStatusListener(
// Check the execution again for whether the aggregated metrics data has been calculated.
// This can happen if the UI is requesting this data, and the onExecutionEnd handler is
- // running at the same time. The metrics calculated for the UI can be innacurate in that
+ // running at the same time. The metrics calculated for the UI can be inaccurate in that
// case, since the onExecutionEnd handler will clean up tracked stage metrics.
if (exec.metricsValues != null) {
exec.metricsValues
@@ -328,9 +327,7 @@ class SQLAppStatusListener(
private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
liveExecutions.computeIfAbsent(executionId,
- new Function[Long, LiveExecutionData]() {
- override def apply(key: Long): LiveExecutionData = new LiveExecutionData(executionId)
- })
+ (_: Long) => new LiveExecutionData(executionId))
}
private def update(exec: LiveExecutionData, force: Boolean = false): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 33d7834..e6c40bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.ElementTrackingStore
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.Utils
/**
@@ -146,11 +146,7 @@ private[sql] class SharedState(
val wrapped = new ExternalCatalogWithListener(externalCatalog)
// Make sure we propagate external catalog events to the spark listener bus
- wrapped.addListener(new ExternalCatalogEventListener {
- override def onEvent(event: ExternalCatalogEvent): Unit = {
- sparkContext.listenerBus.post(event)
- }
- })
+ wrapped.addListener((event: ExternalCatalogEvent) => sparkContext.listenerBus.post(event))
wrapped
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index d52047d..dd7c380 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1195,11 +1195,8 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
GroupedRoutes("a", "c", Seq(Route("a", "c", 2)))
)
- implicit def ordering[GroupedRoutes]: Ordering[GroupedRoutes] = new Ordering[GroupedRoutes] {
- override def compare(x: GroupedRoutes, y: GroupedRoutes): Int = {
- x.toString.compareTo(y.toString)
- }
- }
+ implicit def ordering[GroupedRoutes]: Ordering[GroupedRoutes] =
+ (x: GroupedRoutes, y: GroupedRoutes) => x.toString.compareTo(y.toString)
checkDatasetUnorderly(grped, expected: _*)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 3cc97c9..7de5e82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -120,10 +120,7 @@ class QueryExecutionSuite extends SharedSQLContext {
}
test("toString() exception/error handling") {
- spark.experimental.extraStrategies = Seq(
- new SparkStrategy {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
- })
+ spark.experimental.extraStrategies = Seq[SparkStrategy]((_: LogicalPlan) => Nil)
def qe: QueryExecution = new QueryExecution(spark, OneRowRelation())
@@ -131,19 +128,13 @@ class QueryExecutionSuite extends SharedSQLContext {
assert(qe.toString.contains("OneRowRelation"))
// Throw an AnalysisException - this should be captured.
- spark.experimental.extraStrategies = Seq(
- new SparkStrategy {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] =
- throw new AnalysisException("exception")
- })
+ spark.experimental.extraStrategies = Seq[SparkStrategy](
+ (_: LogicalPlan) => throw new AnalysisException("exception"))
assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
// Throw an Error - this should not be captured.
- spark.experimental.extraStrategies = Seq(
- new SparkStrategy {
- override def apply(plan: LogicalPlan): Seq[SparkPlan] =
- throw new Error("error")
- })
+ spark.experimental.extraStrategies = Seq[SparkStrategy](
+ (_: LogicalPlan) => throw new Error("error"))
val error = intercept[Error](qe.toString)
assert(error.getMessage.contains("error"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
index 784438c..3760539 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SortBenchmark.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.benchmark
-import java.util.{Arrays, Comparator}
+import java.util.Arrays
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.unsafe.array.LongArray
@@ -40,14 +40,9 @@ object SortBenchmark extends BenchmarkBase {
private def referenceKeyPrefixSort(buf: LongArray, lo: Int, hi: Int, refCmp: PrefixComparator) {
val sortBuffer = new LongArray(MemoryBlock.fromLongArray(new Array[Long](buf.size().toInt)))
- new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(
- buf, lo, hi, new Comparator[RecordPointerAndKeyPrefix] {
- override def compare(
- r1: RecordPointerAndKeyPrefix,
- r2: RecordPointerAndKeyPrefix): Int = {
- refCmp.compare(r1.keyPrefix, r2.keyPrefix)
- }
- })
+ new Sorter(new UnsafeSortDataFormat(sortBuffer)).sort(buf, lo, hi,
+ (r1: RecordPointerAndKeyPrefix, r2: RecordPointerAndKeyPrefix) =>
+ refCmp.compare(r1.keyPrefix, r2.keyPrefix))
}
private def generateKeyPrefixTestData(size: Int, rand: => Long): (LongArray, LongArray) = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index da49683..89ce636 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -354,11 +354,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {
// Note: this assumes there is only one query active in the `testStream` method.
- Thread.currentThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
- override def uncaughtException(t: Thread, e: Throwable): Unit = {
- streamThreadDeathCause = e
- }
- })
+ Thread.currentThread.setUncaughtExceptionHandler(
+ (_: Thread, e: Throwable) => streamThreadDeathCause = e)
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 99dc076..8fb1400 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamingQueryException, StreamTest}
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -104,9 +104,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
LastOptions.parameters = parameters
LastOptions.partitionColumns = partitionColumns
LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
- new Sink {
- override def addBatch(batchId: Long, data: DataFrame): Unit = {}
- }
+ (_: Long, _: DataFrame) => {}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index 19ab2ff..67158fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -60,11 +60,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
spark: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
- outputMode: OutputMode): Sink = {
- new Sink {
- override def addBatch(batchId: Long, data: DataFrame): Unit = {}
- }
- }
+ outputMode: OutputMode): Sink = (_: Long, _: DataFrame) => {}
}
object BlockingSource {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index d9e4842..327dca7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
-import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils}
+import org.apache.hadoop.hive.common.HiveInterruptUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
@@ -65,16 +65,14 @@ private[hive] object SparkSQLCLIDriver extends Logging {
* a command is being processed by the current thread.
*/
def installSignalHandler() {
- HiveInterruptUtils.add(new HiveInterruptCallback {
- override def interrupt() {
- // Handle remote execution mode
- if (SparkSQLEnv.sparkContext != null) {
- SparkSQLEnv.sparkContext.cancelAllJobs()
- } else {
- if (transport != null) {
- // Force closing of TCP connection upon session termination
- transport.getSocket.close()
- }
+ HiveInterruptUtils.add(() => {
+ // Handle remote execution mode
+ if (SparkSQLEnv.sparkContext != null) {
+ SparkSQLEnv.sparkContext.cancelAllJobs()
+ } else {
+ if (transport != null) {
+ // Force closing of TCP connection upon session termination
+ transport.getSocket.close()
}
}
})
@@ -208,7 +206,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
reader.setBellEnabled(false)
reader.setExpandEvents(false)
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
- CliDriver.getCommandCompleter.foreach((e) => reader.addCompleter(e))
+ CliDriver.getCommandCompleter.foreach(reader.addCompleter)
val historyDirectory = System.getProperty("user.home")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index c74ca19..829254b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -255,9 +255,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
- def numReceivers(): Int = {
- receiverInputStreams.size
- }
+ def numReceivers(): Int = receiverInputStreams.length
/** Register a receiver */
private def registerReceiver(
@@ -516,14 +514,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
- walBatchingThreadPool.execute(new Runnable {
- override def run(): Unit = Utils.tryLogNonFatalError {
- if (active) {
- context.reply(addBlock(receivedBlockInfo))
- } else {
- context.sendFailure(
- new IllegalStateException("ReceiverTracker RpcEndpoint already shut down."))
- }
+ walBatchingThreadPool.execute(() => Utils.tryLogNonFatalError {
+ if (active) {
+ context.reply(addBlock(receivedBlockInfo))
+ } else {
+ context.sendFailure(
+ new IllegalStateException("ReceiverTracker RpcEndpoint already shut down."))
}
})
} else {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 71b86d1..31e4c6b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -135,18 +135,16 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
/** Start the actual log writer on a separate thread. */
private def startBatchedWriterThread(): Thread = {
- val thread = new Thread(new Runnable {
- override def run(): Unit = {
- while (active.get()) {
- try {
- flushRecords()
- } catch {
- case NonFatal(e) =>
- logWarning("Encountered exception in Batched Writer Thread.", e)
- }
+ val thread = new Thread(() => {
+ while (active.get()) {
+ try {
+ flushRecords()
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Encountered exception in Batched Writer Thread.", e)
}
- logInfo("BatchedWriteAheadLog Writer thread exiting.")
}
+ logInfo("BatchedWriteAheadLog Writer thread exiting.")
}, "BatchedWriteAheadLog Writer")
thread.setDaemon(true)
thread.start()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org