You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/11 18:44:06 UTC

[spark] branch master updated: [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ec7f63  [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
4ec7f63 is described below

commit 4ec7f631aa23bc0121656dd77f05767f447112c0
Author: Sean Owen <se...@databricks.com>
AuthorDate: Thu Apr 11 13:43:44 2019 -0500

    [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
    
    ## What changes were proposed in this pull request?
    
    Fix build warnings -- see some details below.
    
    But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds", which I'd like to simplify to things like "2.minutes" anyway.
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24314 from srowen/SPARK-27404.
    
    Authored-by: Sean Owen <se...@databricks.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../network/shuffle/RetryingBlockFetcherSuite.java |  8 +--
 .../org/apache/spark/BarrierTaskContext.scala      |  3 +-
 .../apache/spark/deploy/FaultToleranceTest.scala   | 17 +++---
 .../scala/org/apache/spark/rpc/RpcTimeout.scala    |  2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  3 +-
 .../main/scala/org/apache/spark/ui/UIUtils.scala   | 19 +++----
 .../spark/BarrierStageOnSubmittedSuite.scala       |  3 +-
 .../org/apache/spark/ContextCleanerSuite.scala     | 16 +++---
 .../test/scala/org/apache/spark/DriverSuite.scala  |  4 +-
 .../org/apache/spark/JobCancellationSuite.scala    |  4 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  5 +-
 .../org/apache/spark/StatusTrackerSuite.scala      | 29 +++++-----
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  2 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  2 +-
 .../spark/deploy/client/AppClientSuite.scala       |  2 +-
 .../deploy/history/FsHistoryProviderSuite.scala    | 11 ++--
 .../spark/deploy/history/HistoryServerSuite.scala  | 20 +------
 .../apache/spark/deploy/master/MasterSuite.scala   | 11 ++--
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  6 +-
 .../org/apache/spark/executor/ExecutorSuite.scala  |  1 -
 .../spark/launcher/LauncherBackendSuite.scala      |  5 +-
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala    |  4 +-
 .../apache/spark/rdd/LocalCheckpointSuite.scala    |  3 +-
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   | 55 +++++++++----------
 .../CoarseGrainedSchedulerBackendSuite.scala       |  2 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  2 +-
 .../scheduler/OutputCommitCoordinatorSuite.scala   |  3 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |  2 +-
 .../spark/scheduler/TaskResultGetterSuite.scala    |  7 +--
 .../storage/BlockManagerReplicationSuite.scala     | 15 +++--
 .../apache/spark/storage/BlockManagerSuite.scala   | 22 ++++----
 .../org/apache/spark/ui/UISeleniumSuite.scala      | 54 +++++++++---------
 .../test/scala/org/apache/spark/ui/UISuite.scala   |  4 +-
 .../org/apache/spark/util/EventLoopSuite.scala     | 19 +++----
 .../collection/ExternalAppendOnlyMapSuite.scala    |  3 +-
 .../sql/jdbc/DockerJDBCIntegrationSuite.scala      |  2 +-
 .../kafka010/KafkaDontFailOnDataLossSuite.scala    |  2 +-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  9 ++-
 .../kafka010/DirectKafkaStreamSuite.scala          | 21 ++++---
 .../kinesis/KinesisCheckpointerSuite.scala         | 11 ++--
 .../streaming/kinesis/KinesisReceiverSuite.scala   |  6 +-
 .../streaming/kinesis/KinesisStreamSuite.scala     | 42 ++++++++------
 .../scala/org/apache/spark/ml/MLEventsSuite.scala  | 41 +++++++-------
 .../spark/deploy/yarn/BaseYarnClusterSuite.scala   |  4 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  8 +--
 .../network/yarn/YarnShuffleServiceSuite.scala     |  5 +-
 .../spark/sql/sources/v2/JavaSimpleBatchTable.java | 54 ------------------
 ...atchTable.java => JavaSimpleReaderFactory.java} | 64 ++--------------------
 .../sql/sources/v2/JavaSimpleScanBuilder.java      | 47 ++++++++++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala    | 11 ++--
 .../org/apache/spark/sql/DatasetCacheSuite.scala   |  4 +-
 .../streaming/sources/TextSocketStreamSuite.scala  |  2 +-
 .../state/StateStoreCoordinatorSuite.scala         |  8 +--
 .../streaming/state/StateStoreSuite.scala          |  2 +-
 .../sql/streaming/StreamingQueryManagerSuite.scala | 58 ++++++++++----------
 .../StreamingQueryStatusAndProgressSuite.scala     |  3 +-
 .../sql/hive/thriftserver/UISeleniumSuite.scala    |  4 +-
 .../streaming/receiver/ReceivedBlockHandler.scala  |  2 +-
 .../streaming/util/FileBasedWriteAheadLog.scala    |  5 +-
 .../apache/spark/streaming/CheckpointSuite.scala   |  4 +-
 .../streaming/ReceivedBlockHandlerSuite.scala      |  3 +-
 .../streaming/ReceivedBlockTrackerSuite.scala      |  4 +-
 .../org/apache/spark/streaming/ReceiverSuite.scala | 22 ++++----
 .../spark/streaming/StreamingContextSuite.scala    | 52 +++++++++---------
 .../spark/streaming/StreamingListenerSuite.scala   |  4 +-
 .../apache/spark/streaming/UISeleniumSuite.scala   |  8 +--
 .../streaming/receiver/BlockGeneratorSuite.scala   | 16 +++---
 .../scheduler/ExecutorAllocationManagerSuite.scala |  2 +-
 .../streaming/scheduler/JobGeneratorSuite.scala    |  5 +-
 .../streaming/scheduler/ReceiverTrackerSuite.scala |  8 +--
 .../spark/streaming/util/WriteAheadLogSuite.scala  | 20 +++----
 71 files changed, 413 insertions(+), 518 deletions(-)

diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index a530e16..6f90df5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -46,9 +46,9 @@ import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchSt
  */
 public class RetryingBlockFetcherSuite {
 
-  ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
-  ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
-  ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+  private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+  private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+  private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
 
   @Test
   public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ public class RetryingBlockFetcherSuite {
     }
 
     assertNotNull(stub);
-    stub.when(fetchStarter).createAndStart(any(), anyObject());
+    stub.when(fetchStarter).createAndStart(any(), any());
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
     new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
   }
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 2d842b9..a354f44 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
           barrierEpoch),
         // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
         // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
-        timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
+        timeout = new RpcTimeout(365.days, "barrierTimeout"))
       barrierEpoch += 1
       logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
         "global sync successfully, waited for " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index a662430..99f8412 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
 import scala.concurrent.{Future, Promise}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.sys.process._
 
 import org.json4s._
@@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
     assertValidClusterState()
 
     killLeader()
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
     createClient()
     assertValidClusterState()
@@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {
 
     killLeader()
     addMasters(1)
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
 
     killLeader()
     addMasters(1)
-    delay(30 seconds)
+    delay(30.seconds)
     assertValidClusterState()
   }
 
@@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
     killLeader()
     workers.foreach(_.kill())
     workers.clear()
-    delay(30 seconds)
+    delay(30.seconds)
     addWorkers(2)
     assertValidClusterState()
   }
@@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {
 
     (1 to 3).foreach { _ =>
       killLeader()
-      delay(30 seconds)
+      delay(30.seconds)
       assertValidClusterState()
       assertTrue(getLeader == masters.head)
       addMasters(1)
@@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     // Avoid waiting indefinitely (e.g., we could register but get no executors).
-    assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+    assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
   }
 
   /**
@@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
     }
 
     try {
-      assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+      assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
     } catch {
       case e: TimeoutException =>
         logError("Master states: " + masters.map(_.state))
@@ -421,7 +420,7 @@ private object SparkDocker {
     }
 
     dockerCmd.run(ProcessLogger(findIpAndLog _))
-    val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
+    val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
     val dockerId = Docker.getLastProcessId
     (ip, dockerId, outFile)
   }
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 3dc41f7..770ae2f 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
    *
    * @note This can be used in the recover callback of a Future to add to a TimeoutException
    * Example:
-   *    val timeout = new RpcTimeout(5 millis, "short timeout")
+   *    val timeout = new RpcTimeout(5.milliseconds, "short timeout")
    *    Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
    */
   def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d967d38..524b0c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -27,7 +27,6 @@ import scala.collection.Map
 import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.language.existentials
-import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.SerializationUtils
@@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
     listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
       Some(executorUpdates)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
-      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
+      BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index d7bda8b..11647c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
         }
       }
       // if time is more than a year
-      return s"$yearString $weekString $dayString"
+      s"$yearString $weekString $dayString"
     } catch {
       case e: Exception =>
         logError("Error converting time to string", e)
         // if there is some error, return blank string
-        return ""
+        ""
     }
   }
 
@@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
     def getHeaderContent(header: String): Seq[Node] = {
       if (newlinesInHeader) {
         <ul class="unstyled">
-          { header.split("\n").map { case t => <li> {t} </li> } }
+          { header.split("\n").map(t => <li> {t} </li>) }
         </ul>
       } else {
         Text(header)
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
    * the whole string will rendered as a simple escaped text.
    *
    * Note: In terms of security, only anchor tags with root relative links are supported. So any
-   * attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
+   * attempts to embed links outside Spark UI, or other tags like &lt;script&gt; will cause in
    * the whole description to be treated as plain text.
    *
    * @param desc        the original job or stage description string, which may contain html tags.
@@ -458,7 +458,6 @@ private[spark] object UIUtils extends Logging {
    *         is true, and an Elem otherwise.
    */
   def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
-    import scala.language.postfixOps
 
     // If the description can be parsed as HTML and has only relative links, then render
     // as HTML, otherwise render as escaped string
@@ -468,9 +467,7 @@ private[spark] object UIUtils extends Logging {
 
       // Verify that this has only anchors and span (we are wrapping in span)
       val allowedNodeLabels = Set("a", "span", "br")
-      val illegalNodes = xml \\ "_"  filterNot { case node: Node =>
-        allowedNodeLabels.contains(node.label)
-      }
+      val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
       if (illegalNodes.nonEmpty) {
         throw new IllegalArgumentException(
           "Only HTML anchors allowed in job descriptions\n" +
@@ -491,8 +488,8 @@ private[spark] object UIUtils extends Logging {
           new RewriteRule() {
             override def transform(n: Node): Seq[Node] = {
               n match {
-                case e: Elem if e.child isEmpty => Text(e.text)
-                case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
+                case e: Elem if e.child.isEmpty => Text(e.text)
+                case e: Elem => Text(e.child.flatMap(transform).text)
                 case _ => n
               }
             }
@@ -503,7 +500,7 @@ private[spark] object UIUtils extends Logging {
           new RewriteRule() {
             override def transform(n: Node): Seq[Node] = {
               n match {
-                case e: Elem if e \ "@href" nonEmpty =>
+                case e: Elem if (e \ "@href").nonEmpty =>
                   val relativePath = e.attribute("href").get.toString
                   val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
                   e % Attribute(null, "href", fullUri, Null)
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index ca839d1..435b927 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
     )
 
     val error = intercept[SparkException] {
-      ThreadUtils.awaitResult(futureAction, 5 seconds)
+      ThreadUtils.awaitResult(futureAction, 5.seconds)
     }.getCause.getMessage
     assert(error.contains(message))
   }
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index d4bf20c..c16e227 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage._
 abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
   extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
 {
-  implicit val defaultTimeout = timeout(10000 millis)
+  implicit val defaultTimeout = timeout(10.seconds)
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
@@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
 
     // Test that GC causes RDD cleanup after dereferencing the RDD
@@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
     rdd.count()  // Defeat early collection by the JVM
 
@@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
 
     // Test that GC causes broadcast cleanup after dereferencing the broadcast variable
@@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
 
     // Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
@@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
 
     // Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
     runGC()
     intercept[Exception] {
-      preGCTester.assertCleanup()(timeout(1000 millis))
+      preGCTester.assertCleanup()(timeout(1.second))
     }
 
     // Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -408,7 +408,7 @@ class CleanerTester(
   /** Assert that all the stuff has been cleaned up */
   def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
     try {
-      eventually(waitTimeout, interval(100 millis)) {
+      eventually(waitTimeout, interval(100.milliseconds)) {
         assert(isAllCleanedUp,
           "The following resources were not cleaned up:\n" + uncleanedResourcesToString)
       }
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 896cd2e..182f28c 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
   ignore("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val masters = Table("master", "local", "local-cluster[2,1,1024]")
-    forAll(masters) { (master: String) =>
+    forAll(masters) { master =>
       val process = Utils.executeCommand(
         Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
         new File(sparkHome),
         Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-      failAfter(60 seconds) { process.waitFor() }
+      failAfter(1.minute) { process.waitFor() }
       // Ensure we still kill the process in case it timed out
       process.destroy()
     }
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 00fecd4..db90c31 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(e.getMessage contains "cancel")
 
     // Once A is cancelled, job B should finish fairly quickly.
-    assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+    assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
   }
 
   test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
@@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(e.getMessage contains "cancel")
 
     // Once A is cancelled, job B should finish fairly quickly.
-    assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+    assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
   }
 
   test("two jobs sharing the same stage") {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0795790..3a5de8a 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
@@ -279,10 +278,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(RpcUtils.retryWaitMs(conf) === 2L)
 
     conf.set("spark.akka.askTimeout", "3")
-    assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
+    assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
 
     conf.set("spark.akka.lookupTimeout", "4")
-    assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
+    assert(RpcUtils.lookupRpcTimeout(conf).duration === 4.seconds)
   }
 
   test("SPARK-13727") {
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index a15ae04..c96db4e 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark
 
 import scala.concurrent.duration._
 import scala.language.implicitConversions
-import scala.language.postfixOps
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -31,25 +30,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
   test("basic status API usage") {
     sc = new SparkContext("local", "test", new SparkConf(false))
     val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
-    val jobId: Int = eventually(timeout(10 seconds)) {
+    val jobId: Int = eventually(timeout(10.seconds)) {
       val jobIds = jobFuture.jobIds
       jobIds.size should be(1)
       jobIds.head
     }
-    val jobInfo = eventually(timeout(10 seconds)) {
+    val jobInfo = eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobInfo(jobId).get
     }
     jobInfo.status() should not be FAILED
     val stageIds = jobInfo.stageIds()
     stageIds.size should be(2)
 
-    val firstStageInfo = eventually(timeout(10 seconds)) {
+    val firstStageInfo = eventually(timeout(10.seconds)) {
       sc.statusTracker.getStageInfo(stageIds.min).get
     }
     firstStageInfo.stageId() should be(stageIds.min)
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
@@ -61,27 +60,27 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc = new SparkContext("local", "test", new SparkConf(false))
     // Passing `null` should return jobs that were not run in a job group:
     val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync()
-    val defaultJobGroupJobId = eventually(timeout(10 seconds)) {
+    val defaultJobGroupJobId = eventually(timeout(10.seconds)) {
       defaultJobGroupFuture.jobIds.head
     }
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
     }
     // Test jobs submitted in job groups:
     sc.setJobGroup("my-job-group", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty)
     val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
-    val firstJobId = eventually(timeout(10 seconds)) {
+    val firstJobId = eventually(timeout(10.seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
     }
     val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
-    val secondJobId = eventually(timeout(10 seconds)) {
+    val secondJobId = eventually(timeout(10.seconds)) {
       secondJobFuture.jobIds.head
     }
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
         Set(firstJobId, secondJobId))
     }
@@ -92,10 +91,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc.setJobGroup("my-job-group2", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
     val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
-    val firstJobId = eventually(timeout(10 seconds)) {
+    val firstJobId = eventually(timeout(10.seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
     }
   }
@@ -105,10 +104,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
     sc.setJobGroup("my-job-group2", "description")
     sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
     val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
-    val firstJobId = eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       firstJobFuture.jobIds.head
     }
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 641751f..2a17245 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1277,7 +1277,7 @@ object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
       Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
 
     try {
-      val exitCode = failAfter(60 seconds) { process.waitFor() }
+      val exitCode = failAfter(1.minute) { process.waitFor() }
       if (exitCode != 0) {
         fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
       }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 573a496..1ed2a1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -67,7 +67,7 @@ class StandaloneDynamicAllocationSuite
     master = makeMaster()
     workers = makeWorkers(10, 2048)
     // Wait until all workers register with master successfully
-    eventually(timeout(60.seconds), interval(10.millis)) {
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
       assert(getMasterState.workers.size === numWorkers)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index baeefea..a1d3077 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -65,7 +65,7 @@ class AppClientSuite
     master = makeMaster()
     workers = makeWorkers(10, 2048)
     // Wait until all workers register with master successfully
-    eventually(timeout(60.seconds), interval(10.millis)) {
+    eventually(timeout(1.minute), interval(10.milliseconds)) {
       assert(getMasterState.workers.size === numWorkers)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d183170..1a326a8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -25,7 +25,6 @@ import java.util.zip.{ZipInputStream, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.FileUtils
@@ -142,7 +141,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
         clock.getTimeMillis(), "test", false))
 
       // Make sure the UI can be rendered.
-      list.foreach { case info =>
+      list.foreach { info =>
         val appUi = provider.getAppUI(info.id, None)
         appUi should not be null
         appUi should not be None
@@ -281,7 +280,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       list.last.attempts.size should be (3)
       list.head.attempts.head.attemptId should be (Some("attempt1"))
 
-      list.foreach { case app =>
+      list.foreach { app =>
         app.attempts.foreach { attempt =>
           val appUi = provider.getAppUI(app.id, attempt.attemptId)
           appUi should not be null
@@ -734,7 +733,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(1.second), interval(10.milliseconds)) {
         provider.getConfig().keys should not contain ("HDFS State")
       }
     } finally {
@@ -747,12 +746,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val clock = new ManualClock()
     val provider = new SafeModeTestProvider(createTestConf(), clock)
     val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
-    val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
+    provider.startSafeModeCheckThread(Some(errorHandler))
     try {
       provider.inSafeMode = false
       clock.setTime(10000)
 
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(1.second), interval(10.milliseconds)) {
         verify(errorHandler).uncaughtException(any(), any())
       }
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 6665a89..c99bcf2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -25,9 +25,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpSe
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
-import com.codahale.metrics.Counter
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -457,16 +455,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     val port = server.boundPort
     val metrics = server.cacheMetrics
 
-    // assert that a metric has a value; if not dump the whole metrics instance
-    def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
-      val actual = counter.getCount
-      if (actual != expected) {
-        // this is here because Scalatest loses stack depth
-        fail(s"Wrong $name value - expected $expected but got $actual" +
-            s" in metrics\n$metrics")
-      }
-    }
-
     // build a URL for an app or app/attempt plus a page underneath
     def buildURL(appId: String, suffix: String): URL = {
       new URL(s"http://localhost:$port/history/$appId$suffix")
@@ -477,13 +465,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
     }
 
-    val historyServerRoot = new URL(s"http://localhost:$port/")
-
     // start initial job
     val d = sc.parallelize(1 to 10)
     d.count()
-    val stdInterval = interval(100 milliseconds)
-    val appId = eventually(timeout(20 seconds), stdInterval) {
+    val stdInterval = interval(100.milliseconds)
+    val appId = eventually(timeout(20.seconds), stdInterval) {
       val json = getContentAndCode("applications", port)._2.get
       val apps = parse(json).asInstanceOf[JArray].arr
       apps should have size 1
@@ -567,7 +553,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     d2.count()
     dumpLogDir("After second job")
 
-    val stdTimeout = timeout(10 seconds)
+    val stdTimeout = timeout(10.seconds)
     logDebug("waiting for UI to update")
     eventually(stdTimeout, stdInterval) {
       assert(2 === getNumJobs(""),
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index fbf2acc..61aaaa5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable
 import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
-import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import org.json4s._
@@ -216,7 +215,7 @@ class MasterSuite extends SparkFunSuite
       master = makeMaster(conf)
       master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
       // Wait until Master recover from checkpoint data.
-      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         master.workers.size should be(1)
       }
 
@@ -234,7 +233,7 @@ class MasterSuite extends SparkFunSuite
       fakeWorkerInfo.coresUsed should be(0)
 
       master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
-      eventually(timeout(1 second), interval(10 milliseconds)) {
+      eventually(timeout(1.second), interval(10.milliseconds)) {
         // Application state should be WAITING when "MasterChangeAcknowledged" event executed.
         fakeAppInfo.state should be(ApplicationState.WAITING)
       }
@@ -242,7 +241,7 @@ class MasterSuite extends SparkFunSuite
       master.self.send(
         WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))
 
-      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         getState(master) should be(RecoveryState.ALIVE)
       }
 
@@ -267,7 +266,7 @@ class MasterSuite extends SparkFunSuite
     val localCluster = new LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     try {
-      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
           .getLines().mkString("\n")
         val JArray(workers) = (parse(json) \ "workers")
@@ -293,7 +292,7 @@ class MasterSuite extends SparkFunSuite
     val localCluster = new LocalSparkCluster(2, 2, 512, conf)
     localCluster.start()
     try {
-      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
           .getLines().mkString("\n")
         val JArray(workers) = (parse(json) \ "workers")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 5e8b363..d899ee0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -228,7 +228,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     testCleanupFilesWithConfig(false)
   }
 
-  private def testCleanupFilesWithConfig(value: Boolean) = {
+  private def testCleanupFilesWithConfig(value: Boolean): Unit = {
     val conf = new SparkConf().set(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT, value)
 
     val cleanupCalled = new AtomicBoolean(false)
@@ -257,7 +257,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     testWorkDirCleanupAndRemoveMetadataWithConfig(false)
   }
 
-  private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
+  private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean): Unit = {
     val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
     conf.set("spark.worker.cleanup.appDataTtl", "60")
     conf.set("spark.shuffle.service.enabled", "true")
@@ -282,7 +282,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
     }
     executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
     worker.receive(WorkDirCleanup)
-    eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(!executorDir.exists())
       assert(cleanupCalled.get() == dbCleanupEnabled)
     }
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 63a72e2..8a4f7a3 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.Map
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, eq => meq}
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 548949e..edec968 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.launcher
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Eventually._
@@ -57,13 +56,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
         handle.getAppId() should not be (null)
       }
 
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 24b0144..a7eb0ec 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -142,7 +142,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
     }
     assert(f.get() === 10)
 
-    failAfter(10 seconds) {
+    failAfter(10.seconds) {
       sem.acquire(2)
     }
   }
@@ -178,7 +178,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
       f.get()
     }
 
-    failAfter(10 seconds) {
+    failAfter(10.seconds) {
       sem.acquire(2)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 478f069..c942328 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.rdd
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
@@ -174,7 +173,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     val blockId = RDDBlockId(rdd.id, numPartitions - 1)
     bmm.removeBlock(blockId)
     // Wait until the block has been removed successfully.
-    eventually(timeout(1 seconds), interval(100 milliseconds)) {
+    eventually(timeout(1.second), interval(100.milliseconds)) {
       assert(bmm.getBlockStatus(blockId).isEmpty)
     }
     try {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 178d420..99b4e8f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.google.common.io.Files
 import org.mockito.ArgumentMatchers.any
@@ -80,7 +79,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     })
     rpcEndpointRef.send("hello")
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       assert("hello" === message)
     }
   }
@@ -101,7 +100,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
     try {
       rpcEndpointRef.send("hello")
-      eventually(timeout(5 seconds), interval(10 millis)) {
+      eventually(timeout(5.seconds), interval(10.milliseconds)) {
         assert("hello" === message)
       }
     } finally {
@@ -180,7 +179,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
     try {
       val e = intercept[RpcTimeoutException] {
-        rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
+        rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1.millisecond, shortProp))
       }
       // The SparkException cause should be a RpcTimeoutException with message indicating the
       // controlling timeout property
@@ -236,7 +235,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     })
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       assert(e.getMessage === "Oops!")
     }
   }
@@ -261,7 +260,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     env.stop(endpointRef)
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       assert(e.getMessage === "Oops!")
     }
   }
@@ -282,7 +281,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     endpointRef.send("Foo")
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       assert(e.getMessage === "Oops!")
     }
   }
@@ -303,7 +302,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     })
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       // Calling `self` in `onStart` is fine
       assert(callSelfSuccessfully)
     }
@@ -324,7 +323,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     endpointRef.send("Foo")
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       // Calling `self` in `receive` is fine
       assert(callSelfSuccessfully)
     }
@@ -347,9 +346,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     env.stop(endpointRef)
 
-    eventually(timeout(5 seconds), interval(10 millis)) {
+    eventually(timeout(5.seconds), interval(10.milliseconds)) {
       // Calling `self` in `onStop` will return null, so selfOption will be None
-      assert(selfOption == None)
+      assert(selfOption.isEmpty)
     }
   }
 
@@ -376,7 +375,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
         }.start()
       }
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         assert(result == 1000)
       }
 
@@ -401,7 +400,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     env.stop(endpointRef)
     env.stop(endpointRef)
 
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       // Calling stop twice should only trigger onStop once.
       assert(onStopCount == 1)
     }
@@ -417,7 +416,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     })
 
     val f = endpointRef.ask[String]("Hi")
-    val ack = ThreadUtils.awaitResult(f, 5 seconds)
+    val ack = ThreadUtils.awaitResult(f, 5.seconds)
     assert("ack" === ack)
 
     env.stop(endpointRef)
@@ -437,7 +436,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely")
     try {
       val f = rpcEndpointRef.ask[String]("hello")
-      val ack = ThreadUtils.awaitResult(f, 5 seconds)
+      val ack = ThreadUtils.awaitResult(f, 5.seconds)
       assert("ack" === ack)
     } finally {
       anotherEnv.shutdown()
@@ -456,7 +455,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
 
     val f = endpointRef.ask[String]("Hi")
     val e = intercept[SparkException] {
-      ThreadUtils.awaitResult(f, 5 seconds)
+      ThreadUtils.awaitResult(f, 5.seconds)
     }
     assert("Oops" === e.getCause.getMessage)
 
@@ -478,7 +477,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     try {
       val f = rpcEndpointRef.ask[String]("hello")
       val e = intercept[SparkException] {
-        ThreadUtils.awaitResult(f, 5 seconds)
+        ThreadUtils.awaitResult(f, 5.seconds)
       }
       assert("Oops" === e.getCause.getMessage)
     } finally {
@@ -530,14 +529,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       // Send a message to set up the connection
       serverRefInServer2.send("hello")
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         assert(events.contains(("onConnected", serverEnv2.address)))
       }
 
       serverEnv2.shutdown()
       serverEnv2.awaitTermination()
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         assert(events.contains(("onConnected", serverEnv2.address)))
         assert(events.contains(("onDisconnected", serverEnv2.address)))
       }
@@ -558,7 +557,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       // Send a message to set up the connection
       serverRefInClient.send("hello")
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         // We don't know the exact client address but at least we can verify the message type
         assert(events.asScala.map(_._1).exists(_ == "onConnected"))
       }
@@ -566,7 +565,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       clientEnv.shutdown()
       clientEnv.awaitTermination()
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         // We don't know the exact client address but at least we can verify the message type
         assert(events.asScala.map(_._1).exists(_ == "onConnected"))
         assert(events.asScala.map(_._1).exists(_ == "onDisconnected"))
@@ -589,14 +588,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       // Send a message to set up the connection
       serverRefInClient.send("hello")
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         assert(events.contains(("onConnected", serverEnv.address)))
       }
 
       serverEnv.shutdown()
       serverEnv.awaitTermination()
 
-      eventually(timeout(5 seconds), interval(5 millis)) {
+      eventually(timeout(5.seconds), interval(5.milliseconds)) {
         assert(events.contains(("onConnected", serverEnv.address)))
         assert(events.contains(("onDisconnected", serverEnv.address)))
       }
@@ -624,7 +623,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     try {
       val f = rpcEndpointRef.ask[String]("hello")
       val e = intercept[SparkException] {
-        ThreadUtils.awaitResult(f, 1 seconds)
+        ThreadUtils.awaitResult(f, 1.second)
       }
       assert(e.getCause.isInstanceOf[NotSerializableException])
     } finally {
@@ -658,7 +657,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       })
       val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication")
       rpcEndpointRef.send("hello")
-      eventually(timeout(5 seconds), interval(10 millis)) {
+      eventually(timeout(5.seconds), interval(10.milliseconds)) {
         assert("hello" === message)
       }
     } finally {
@@ -778,8 +777,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       }
     })
 
-    val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
-    val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
+    val longTimeout = new RpcTimeout(1.second, "spark.rpc.long.timeout")
+    val shortTimeout = new RpcTimeout(10.milliseconds, "spark.rpc.short.timeout")
 
     // Ask with immediate response, should complete successfully
     val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
@@ -804,7 +803,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     // once the future is complete to verify addMessageIfTimeout was invoked
     val reply3 =
       intercept[RpcTimeoutException] {
-        Await.result(fut3, 2000 millis)
+        Await.result(fut3, 2.seconds)
       }.getMessage
     // scalastyle:on awaitresult
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index ae306d3..33f48b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
     with Eventually {
 
-  private val executorUpTimeout = 60.seconds
+  private val executorUpTimeout = 1.minute
 
   test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e74f462..72c20a8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,7 +594,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
     (1 to 30).foreach(_ => rdd = rdd.zip(rdd))
     // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
-    failAfter(10 seconds) {
+    failAfter(10.seconds) {
       val preferredLocs = scheduler.getPreferredLocs(rdd, 0)
       // No preferred locations are returned.
       assert(preferredLocs.length === 0)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 2891dd6..f582ef5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -22,7 +22,6 @@ import java.util.Date
 import java.util.concurrent.TimeoutException
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.hadoop.mapred._
 import org.apache.hadoop.mapreduce.TaskType
@@ -159,7 +158,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // It's an error if the job completes successfully even though no committer was authorized,
     // so throw an exception if the job was allowed to complete.
     intercept[TimeoutException] {
-      ThreadUtils.awaitResult(futureAction, 5 seconds)
+      ThreadUtils.awaitResult(futureAction, 5.seconds)
     }
     assert(tempDir.list().size === 0)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 43d6ec1..76be693 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -170,7 +170,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
       // and notifies the job waiter before our original thread in the task scheduler finishes
       // handling the event and marks the taskset as complete.  So its ok if we need to wait a
       // *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race.
-      eventually(timeout(1 second), interval(10 millis)) {
+      eventually(timeout(1.second), interval(10.milliseconds)) {
         assert(taskScheduler.runningTaskSets.isEmpty)
       }
       assert(!backend.hasTasks)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 5b17dbf..ae46435 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.MoreExecutors
@@ -58,18 +57,18 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task
       // Only remove the result once, since we'd like to test the case where the task eventually
       // succeeds.
       serializer.get().deserialize[TaskResult[_]](serializedData) match {
-        case IndirectTaskResult(blockId, size) =>
+        case IndirectTaskResult(blockId, _) =>
           sparkEnv.blockManager.master.removeBlock(blockId)
           // removeBlock is asynchronous. Need to wait it's removed successfully
           try {
-            eventually(timeout(3 seconds), interval(200 milliseconds)) {
+            eventually(timeout(3.seconds), interval(200.milliseconds)) {
               assert(!sparkEnv.blockManager.master.contains(blockId))
             }
             removeBlockSuccessfully = true
           } catch {
             case NonFatal(e) => removeBlockSuccessfully = false
           }
-        case directResult: DirectTaskResult[_] =>
+        case _: DirectTaskResult[_] =>
           taskSetManager.abort("Internal error: expect only indirect results")
       }
       serializedData.rewind()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b158b6c..a739701 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -22,7 +22,6 @@ import java.util.Locale
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.implicitConversions
-import scala.language.postfixOps
 
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -247,7 +246,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 
     // Add another normal block manager and test that 2x replication works
     makeBlockManager(10000, "anotherStore")
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(replicateAndGetNumCopies("a2") === 2)
     }
   }
@@ -272,14 +271,14 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 
     // Add another store, 3x replication should work now, 4x replication should only replicate 3x
     val newStore1 = makeBlockManager(storeSize, s"newstore1")
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(replicateAndGetNumCopies("a3", 3) === 3)
     }
     assert(replicateAndGetNumCopies("a4", 4) === 3)
 
     // Add another store, 4x replication should work now
     val newStore2 = makeBlockManager(storeSize, s"newstore2")
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(replicateAndGetNumCopies("a5", 4) === 4)
     }
 
@@ -295,7 +294,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val newStores = (3 to 5).map {
       i => makeBlockManager(storeSize, s"newstore$i")
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(replicateAndGetNumCopies("a7", 3) === 3)
     }
   }
@@ -454,13 +453,13 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
       master.removeExecutor(bm.blockManagerId.executorId)
       bm.stop()
       // giving enough time for replication to happen and new block be reported to master
-      eventually(timeout(5 seconds), interval(100 millis)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         val newLocations = master.getLocations(blockId).toSet
         assert(newLocations.size === replicationFactor)
       }
     }
 
-    val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
+    val newLocations = eventually(timeout(5.seconds), interval(100.milliseconds)) {
       val _newLocations = master.getLocations(blockId).toSet
       assert(_newLocations.size === replicationFactor)
       _newLocations
@@ -472,7 +471,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
       "New locations contain stopped block managers.")
 
     // Make sure all locks have been released.
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
         assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 576e5f5..9f3d8f2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
 import org.apache.commons.lang3.RandomUtils
@@ -275,19 +275,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     master.removeBlock("a2-to-remove")
     master.removeBlock("a3-to-remove")
 
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(!store.hasLocalBlock("a1-to-remove"))
       master.getLocations("a1-to-remove") should have size 0
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(!store.hasLocalBlock("a2-to-remove"))
       master.getLocations("a2-to-remove") should have size 0
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(store.hasLocalBlock("a3-to-remove"))
       master.getLocations("a3-to-remove") should have size 0
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       val memStatus = master.getMemoryStatus.head._2
       memStatus._1 should equal (40000L)
       memStatus._2 should equal (40000L)
@@ -305,15 +305,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
     master.removeRdd(0, blocking = false)
 
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
       master.getLocations(rdd(0, 0)) should have size 0
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
       master.getLocations(rdd(0, 1)) should have size 0
     }
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       store.getSingleAndReleaseLock("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
@@ -378,7 +378,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // remove broadcast 1 block from both the stores asynchronously
     // and verify all broadcast 1 blocks have been removed
     master.removeBroadcast(1, removeFromMaster = true, blocking = false)
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(!driverStore.hasLocalBlock(broadcast1BlockId))
       assert(!executorStore.hasLocalBlock(broadcast1BlockId))
     }
@@ -386,7 +386,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // remove broadcast 2 from both the stores asynchronously
     // and verify all broadcast 2 blocks have been removed
     master.removeBroadcast(2, removeFromMaster = true, blocking = false)
-    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(!driverStore.hasLocalBlock(broadcast2BlockId))
       assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
       assert(!executorStore.hasLocalBlock(broadcast2BlockId))
@@ -905,7 +905,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
 
     // Make sure get a1 doesn't hang and returns None.
-    failAfter(1 second) {
+    failAfter(1.second) {
       assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index a9f03eb..1913b8d 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -123,12 +123,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       val ui = sc.ui.get
       val rdd = sc.parallelize(Seq(1, 2, 3))
       rdd.persist(StorageLevels.DISK_ONLY).count()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(ui, "/storage")
         val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.DISK_ONLY.description)
       }
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(ui, "/storage/rdd/?id=0")
         val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.DISK_ONLY.description)
@@ -143,12 +143,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
       rdd.unpersist(blocking = true)
       rdd.persist(StorageLevels.MEMORY_ONLY).count()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(ui, "/storage")
         val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
       }
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(ui, "/storage/rdd/?id=0")
         val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
         tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
@@ -203,7 +203,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       intercept[SparkException] {
         sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
       }
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         find(id("active")) should be(None)  // Since we hide empty tables
         find(id("failed")).get.text should be("Failed Stages (1)")
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       intercept[SparkException] {
         sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
       }
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         find(id("active")) should be(None)  // Since we hide empty tables
         // The failure occurs before the stage becomes active, hence we should still show only one
@@ -239,7 +239,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       runSlowJob(sc)
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         assert(hasKillLink)
       }
@@ -247,7 +247,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
     withSpark(newSparkContext(killEnabled = false)) { sc =>
       runSlowJob(sc)
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         assert(!hasKillLink)
       }
@@ -255,7 +255,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       runSlowJob(sc)
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         assert(hasKillLink)
       }
@@ -263,7 +263,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
 
     withSpark(newSparkContext(killEnabled = false)) { sc =>
       runSlowJob(sc)
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         assert(!hasKillLink)
       }
@@ -274,7 +274,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
     withSpark(newSparkContext()) { sc =>
       // If no job has been run in a job group, then "(Job Group)" should not appear in the header
       sc.parallelize(Seq(1, 2, 3)).count()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
         tableHeaders(0) should not startWith "Job Id (Job Group)"
@@ -282,7 +282,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       // Once at least one job has been run in a job group, then we should display the group name:
       sc.setJobGroup("my-job-group", "my-job-group-description")
       sc.parallelize(Seq(1, 2, 3)).count()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
         // Can suffix up/down arrow in the header
@@ -325,7 +325,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         }
       }
       mappedData.count()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
         find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
@@ -373,7 +373,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       }.groupBy(identity).map(identity).groupBy(identity).map(identity)
       // Start the job:
       rdd.countAsync()
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs/job/?id=0")
         find(id("active")).get.text should be ("Active Stages (1)")
         find(id("pending")).get.text should be ("Pending Stages (2)")
@@ -399,7 +399,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       // mentioned in its job start event but which were never actually executed:
       rdd.count()
       rdd.count()
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         // The completed jobs table should have two rows. The first row will be the most recent job:
         val firstRow = find(cssSelector("tbody tr")).get.underlying
@@ -426,7 +426,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       // mentioned in its job start event but which were never actually executed:
       rdd.count()
       rdd.count()
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs/job/?id=1")
         find(id("pending")) should be (None)
         find(id("active")) should be (None)
@@ -454,7 +454,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       // mentioned in its job start event but which were never actually executed:
       rdd.count()
       rdd.count()
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         findAll(cssSelector("tbody tr a")).foreach { link =>
           link.text.toLowerCase(Locale.ROOT) should include ("count")
@@ -476,7 +476,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         })
       }
       sparkUI.attachTab(newTab)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "")
         find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@@ -484,13 +484,13 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
       }
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         // check whether new page exists
         goToUi(sc, "/foo")
         find(cssSelector("b")).get.text should include ("html magic")
       }
       sparkUI.detachTab(newTab)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         goToUi(sc, "")
         find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@@ -498,7 +498,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
         find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
       }
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         // check new page not exist
         goToUi(sc, "/foo")
         find(cssSelector("b")) should be(None)
@@ -509,7 +509,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
   test("kill stage POST/GET response is correct") {
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         val url = new URL(
           sc.ui.get.webUrl.stripSuffix("/") + "/stages/stage/kill/?id=0")
         // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@@ -522,7 +522,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
   test("kill job POST/GET response is correct") {
     withSpark(newSparkContext(killEnabled = true)) { sc =>
       sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         val url = new URL(
           sc.ui.get.webUrl.stripSuffix("/") + "/jobs/job/kill/?id=0")
         // SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@@ -560,7 +560,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         ("8", "count")
       )
 
-      eventually(timeout(1 second), interval(50 milliseconds)) {
+      eventually(timeout(1.second), interval(50.milliseconds)) {
         goToUi(sc, "/jobs")
         // The completed jobs table should have two rows. The first row will be the most recent job:
         find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
@@ -606,7 +606,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         ("17", "groupBy")
       )
 
-      eventually(timeout(1 second), interval(50 milliseconds)) {
+      eventually(timeout(1.second), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
         find("completed").get.text should be ("Completed Stages (20, only showing 3)")
@@ -679,7 +679,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
         sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
       rdd.count()
 
-      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+      eventually(timeout(5.seconds), interval(100.milliseconds)) {
         val stage0 = Source.fromURL(sc.ui.get.webUrl +
           "/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
         assert(stage0.contains("digraph G {\n  subgraph clusterstage_0 {\n    " +
@@ -718,7 +718,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
       rdd.count()
       rdd.count()
 
-      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+      eventually(timeout(5.seconds), interval(50.milliseconds)) {
         goToUi(sc, "/stages")
         find(id("skipped")).get.text should be("Skipped Stages (1)")
       }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 1bd7aed..34fd218 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -75,7 +75,7 @@ class UISuite extends SparkFunSuite {
   ignore("basic ui visibility") {
     withSpark(newSparkContext()) { sc =>
       // test if the ui is visible, and all the expected tabs are visible
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         val html = Source.fromURL(sc.ui.get.webUrl).mkString
         assert(!html.contains("random data that should not be present"))
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
@@ -89,7 +89,7 @@ class UISuite extends SparkFunSuite {
   ignore("visibility at localhost:4040") {
     withSpark(newSparkContext()) { sc =>
       // test if visible from http://localhost:4040
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         val html = Source.fromURL("http://localhost:4040").mkString
         assert(html.toLowerCase(Locale.ROOT).contains("stages"))
       }
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 5507457..45aad3f 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.Eventually._
@@ -45,7 +44,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     (1 to 100).foreach(eventLoop.post)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert((1 to 100) === buffer.asScala.toSeq)
     }
     eventLoop.stop()
@@ -80,7 +79,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(e === receivedError)
     }
     eventLoop.stop()
@@ -102,7 +101,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(e === receivedError)
       assert(eventLoop.isActive)
     }
@@ -157,7 +156,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
       }.start()
     }
 
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(threadNum * eventsFromEachThread === receivedEventsCount)
     }
     eventLoop.stop()
@@ -182,7 +181,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    failAfter(5 seconds) {
+    failAfter(5.seconds) {
       // Wait until we enter `onReceive`
       onReceiveLatch.await()
       eventLoop.stop()
@@ -203,7 +202,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(!eventLoop.isActive)
     }
   }
@@ -227,7 +226,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
       }
     }
     eventLoop.start()
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(!eventLoop.isActive)
     }
     assert(onStopCalled)
@@ -250,7 +249,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(!eventLoop.isActive)
     }
     assert(onStopCalled)
@@ -274,7 +273,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
     }
     eventLoop.start()
     eventLoop.post(1)
-    eventually(timeout(5 seconds), interval(5 millis)) {
+    eventually(timeout(5.seconds), interval(5.milliseconds)) {
       assert(!eventLoop.isActive)
     }
     assert(onStopCalled)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index d5a20cc..2b5993a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.ref.WeakReference
 
 import org.scalatest.Matchers
@@ -460,7 +459,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
     // (lines 69-89)
     // assert(map.currentMap == null)
-    eventually(timeout(5 seconds), interval(200 milliseconds)) {
+    eventually(timeout(5.seconds), interval(200.milliseconds)) {
       System.gc()
       // direct asserts introduced some macro generated code that held a reference to the map
       val tmpIsNull = null == underlyingMapRef.get.orNull
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 609696b..e9e547e 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -129,7 +129,7 @@ abstract class DockerJDBCIntegrationSuite
       // Start the container and wait until the database can accept JDBC connections:
       docker.startContainer(containerId)
       jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
-      eventually(timeout(60.seconds), interval(1.seconds)) {
+      eventually(timeout(1.minute), interval(1.second)) {
         val conn = java.sql.DriverManager.getConnection(jdbcUrl)
         conn.close()
       }
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index d9edf3c..cabc8e1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -99,7 +99,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
     testUtils.createTopic(topic, partitions = 1)
     testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
 
-    eventually(timeout(60.seconds)) {
+    eventually(timeout(1.minute)) {
       assert(
         testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
         "Kafka didn't delete records after 1 minute")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 70b6e67..f2e4ee7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -24,7 +24,6 @@ import java.util.{Collections, Map => JMap, Properties, UUID}
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
-import scala.language.postfixOps
 import scala.util.Random
 
 import kafka.api.Request
@@ -138,7 +137,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
 
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
-    eventually(timeout(60.seconds)) {
+    eventually(timeout(1.minute)) {
       assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
     }
   }
@@ -414,7 +413,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       topic: String,
       numPartitions: Int,
       servers: Seq[KafkaServer]) {
-    eventually(timeout(60.seconds), interval(200.millis)) {
+    eventually(timeout(1.minute), interval(200.milliseconds)) {
       try {
         verifyTopicDeletion(topic, numPartitions, servers)
       } catch {
@@ -439,7 +438,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
       case _ =>
         false
     }
-    eventually(timeout(60.seconds)) {
+    eventually(timeout(1.minute)) {
       assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
     }
   }
@@ -448,7 +447,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
    * Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
    */
   def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
-    eventually(timeout(60.seconds)) {
+    eventually(timeout(1.minute)) {
       val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
       assert(currentOffset.nonEmpty && currentOffset.get >= offset)
     }
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index e042ae0..422f53d 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import org.apache.kafka.clients.consumer._
@@ -153,7 +152,7 @@ class DirectKafkaStreamSuite
       allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
     }
     ssc.start()
-    eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
+    eventually(timeout(100.seconds), interval(1.second)) {
       assert(allReceived.size === expectedTotal,
         "didn't get expected number of messages, messages:\n" +
           allReceived.asScala.mkString("\n"))
@@ -219,7 +218,7 @@ class DirectKafkaStreamSuite
       allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
     }
     ssc.start()
-    eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
+    eventually(timeout(100.seconds), interval(1.second)) {
       assert(allReceived.size === expectedTotal,
         "didn't get expected number of messages, messages:\n" +
           allReceived.asScala.mkString("\n"))
@@ -243,7 +242,7 @@ class DirectKafkaStreamSuite
 
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() > 3)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -272,7 +271,7 @@ class DirectKafkaStreamSuite
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -295,7 +294,7 @@ class DirectKafkaStreamSuite
 
     // Send some initial messages before starting context
     kafkaTestUtils.sendMessages(topic, data)
-    eventually(timeout(10 seconds), interval(20 milliseconds)) {
+    eventually(timeout(10.seconds), interval(20.milliseconds)) {
       assert(getLatestOffset() >= 10)
     }
     val offsetBeforeStart = getLatestOffset()
@@ -326,7 +325,7 @@ class DirectKafkaStreamSuite
     ssc.start()
     val newData = Map("b" -> 10)
     kafkaTestUtils.sendMessages(topic, newData)
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(10.seconds), interval(50.milliseconds)) {
       collectedData.contains("b")
     }
     assert(!collectedData.contains("a"))
@@ -375,7 +374,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(20 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
@@ -414,7 +413,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(20 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20.seconds), interval(50.milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()
@@ -437,7 +436,7 @@ class DirectKafkaStreamSuite
     def sendDataAndWaitForReceive(data: Seq[Int]) {
       val strings = data.map { _.toString}
       kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         assert(strings.forall { collectedData.contains })
       }
     }
@@ -612,7 +611,7 @@ class DirectKafkaStreamSuite
       estimator.updateRate(rate)  // Set a new rate.
       // Expect blocks of data equal to "rate", scaled by the interval length in secs.
       val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
-      eventually(timeout(5.seconds), interval(10 milliseconds)) {
+      eventually(timeout(5.seconds), interval(10.milliseconds)) {
         // Assert that rate estimator values are used to determine maxMessagesPerPartition.
         // Funky "-" in message makes the complete assertion message read better.
         assert(collectedData.asScala.exists(_.size == expectedSize),
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index 9ea7bfc..ac0e6a8 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeoutException
 
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
 import org.mockito.ArgumentMatchers._
@@ -87,7 +86,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
     clock.advance(5 * checkpointInterval.milliseconds)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(1)).checkpoint(seqNum)
       verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
     }
@@ -108,7 +107,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
 
     clock.advance(checkpointInterval.milliseconds * 5)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, atMost(1)).checkpoint(anyString())
     }
   }
@@ -129,7 +128,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
 
     kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
     clock.advance(checkpointInterval.milliseconds)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(1)).checkpoint(anyString())
     }
     // don't block test thread
@@ -138,12 +137,12 @@ class KinesisCheckpointerSuite extends TestSuiteBase
 
     intercept[TimeoutException] {
       // scalastyle:off awaitready
-      Await.ready(f, 50 millis)
+      Await.ready(f, 50.milliseconds)
       // scalastyle:on awaitready
     }
 
     clock.advance(checkpointInterval.milliseconds / 2)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       verify(checkpointerMock, times(1)).checkpoint(anyString)
       verify(checkpointerMock, times(1)).checkpoint()
     }
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 7531a9c..5269084 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions._
 import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
 import com.amazonaws.services.kinesis.model.Record
-import org.mockito.ArgumentMatchers.{anyListOf, anyString, eq => meq}
+import org.mockito.ArgumentMatchers.{anyList, anyString, eq => meq}
 import org.mockito.Mockito.{never, times, verify, when}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.mockito.MockitoSugar
@@ -95,7 +95,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     recordProcessor.processRecords(batch, checkpointerMock)
 
     verify(receiverMock, times(1)).isStopped()
-    verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+    verify(receiverMock, never).addRecords(anyString, anyList())
     verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
   }
 
@@ -103,7 +103,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
     when(receiverMock.isStopped()).thenReturn(false)
     when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
     when(
-      receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+      receiverMock.addRecords(anyString, anyList())
     ).thenThrow(new RuntimeException())
 
     intercept[RuntimeException] {
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 5733721..51ee7fd 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.util.Random
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
@@ -198,10 +197,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.start()
 
     val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
+    eventually(timeout(2.minutes), interval(10.seconds)) {
       testUtils.pushData(testData, aggregateTestData)
-      assert(collected.synchronized { collected === testData.toSet },
-        "\nData received does not match data sent")
+      collected.synchronized {
+        assert(collected === testData.toSet, "\nData received does not match data sent")
+      }
     }
     ssc.stop(stopSparkContext = false)
   }
@@ -217,7 +217,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       .initialPosition(new Latest())
       .checkpointInterval(Seconds(10))
       .storageLevel(StorageLevel.MEMORY_ONLY)
-      .buildWithMessageHandler(addFive(_))
+      .buildWithMessageHandler(addFive)
 
     stream shouldBe a [ReceiverInputDStream[_]]
 
@@ -231,11 +231,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.start()
 
     val testData = 1 to 10
-    eventually(timeout(120 seconds), interval(10 second)) {
+    eventually(timeout(2.minutes), interval(10.seconds)) {
       testUtils.pushData(testData, aggregateTestData)
       val modData = testData.map(_ + 5)
-      assert(collected.synchronized { collected === modData.toSet },
-        "\nData received does not match data sent")
+      collected.synchronized {
+        assert(collected === modData.toSet, "\nData received does not match data sent")
+      }
     }
     ssc.stop(stopSparkContext = false)
   }
@@ -316,10 +317,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       val testData2 = 11 to 20
       val testData3 = 21 to 30
 
-      eventually(timeout(60 seconds), interval(10 second)) {
+      eventually(timeout(1.minute), interval(10.seconds)) {
         localTestUtils.pushData(testData1, aggregateTestData)
-        assert(collected.synchronized { collected === testData1.toSet },
-          "\nData received does not match data sent")
+        collected.synchronized {
+          assert(collected === testData1.toSet, "\nData received does not match data sent")
+        }
       }
 
       val shardToSplit = localTestUtils.getShards().head
@@ -332,10 +334,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       assert(splitCloseShards.size == 1)
       assert(splitOpenShards.size == 2)
 
-      eventually(timeout(60 seconds), interval(10 second)) {
+      eventually(timeout(1.minute), interval(10.seconds)) {
         localTestUtils.pushData(testData2, aggregateTestData)
-        assert(collected.synchronized { collected === (testData1 ++ testData2).toSet },
-          "\nData received does not match data sent after splitting a shard")
+        collected.synchronized {
+          assert(collected === (testData1 ++ testData2).toSet,
+            "\nData received does not match data sent after splitting a shard")
+        }
       }
 
       val Seq(shardToMerge, adjShard) = splitOpenShards
@@ -348,10 +352,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       assert(mergedCloseShards.size == 3)
       assert(mergedOpenShards.size == 1)
 
-      eventually(timeout(60 seconds), interval(10 second)) {
+      eventually(timeout(1.minute), interval(10.seconds)) {
         localTestUtils.pushData(testData3, aggregateTestData)
-        assert(collected.synchronized { collected === (testData1 ++ testData2 ++ testData3).toSet },
-          "\nData received does not match data sent after merging shards")
+        collected.synchronized {
+          assert(collected === (testData1 ++ testData2 ++ testData3).toSet,
+            "\nData received does not match data sent after merging shards")
+        }
       }
     } finally {
       ssc.stop(stopSparkContext = false)
@@ -399,7 +405,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     // Run until there are at least 10 batches with some data in them
     // If this times out because numBatchesWithData is empty, then its likely that foreachRDD
     // function failed with exceptions, and nothing got added to `collectedData`
-    eventually(timeout(2 minutes), interval(1 seconds)) {
+    eventually(timeout(2.minutes), interval(1.second)) {
       testUtils.pushData(1 to 5, aggregateTestData)
       assert(isCheckpointPresent && numBatchesWithData > 10)
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index 4fe69b6..e2ee7c0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml
 
 import scala.collection.mutable
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.hadoop.fs.Path
 import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -142,7 +141,7 @@ class MLEventsSuite
 
     val expected = Seq(
       event0, event1, event2, event3, event4, event5, event6, event7, event8, event9)
-    eventually(timeout(10 seconds), interval(1 second)) {
+    eventually(timeout(10.seconds), interval(1.second)) {
       assert(events === expected)
     }
     // Test if they can be ser/de via JSON protocol.
@@ -200,7 +199,7 @@ class MLEventsSuite
     event7.output = output
 
     val expected = Seq(event0, event1, event2, event3, event4, event5, event6, event7)
-    eventually(timeout(10 seconds), interval(1 second)) {
+    eventually(timeout(10.seconds), interval(1.second)) {
       assert(events === expected)
     }
     // Test if they can be ser/de via JSON protocol.
@@ -221,7 +220,7 @@ class MLEventsSuite
       val pipelineWriter = newPipeline.write
       assert(events.isEmpty)
       pipelineWriter.save(path)
-      eventually(timeout(10 seconds), interval(1 second)) {
+      eventually(timeout(10.seconds), interval(1.second)) {
         events.foreach {
           case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
             assert(e.path.endsWith("writableStage"))
@@ -245,18 +244,18 @@ class MLEventsSuite
       val pipelineReader = Pipeline.read
       assert(events.isEmpty)
       pipelineReader.load(path)
-      eventually(timeout(10 seconds), interval(1 second)) {
+      eventually(timeout(10.seconds), interval(1.second)) {
         events.foreach {
-          case e: LoadInstanceStart[PipelineStage]
-              if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+          case e: LoadInstanceStart[_]
+              if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
             assert(e.path.endsWith("writableStage"))
-          case e: LoadInstanceEnd[PipelineStage]
-              if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+          case e: LoadInstanceEnd[_]
+              if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
             assert(e.instance.isInstanceOf[PipelineStage])
-          case e: LoadInstanceStart[Pipeline] =>
+          case e: LoadInstanceStart[_] =>
             assert(e.reader === pipelineReader)
-          case e: LoadInstanceEnd[Pipeline] =>
-            assert(e.instance.uid === newPipeline.uid)
+          case e: LoadInstanceEnd[_] =>
+            assert(e.instance.asInstanceOf[Pipeline].uid === newPipeline.uid)
           case e => fail(s"Unexpected event thrown: $e")
         }
       }
@@ -280,7 +279,7 @@ class MLEventsSuite
       val pipelineWriter = pipelineModel.write
       assert(events.isEmpty)
       pipelineWriter.save(path)
-      eventually(timeout(10 seconds), interval(1 second)) {
+      eventually(timeout(10.seconds), interval(1.second)) {
         events.foreach {
           case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
             assert(e.path.endsWith("writableStage"))
@@ -304,18 +303,18 @@ class MLEventsSuite
       val pipelineModelReader = PipelineModel.read
       assert(events.isEmpty)
       pipelineModelReader.load(path)
-      eventually(timeout(10 seconds), interval(1 second)) {
+      eventually(timeout(10.seconds), interval(1.second)) {
         events.foreach {
-          case e: LoadInstanceStart[PipelineStage]
-            if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+          case e: LoadInstanceStart[_]
+            if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
             assert(e.path.endsWith("writableStage"))
-          case e: LoadInstanceEnd[PipelineStage]
-            if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+          case e: LoadInstanceEnd[_]
+            if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
             assert(e.instance.isInstanceOf[PipelineStage])
-          case e: LoadInstanceStart[PipelineModel] =>
+          case e: LoadInstanceStart[_] =>
             assert(e.reader === pipelineModelReader)
-          case e: LoadInstanceEnd[PipelineModel] =>
-            assert(e.instance.uid === pipelineModel.uid)
+          case e: LoadInstanceEnd[_] =>
+            assert(e.instance.asInstanceOf[PipelineModel].uid === pipelineModel.uid)
           case e => fail(s"Unexpected event thrown: $e")
         }
       }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index b9aeb1c..384a5f4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import com.google.common.io.Files
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
 import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -169,7 +167,7 @@ abstract class BaseYarnClusterSuite
 
     val handle = launcher.startApplication()
     try {
-      eventually(timeout(2 minutes), interval(1 second)) {
+      eventually(timeout(2.minutes), interval(1.second)) {
         assert(handle.getState().isFinal())
       }
     } finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index bb75952..b072202 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,18 +18,14 @@
 package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.io.Source
-import scala.language.postfixOps
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.HadoopIllegalArgumentException
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.scalatest.Matchers
@@ -209,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       .startApplication()
 
     try {
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.RUNNING)
       }
 
@@ -217,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       handle.getAppId() should startWith ("application_")
       handle.stop()
 
-      eventually(timeout(30 seconds), interval(100 millis)) {
+      eventually(timeout(30.seconds), interval(100.milliseconds)) {
         handle.getState() should be (SparkAppHandle.State.KILLED)
       }
     } finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 756e6d4..381a935 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -24,7 +24,6 @@ import java.util.EnumSet
 
 import scala.annotation.tailrec
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.service.ServiceStateException
@@ -327,10 +326,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
     recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
-    eventually(timeout(10 seconds), interval(5 millis)) {
+    eventually(timeout(10.seconds), interval(5.milliseconds)) {
       assert(!execStateFile.exists())
     }
-    eventually(timeout(10 seconds), interval(5 millis)) {
+    eventually(timeout(10.seconds), interval(5.milliseconds)) {
       assert(!secretsFile.exists())
     }
 
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
index 9b0eb61..64663d5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
@@ -17,17 +17,13 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.SupportsRead;
 import org.apache.spark.sql.sources.v2.Table;
 import org.apache.spark.sql.sources.v2.TableCapability;
-import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
 abstract class JavaSimpleBatchTable implements Table, SupportsRead {
@@ -52,53 +48,3 @@ abstract class JavaSimpleBatchTable implements Table, SupportsRead {
   }
 }
 
-abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
-
-  @Override
-  public Scan build() {
-    return this;
-  }
-
-  @Override
-  public Batch toBatch() {
-    return this;
-  }
-
-  @Override
-  public StructType readSchema() {
-    return new StructType().add("i", "int").add("j", "int");
-  }
-
-  @Override
-  public PartitionReaderFactory createReaderFactory() {
-    return new JavaSimpleReaderFactory();
-  }
-}
-
-class JavaSimpleReaderFactory implements PartitionReaderFactory {
-
-  @Override
-  public PartitionReader<InternalRow> createReader(InputPartition partition) {
-    JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
-    return new PartitionReader<InternalRow>() {
-      private int current = p.start - 1;
-
-      @Override
-      public boolean next() throws IOException {
-        current += 1;
-        return current < p.end;
-      }
-
-      @Override
-      public InternalRow get() {
-        return new GenericInternalRow(new Object[] {current, -current});
-      }
-
-      @Override
-      public void close() throws IOException {
-
-      }
-    };
-  }
-}
-
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
similarity index 51%
copy from sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
copy to sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
index 9b0eb61..7402790 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
@@ -17,63 +17,11 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.SupportsRead;
-import org.apache.spark.sql.sources.v2.Table;
-import org.apache.spark.sql.sources.v2.TableCapability;
-import org.apache.spark.sql.sources.v2.reader.*;
-import org.apache.spark.sql.types.StructType;
-
-abstract class JavaSimpleBatchTable implements Table, SupportsRead {
-  private static final Set<TableCapability> CAPABILITIES = new HashSet<>(Arrays.asList(
-      TableCapability.BATCH_READ,
-      TableCapability.BATCH_WRITE,
-      TableCapability.TRUNCATE));
-
-  @Override
-  public StructType schema() {
-    return new StructType().add("i", "int").add("j", "int");
-  }
-
-  @Override
-  public String name() {
-    return this.getClass().toString();
-  }
-
-  @Override
-  public Set<TableCapability> capabilities() {
-    return CAPABILITIES;
-  }
-}
-
-abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
-
-  @Override
-  public Scan build() {
-    return this;
-  }
-
-  @Override
-  public Batch toBatch() {
-    return this;
-  }
-
-  @Override
-  public StructType readSchema() {
-    return new StructType().add("i", "int").add("j", "int");
-  }
-
-  @Override
-  public PartitionReaderFactory createReaderFactory() {
-    return new JavaSimpleReaderFactory();
-  }
-}
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
 
 class JavaSimpleReaderFactory implements PartitionReaderFactory {
 
@@ -84,7 +32,7 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
       private int current = p.start - 1;
 
       @Override
-      public boolean next() throws IOException {
+      public boolean next() {
         current += 1;
         return current < p.end;
       }
@@ -95,10 +43,8 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
       }
 
       @Override
-      public void close() throws IOException {
-
+      public void close() {
       }
     };
   }
 }
-
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java
new file mode 100644
index 0000000..217e669
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.Batch;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
+
+  @Override
+  public Scan build() {
+    return this;
+  }
+
+  @Override
+  public Batch toBatch() {
+    return this;
+  }
+
+  @Override
+  public StructType readSchema() {
+    return new StructType().add("i", "int").add("j", "int");
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new JavaSimpleReaderFactory();
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 92157d8..76350ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.apache.spark.CleanerListener
 import org.apache.spark.executor.DataReadMethod._
@@ -251,7 +250,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     sql("UNCACHE TABLE testData")
     assert(!spark.catalog.isCached("testData"), "Table 'testData' should not be cached")
 
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -267,7 +266,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         "Eagerly cached in-memory table should have already been materialized")
 
       uncacheTable("testCacheTable")
-      eventually(timeout(10 seconds)) {
+      eventually(timeout(10.seconds)) {
         assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
       }
     }
@@ -284,7 +283,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
         "Eagerly cached in-memory table should have already been materialized")
 
       uncacheTable("testCacheTable")
-      eventually(timeout(10 seconds)) {
+      eventually(timeout(10.seconds)) {
         assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
       }
     }
@@ -305,7 +304,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
       "Lazily cached in-memory table should have been materialized")
 
     uncacheTable("testData")
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
     }
   }
@@ -446,7 +445,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
 
     System.gc()
 
-    eventually(timeout(10 seconds)) {
+    eventually(timeout(10.seconds)) {
       assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
         "batchStats accumulators should be cleared after GC when uncacheTable")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index b3b2cdc..b828b23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -136,7 +136,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     assertCached(df2)
 
     // udf has been evaluated during caching, and thus should not be re-evaluated here
-    failAfter(2 seconds) {
+    failAfter(2.seconds) {
       df2.collect()
     }
 
@@ -197,7 +197,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
     // reuse loaded cache
-    failAfter(3 seconds) {
+    failAfter(3.seconds) {
       checkDataset(df4, Row(10))
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index a5ba4f9..22f8437 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -308,7 +308,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     val offsets = scala.collection.mutable.ListBuffer[Int]()
     val readerFactory = stream.createContinuousReaderFactory()
     import org.scalatest.time.SpanSugar._
-    failAfter(5 seconds) {
+    failAfter(5.seconds) {
       // inject rows, read and check the data and offsets
       for (i <- 0 until numRecords) {
         serverThread.enqueue(i.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 3dd3210..2a1e7d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -41,7 +41,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       assert(coordinatorRef.getLocation(id) === None)
 
       coordinatorRef.reportActiveInstance(id, "hostX", "exec1")
-      eventually(timeout(5 seconds)) {
+      eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id, "exec1"))
         assert(
           coordinatorRef.getLocation(id) ===
@@ -50,7 +50,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
 
       coordinatorRef.reportActiveInstance(id, "hostX", "exec2")
 
-      eventually(timeout(5 seconds)) {
+      eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id, "exec1") === false)
         assert(coordinatorRef.verifyIfInstanceActive(id, "exec2"))
 
@@ -75,7 +75,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       coordinatorRef.reportActiveInstance(id2, host, exec)
       coordinatorRef.reportActiveInstance(id3, host, exec)
 
-      eventually(timeout(5 seconds)) {
+      eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id1, exec))
         assert(coordinatorRef.verifyIfInstanceActive(id2, exec))
         assert(coordinatorRef.verifyIfInstanceActive(id3, exec))
@@ -107,7 +107,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
 
       coordRef1.reportActiveInstance(id, "hostX", "exec1")
 
-      eventually(timeout(5 seconds)) {
+      eventually(timeout(5.seconds)) {
         assert(coordRef2.verifyIfInstanceActive(id, "exec1"))
         assert(
           coordRef2.getLocation(id) ===
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 12bc68c..af4369d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -416,7 +416,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       }
     }
 
-    val timeoutDuration = 60 seconds
+    val timeoutDuration = 1.minute
 
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 13b8866..b421809 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -93,7 +93,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       testAwaitAnyTermination(ExpectBlocked)
 
       // Stop a query asynchronously and see if it is reported through awaitAnyTermination
-      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+      val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
       testAwaitAnyTermination(ExpectNotBlocked)
       require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
@@ -106,7 +106,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
       // Terminate a query asynchronously with exception and see awaitAnyTermination throws
       // the exception
-      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
       testAwaitAnyTermination(ExpectException[SparkException])
       require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
@@ -119,10 +119,10 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
       // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
       // the exception
-      val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      val q3 = stopRandomQueryAsync(10.milliseconds, withError = false)
       testAwaitAnyTermination(ExpectNotBlocked)
       require(!q3.isActive)
-      val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      val q4 = stopRandomQueryAsync(10.milliseconds, withError = true)
       eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
       // After q4 terminates with exception, awaitAnyTerm should start throwing exception
       testAwaitAnyTermination(ExpectException[SparkException])
@@ -138,81 +138,81 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       // awaitAnyTermination should be blocking or non-blocking depending on timeout values
       testAwaitAnyTermination(
         ExpectBlocked,
-        awaitTimeout = 4 seconds,
+        awaitTimeout = 4.seconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 2 seconds)
+        testBehaviorFor = 2.seconds)
 
       testAwaitAnyTermination(
         ExpectNotBlocked,
-        awaitTimeout = 50 milliseconds,
+        awaitTimeout = 50.milliseconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 1 second)
+        testBehaviorFor = 1.second)
 
       // Stop a query asynchronously within timeout and awaitAnyTerm should be unblocked
-      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+      val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
       testAwaitAnyTermination(
         ExpectNotBlocked,
-        awaitTimeout = 2 seconds,
+        awaitTimeout = 2.seconds,
         expectedReturnedValue = true,
-        testBehaviorFor = 4 seconds)
+        testBehaviorFor = 4.seconds)
       require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
       // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
       testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
+        ExpectNotBlocked, awaitTimeout = 4.seconds, expectedReturnedValue = true)
 
       // Resetting termination should make awaitAnyTermination() blocking again
       spark.streams.resetTerminated()
       testAwaitAnyTermination(
         ExpectBlocked,
-        awaitTimeout = 4 seconds,
+        awaitTimeout = 4.seconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 1 second)
+        testBehaviorFor = 1.second)
 
       // Terminate a query asynchronously with exception within timeout, awaitAnyTermination should
       // throws the exception
-      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
       testAwaitAnyTermination(
         ExpectException[SparkException],
-        awaitTimeout = 4 seconds,
-        testBehaviorFor = 6 seconds)
+        awaitTimeout = 4.seconds,
+        testBehaviorFor = 6.seconds)
       require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
       // All subsequent calls to awaitAnyTermination should throw the exception
       testAwaitAnyTermination(
         ExpectException[SparkException],
-        awaitTimeout = 2 seconds,
-        testBehaviorFor = 4 seconds)
+        awaitTimeout = 2.seconds,
+        testBehaviorFor = 4.seconds)
 
       // Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
       spark.streams.resetTerminated()
-      val q3 = stopRandomQueryAsync(2 seconds, withError = true)
+      val q3 = stopRandomQueryAsync(2.seconds, withError = true)
       testAwaitAnyTermination(
         ExpectNotBlocked,
-        awaitTimeout = 100 milliseconds,
+        awaitTimeout = 100.milliseconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 4 seconds)
+        testBehaviorFor = 4.seconds)
 
       // After that query is stopped, awaitAnyTerm should throw exception
       eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
       testAwaitAnyTermination(
         ExpectException[SparkException],
-        awaitTimeout = 100 milliseconds,
-        testBehaviorFor = 4 seconds)
+        awaitTimeout = 100.milliseconds,
+        testBehaviorFor = 4.seconds)
 
 
       // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
       // the exception
       spark.streams.resetTerminated()
 
-      val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      val q4 = stopRandomQueryAsync(10.milliseconds, withError = false)
       testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
+        ExpectNotBlocked, awaitTimeout = 2.seconds, expectedReturnedValue = true)
       require(!q4.isActive)
-      val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
       eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
       // After q5 terminates with exception, awaitAnyTerm should start throwing exception
-      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
+      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds)
     }
   }
 
@@ -276,7 +276,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       expectedBehavior: ExpectedBehavior,
       expectedReturnedValue: Boolean = false,
       awaitTimeout: Span = null,
-      testBehaviorFor: Span = 4 seconds
+      testBehaviorFor: Span = 4.seconds
     ): Unit = {
 
     def awaitTermFunc(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 2f460b0..e784d31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
@@ -203,7 +202,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         val progress = query.lastProgress
         assert(progress.stateOperators.length > 0)
         // Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
-        eventually(timeout(1 minute)) {
+        eventually(timeout(1.minute)) {
           val nextProgress = query.lastProgress
           assert(nextProgress.timestamp !== progress.timestamp)
           assert(nextProgress.numInputRows === 0)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index fef18f1..47cf4f10 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -86,12 +86,12 @@ class UISeleniumSuite
 
       queries.foreach(statement.execute)
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         go to baseURL
         find(cssSelector("""ul li a[href*="sql"]""")) should not be None
       }
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         go to (baseURL + "/sql")
         find(id("sessionstat")) should not be None
         find(id("sqlstat")) should not be None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 80c0795..f8ddabd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.receiver
 
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
-import scala.language.{existentials, postfixOps}
+import scala.language.existentials
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index f0161e1..21f3bbe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ExecutionContextTaskSupport
 import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.language.postfixOps
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -142,7 +141,7 @@ private[streaming] class FileBasedWriteAheadLog(
       CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close())
     }
     if (!closeFileAfterWrite) {
-      logFilesToRead.iterator.map(readFile).flatten.asJava
+      logFilesToRead.iterator.flatMap(readFile).asJava
     } else {
       // For performance gains, it makes sense to parallelize the recovery if
       // closeFileAfterWrite = true
@@ -190,7 +189,7 @@ private[streaming] class FileBasedWriteAheadLog(
           if (waitForCompletion) {
             import scala.concurrent.duration._
             // scalastyle:off awaitready
-            Await.ready(f, 1 second)
+            Await.ready(f, 1.second)
             // scalastyle:on awaitready
           }
         } catch {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 19da181..55fdd4c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -162,12 +162,12 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
 
       val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
 
-      eventually(timeout(10 seconds)) {
+      eventually(timeout(10.seconds)) {
         ssc.awaitTerminationOrTimeout(10)
         assert(batchCounter.getLastCompletedBatchTime === targetBatchTime)
       }
 
-      eventually(timeout(10 seconds)) {
+      eventually(timeout(10.seconds)) {
         val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
           _.getName.contains(clock.getTimeMillis.toString)
         }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index f0960d0..1506061 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.postfixOps
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
@@ -199,7 +198,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
 
       val cleanupThreshTime = 3000L
       handler.cleanupOldBlocks(cleanupThreshTime)
-      eventually(timeout(10000 millis), interval(10 millis)) {
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
         getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
       }
     }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 8800f1c..0b15f00 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
@@ -276,7 +276,7 @@ class ReceivedBlockTrackerSuite
     getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
 
     // Verify that at least one log file gets deleted
-    eventually(timeout(10 seconds), interval(10 millisecond)) {
+    eventually(timeout(10.seconds), interval(10.millisecond)) {
       getWriteAheadLogFiles() should not contain oldestLogFile
     }
     printLogFiles("After clean")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 130e981..48aa9e5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -64,7 +64,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
 
     // Verify that the receiver
     intercept[Exception] {
-      failAfter(200 millis) {
+      failAfter(200.milliseconds) {
         executingThread.join()
       }
     }
@@ -78,7 +78,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     assert(receiver.isStarted)
     assert(!receiver.isStopped())
     assert(receiver.otherThread.isAlive)
-    eventually(timeout(100 millis), interval(10 millis)) {
+    eventually(timeout(100.milliseconds), interval(10.milliseconds)) {
       assert(receiver.receiving)
     }
 
@@ -107,12 +107,12 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
 
     // Verify restarting actually stops and starts the receiver
     receiver.restart("restarting", null, 600)
-    eventually(timeout(300 millis), interval(10 millis)) {
+    eventually(timeout(300.milliseconds), interval(10.milliseconds)) {
       // receiver will be stopped async
       assert(receiver.isStopped)
       assert(receiver.onStopCalled)
     }
-    eventually(timeout(1000 millis), interval(10 millis)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       // receiver will be started async
       assert(receiver.onStartCalled)
       assert(executor.isReceiverStarted)
@@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
     }
 
     // Verify that stopping actually stops the thread
-    failAfter(100 millis) {
+    failAfter(100.milliseconds) {
       receiver.stop("test")
       assert(receiver.isStopped)
       assert(!receiver.otherThread.isAlive)
@@ -159,7 +159,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
 
     val recordedBlocks = blockGeneratorListener.arrayBuffers
     val recordedData = recordedBlocks.flatten
-    assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+    assert(blockGeneratorListener.arrayBuffers.nonEmpty, "No blocks received")
     assert(recordedData.toSet === generatedData.toSet, "Received data not same")
 
     // recordedData size should be close to the expected rate; use an error margin proportional to
@@ -245,15 +245,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
 
       // Run until sufficient WAL files have been generated and
       // the first WAL files has been deleted
-      eventually(timeout(20 seconds), interval(batchDuration.milliseconds millis)) {
+      eventually(timeout(20.seconds), interval(batchDuration.milliseconds.millis)) {
         val (logFiles1, logFiles2) = getBothCurrentLogFiles()
         allLogFiles1 ++= logFiles1
         allLogFiles2 ++= logFiles2
-        if (allLogFiles1.size > 0) {
-          assert(!logFiles1.contains(allLogFiles1.toSeq.sorted.head))
+        if (allLogFiles1.nonEmpty) {
+          assert(!logFiles1.contains(allLogFiles1.toSeq.min))
         }
-        if (allLogFiles2.size > 0) {
-          assert(!logFiles2.contains(allLogFiles2.toSeq.sorted.head))
+        if (allLogFiles2.nonEmpty) {
+          assert(!logFiles2.contains(allLogFiles2.toSeq.min))
         }
         assert(allLogFiles1.size >= 7)
         assert(allLogFiles2.size >= 7)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 8d07210..5cda6f9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -211,7 +211,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     // Local props set after start should be ignored
     ssc.sc.setLocalProperty("customPropKey", "value2")
 
-    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+    eventually(timeout(10.seconds), interval(10.milliseconds)) {
       assert(allFound)
     }
 
@@ -342,7 +342,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     input.foreachRDD(_ => {})
     ssc.start()
     // Call `ssc.stop` at once so that it's possible that the receiver will miss "StopReceiver"
-    failAfter(30000 millis) {
+    failAfter(30.seconds) {
       ssc.stop(stopSparkContext = true, stopGracefully = true)
     }
   }
@@ -398,18 +398,18 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     inputStream.map(x => x).register()
 
     // test whether start() blocks indefinitely or not
-    failAfter(2000 millis) {
+    failAfter(2.seconds) {
       ssc.start()
     }
 
     // test whether awaitTermination() exits after give amount of time
-    failAfter(1000 millis) {
+    failAfter(1.second) {
       ssc.awaitTerminationOrTimeout(500)
     }
 
     // test whether awaitTermination() does not exit if not time is given
     val exception = intercept[Exception] {
-      failAfter(1000 millis) {
+      failAfter(1.second) {
         ssc.awaitTermination()
         throw new Exception("Did not wait for stop")
       }
@@ -418,7 +418,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
     var t: Thread = null
     // test whether wait exits if context is stopped
-    failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+    failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
       t = new Thread() {
         override def run() {
           Thread.sleep(500)
@@ -439,7 +439,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     val inputStream = addInputStream(ssc)
     inputStream.map(x => x).register()
 
-    failAfter(10000 millis) {
+    failAfter(10.seconds) {
       ssc.start()
       ssc.stop()
       ssc.awaitTermination()
@@ -479,13 +479,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     ssc.start()
 
     // test whether awaitTerminationOrTimeout() return false after give amount of time
-    failAfter(1000 millis) {
+    failAfter(1.second) {
       assert(ssc.awaitTerminationOrTimeout(500) === false)
     }
 
     var t: Thread = null
     // test whether awaitTerminationOrTimeout() return true if context is stopped
-    failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+    failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
       t = new Thread() {
         override def run() {
           Thread.sleep(500)
@@ -528,7 +528,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
     // getOrCreate should create new context with empty path
     testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(emptyPath, () => creatingFunction())
       assert(ssc != null, "no context created")
       assert(newContextCreated, "new context not created")
     }
@@ -537,19 +537,19 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
     // getOrCreate should throw exception with fake checkpoint file and createOnError = false
     intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
     }
 
     // getOrCreate should throw exception with fake checkpoint file
     intercept[Exception] {
       ssc = StreamingContext.getOrCreate(
-        corruptedCheckpointPath, creatingFunction _, createOnError = false)
+        corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
     }
 
     // getOrCreate should create new context with fake checkpoint file and createOnError = true
     testGetOrCreate {
       ssc = StreamingContext.getOrCreate(
-        corruptedCheckpointPath, creatingFunction _, createOnError = true)
+        corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
       assert(ssc != null, "no context created")
       assert(newContextCreated, "new context not created")
     }
@@ -558,7 +558,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
 
     // getOrCreate should recover context with checkpoint path, and recover old configuration
     testGetOrCreate {
-      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
       assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
@@ -567,7 +567,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     // getOrCreate should recover StreamingContext with existing SparkContext
     testGetOrCreate {
       sc = new SparkContext(conf)
-      ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
       assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
@@ -669,41 +669,41 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
         conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
       addInputStream(ssc).register()
       ssc.start()
-      val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+      val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
       assert(!newContextCreated, "new context created instead of returning")
       assert(returnedSsc.eq(ssc), "returned context is not the activated context")
     }
 
     // getActiveOrCreate should create new context with empty path
     testGetActiveOrCreate {
-      ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
+      ssc = StreamingContext.getActiveOrCreate(emptyPath, () => creatingFunction())
       assert(ssc != null, "no context created")
       assert(newContextCreated, "new context not created")
     }
 
     // getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
     intercept[Exception] {
-      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
     }
 
     // getActiveOrCreate should throw exception with fake checkpoint file
     intercept[Exception] {
       ssc = StreamingContext.getActiveOrCreate(
-        corruptedCheckpointPath, creatingFunction _, createOnError = false)
+        corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
     }
 
     // getActiveOrCreate should create new context with fake
     // checkpoint file and createOnError = true
     testGetActiveOrCreate {
       ssc = StreamingContext.getActiveOrCreate(
-        corruptedCheckpointPath, creatingFunction _, createOnError = true)
+        corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
       assert(ssc != null, "no context created")
       assert(newContextCreated, "new context not created")
     }
 
     // getActiveOrCreate should recover context with checkpoint path, and recover old configuration
     testGetActiveOrCreate {
-      ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+      ssc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
       assert(ssc != null, "no context created")
       assert(!newContextCreated, "old context not recovered")
       assert(ssc.conf.get("someKey") === "someValue")
@@ -781,14 +781,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
       _ssc.queueStream[Int](Queue(rdd)).register()
       _ssc
     }
-    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
     ssc.start()
-    eventually(timeout(10000 millis)) {
+    eventually(timeout(10.seconds)) {
       assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
     }
     ssc.stop()
     val e = intercept[SparkException] {
-      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
     }
     // StreamingContext.validate changes the message, so use "contains" here
     assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
@@ -855,7 +855,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
     ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
     ssc.start()
     try {
-      eventually(timeout(30000 millis)) {
+      eventually(timeout(30.seconds)) {
         assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
       }
     } finally {
@@ -967,7 +967,7 @@ package object testPackage extends Assertions {
       }
       ssc.start()
 
-      eventually(timeout(10000 millis), interval(10 millis)) {
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
         assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
         assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 0f957a1..62fd433 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -130,7 +130,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
     ssc.start()
     try {
-      eventually(timeout(30 seconds), interval(20 millis)) {
+      eventually(timeout(30.seconds), interval(20.milliseconds)) {
         collector.startedReceiverStreamIds.size should equal (1)
         collector.startedReceiverStreamIds.peek() should equal (0)
         collector.stoppedReceiverStreamIds.size should equal (1)
@@ -157,7 +157,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
     ssc.start()
     try {
-      eventually(timeout(30 seconds), interval(20 millis)) {
+      eventually(timeout(30.seconds), interval(20.milliseconds)) {
         collector.startedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
         collector.completedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 29e4513..483a751 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -97,12 +97,12 @@ class UISeleniumSuite
 
       val sparkUI = ssc.sparkContext.ui.get
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         go to (sparkUI.webUrl.stripSuffix("/"))
         find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
       }
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         // check whether streaming page exists
         go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
         val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -196,12 +196,12 @@ class UISeleniumSuite
 
       ssc.stop(false)
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         go to (sparkUI.webUrl.stripSuffix("/"))
         find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
       }
 
-      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+      eventually(timeout(10.seconds), interval(50.milliseconds)) {
         go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
         val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
         h3Text should not contain("Streaming Statistics")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index 7b839ae..4c0dd0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -84,7 +84,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     }
     clock.advance(blockIntervalMs)  // advance clock to generate blocks
     withClue("blocks not generated or pushed") {
-      eventually(timeout(1 second)) {
+      eventually(timeout(1.second)) {
         assert(listener.onGenerateBlockCalled)
         assert(listener.onPushBlockCalled)
       }
@@ -100,7 +100,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     listener.addedData.asScala.toSeq should contain theSameElementsInOrderAs (data2)
     listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (metadata2)
     clock.advance(blockIntervalMs)  // advance clock to generate blocks
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       val combined = data1 ++ data2
       listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
     }
@@ -112,7 +112,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     val combinedMetadata = metadata2 :+ metadata3
     listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (combinedMetadata)
     clock.advance(blockIntervalMs)  // advance clock to generate blocks
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       val combinedData = data1 ++ data2 ++ data3
       listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (combinedData)
     }
@@ -120,7 +120,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     // Stop the block generator by starting the stop on a different thread and
     // then advancing the manual clock for the stopping to proceed.
     val thread = stopBlockGenerator(blockGenerator)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       clock.advance(blockIntervalMs)
       assert(blockGenerator.isStopped())
     }
@@ -160,7 +160,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     // - Finally, wait for all blocks to be pushed
     clock.advance(1) // to make sure that the timer for another interval to complete
     val thread = stopBlockGenerator(blockGenerator)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(blockGenerator.isActive() === false)
     }
     assert(blockGenerator.isStopped() === false)
@@ -181,7 +181,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     // (expected as stop() should never complete) or a SparkException (unexpected as stop()
     // completed and thread terminated).
     val exception = intercept[Exception] {
-      failAfter(200 milliseconds) {
+      failAfter(200.milliseconds) {
         thread.join()
         throw new SparkException(
           "BlockGenerator.stop() completed before generating timer was stopped")
@@ -193,7 +193,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     // Verify that the final data is present in the final generated block and
     // pushed before complete stop
     assert(blockGenerator.isStopped() === false) // generator has not stopped yet
-    eventually(timeout(10 seconds), interval(10 milliseconds)) {
+    eventually(timeout(10.seconds), interval(10.milliseconds)) {
       // Keep calling `advance` to avoid blocking forever in `clock.waitTillTime`
       clock.advance(blockIntervalMs)
       assert(thread.isAlive === false)
@@ -213,7 +213,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
     blockGenerator.start()
     assert(listener.onErrorCalled === false)
     blockGenerator.addData(1)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+    eventually(timeout(1.second), interval(10.milliseconds)) {
       assert(listener.onErrorCalled)
     }
     blockGenerator.stop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 22d027d..d072b99 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -61,7 +61,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
         val expectedWaitTime = clock.getTimeMillis() + advancedTime
         clock.advance(advancedTime)
         // Make sure ExecutorAllocationManager.manageAllocation is called
-        eventually(timeout(10 seconds)) {
+        eventually(timeout(10.seconds)) {
           assert(clock.isStreamWaitingAt(expectedWaitTime))
         }
         body
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index 5f7f7fa..f0e5027 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
 import java.util.concurrent.CountDownLatch
 
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
 import org.scalatest.concurrent.Eventually._
 
@@ -69,10 +68,10 @@ class JobGeneratorSuite extends TestSuiteBase {
       val longBatchNumber = 3 // 3rd batch will take a long time
       val longBatchTime = longBatchNumber * batchDuration.milliseconds
 
-      val testTimeout = timeout(10 seconds)
+      val testTimeout = timeout(10.seconds)
       val inputStream = ssc.receiverStream(new TestReceiver)
 
-      inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
+      inputStream.foreachRDD((_: RDD[Int], time: Time) => {
         if (time.milliseconds == longBatchTime) {
           while (waitLatch.getCount() > 0) {
             waitLatch.await()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index c206d31..fec20f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -41,7 +41,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       try {
         // we wait until the Receiver has registered with the tracker,
         // otherwise our rate update is lost
-        eventually(timeout(5 seconds)) {
+        eventually(timeout(5.seconds)) {
           assert(RateTestReceiver.getActive().nonEmpty)
         }
 
@@ -49,7 +49,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
         // Verify that the rate of the block generator in the receiver get updated
         val activeReceiver = RateTestReceiver.getActive().get
         tracker.sendRateUpdate(inputDStream.id, newRateLimit)
-        eventually(timeout(5 seconds)) {
+        eventually(timeout(5.seconds)) {
           assert(activeReceiver.getDefaultBlockGeneratorRateLimit() === newRateLimit,
             "default block generator did not receive rate update")
           assert(activeReceiver.getCustomBlockGeneratorRateLimit() === newRateLimit,
@@ -76,7 +76,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       output.register()
       ssc.start()
       StoppableReceiver.shouldStop = true
-      eventually(timeout(10 seconds), interval(10 millis)) {
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
         // The receiver is stopped once, so if it's restarted, it should be started twice.
         assert(startTimes === 2)
       }
@@ -98,7 +98,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
       val output = new TestOutputStream(input)
       output.register()
       ssc.start()
-      eventually(timeout(10 seconds), interval(10 millis)) {
+      eventually(timeout(10.seconds), interval(10.milliseconds)) {
         // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
         assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
       }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index dc9305c..8d2fa7d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -135,7 +135,7 @@ abstract class CommonWriteAheadLogTests(
     if (waitForCompletion) {
       assert(getLogFilesInDirectory(testDir).size < logFiles.size)
     } else {
-      eventually(Eventually.timeout(1 second), interval(10 milliseconds)) {
+      eventually(Eventually.timeout(1.second), interval(10.milliseconds)) {
         assert(getLogFilesInDirectory(testDir).size < logFiles.size)
       }
     }
@@ -504,7 +504,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // The queue.take() immediately takes the 3, and there is nothing left in the queue at that
     // moment. Then the promise blocks the writing of 3. The rest get queued.
     writeAsync(batchedWal, event1, 3L)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(blockingWal.isBlocked)
       assert(batchedWal.invokePrivate(queueLength()) === 0)
     }
@@ -514,12 +514,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // we would like event 5 to be written before event 4 in order to test that they get
     // sorted before being aggregated
     writeAsync(batchedWal, event5, 12L)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(blockingWal.isBlocked)
       assert(batchedWal.invokePrivate(queueLength()) === 3)
     }
     writeAsync(batchedWal, event4, 10L)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(walBatchingThreadPool.getActiveCount === 5)
       assert(batchedWal.invokePrivate(queueLength()) === 4)
     }
@@ -528,7 +528,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     val buffer = wrapArrayArrayByte(Array(event1))
     val queuedEvents = Set(event2, event3, event4, event5)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
       verify(wal, times(1)).write(meq(buffer), meq(3L))
       // the file name should be the timestamp of the last record, as events should be naturally
@@ -559,7 +559,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     // The queue.take() immediately takes the 3, and there is nothing left in the queue at that
     // moment. Then the promise blocks the writing of 3. The rest get queued.
     val promise1 = writeAsync(batchedWal, event1, 3L)
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(blockingWal.isBlocked)
       assert(batchedWal.invokePrivate(queueLength()) === 0)
     }
@@ -567,7 +567,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     val promise2 = writeAsync(batchedWal, event2, 5L)
     val promise3 = writeAsync(batchedWal, event3, 8L)
 
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(walBatchingThreadPool.getActiveCount === 3)
       assert(blockingWal.isBlocked)
       assert(batchedWal.invokePrivate(queueLength()) === 2) // event1 is being written
@@ -576,7 +576,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     val writePromises = Seq(promise1, promise2, promise3)
 
     batchedWal.close()
-    eventually(timeout(1 second)) {
+    eventually(timeout(1.second)) {
       assert(writePromises.forall(_.isCompleted))
       assert(writePromises.forall(_.future.value.get.isFailure)) // all should have failed
     }
@@ -772,7 +772,7 @@ object WriteAheadLogSuite {
 
     override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
       isWriteCalled = true
-      eventually(Eventually.timeout(2 second)) {
+      eventually(Eventually.timeout(2.second)) {
         assert(!blockWrite)
       }
       wal.write(record, time)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org