You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/11 18:44:06 UTC
[spark] branch master updated:
[SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0:
postfixOps edition
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ec7f63 [SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
4ec7f63 is described below
commit 4ec7f631aa23bc0121656dd77f05767f447112c0
Author: Sean Owen <se...@databricks.com>
AuthorDate: Thu Apr 11 13:43:44 2019 -0500
[SPARK-27404][CORE][SQL][STREAMING][YARN] Fix build warnings for 3.0: postfixOps edition
## What changes were proposed in this pull request?
Fix build warnings -- see some details below.
But mostly, remove use of postfix syntax where it causes warnings without the `scala.language.postfixOps` import. This is mostly in expressions like "120000 milliseconds". Which, I'd like to simplify to things like "2.minutes" anyway.
## How was this patch tested?
Existing tests.
Closes #24314 from srowen/SPARK-27404.
Authored-by: Sean Owen <se...@databricks.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../network/shuffle/RetryingBlockFetcherSuite.java | 8 +--
.../org/apache/spark/BarrierTaskContext.scala | 3 +-
.../apache/spark/deploy/FaultToleranceTest.scala | 17 +++---
.../scala/org/apache/spark/rpc/RpcTimeout.scala | 2 +-
.../org/apache/spark/scheduler/DAGScheduler.scala | 3 +-
.../main/scala/org/apache/spark/ui/UIUtils.scala | 19 +++----
.../spark/BarrierStageOnSubmittedSuite.scala | 3 +-
.../org/apache/spark/ContextCleanerSuite.scala | 16 +++---
.../test/scala/org/apache/spark/DriverSuite.scala | 4 +-
.../org/apache/spark/JobCancellationSuite.scala | 4 +-
.../scala/org/apache/spark/SparkConfSuite.scala | 5 +-
.../org/apache/spark/StatusTrackerSuite.scala | 29 +++++-----
.../org/apache/spark/deploy/SparkSubmitSuite.scala | 2 +-
.../deploy/StandaloneDynamicAllocationSuite.scala | 2 +-
.../spark/deploy/client/AppClientSuite.scala | 2 +-
.../deploy/history/FsHistoryProviderSuite.scala | 11 ++--
.../spark/deploy/history/HistoryServerSuite.scala | 20 +------
.../apache/spark/deploy/master/MasterSuite.scala | 11 ++--
.../apache/spark/deploy/worker/WorkerSuite.scala | 6 +-
.../org/apache/spark/executor/ExecutorSuite.scala | 1 -
.../spark/launcher/LauncherBackendSuite.scala | 5 +-
.../apache/spark/rdd/AsyncRDDActionsSuite.scala | 4 +-
.../apache/spark/rdd/LocalCheckpointSuite.scala | 3 +-
.../scala/org/apache/spark/rpc/RpcEnvSuite.scala | 55 +++++++++----------
.../CoarseGrainedSchedulerBackendSuite.scala | 2 +-
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
.../scheduler/OutputCommitCoordinatorSuite.scala | 3 +-
.../scheduler/SchedulerIntegrationSuite.scala | 2 +-
.../spark/scheduler/TaskResultGetterSuite.scala | 7 +--
.../storage/BlockManagerReplicationSuite.scala | 15 +++--
.../apache/spark/storage/BlockManagerSuite.scala | 22 ++++----
.../org/apache/spark/ui/UISeleniumSuite.scala | 54 +++++++++---------
.../test/scala/org/apache/spark/ui/UISuite.scala | 4 +-
.../org/apache/spark/util/EventLoopSuite.scala | 19 +++----
.../collection/ExternalAppendOnlyMapSuite.scala | 3 +-
.../sql/jdbc/DockerJDBCIntegrationSuite.scala | 2 +-
.../kafka010/KafkaDontFailOnDataLossSuite.scala | 2 +-
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 9 ++-
.../kafka010/DirectKafkaStreamSuite.scala | 21 ++++---
.../kinesis/KinesisCheckpointerSuite.scala | 11 ++--
.../streaming/kinesis/KinesisReceiverSuite.scala | 6 +-
.../streaming/kinesis/KinesisStreamSuite.scala | 42 ++++++++------
.../scala/org/apache/spark/ml/MLEventsSuite.scala | 41 +++++++-------
.../spark/deploy/yarn/BaseYarnClusterSuite.scala | 4 +-
.../spark/deploy/yarn/YarnClusterSuite.scala | 8 +--
.../network/yarn/YarnShuffleServiceSuite.scala | 5 +-
.../spark/sql/sources/v2/JavaSimpleBatchTable.java | 54 ------------------
...atchTable.java => JavaSimpleReaderFactory.java} | 64 ++--------------------
.../sql/sources/v2/JavaSimpleScanBuilder.java | 47 ++++++++++++++++
.../org/apache/spark/sql/CachedTableSuite.scala | 11 ++--
.../org/apache/spark/sql/DatasetCacheSuite.scala | 4 +-
.../streaming/sources/TextSocketStreamSuite.scala | 2 +-
.../state/StateStoreCoordinatorSuite.scala | 8 +--
.../streaming/state/StateStoreSuite.scala | 2 +-
.../sql/streaming/StreamingQueryManagerSuite.scala | 58 ++++++++++----------
.../StreamingQueryStatusAndProgressSuite.scala | 3 +-
.../sql/hive/thriftserver/UISeleniumSuite.scala | 4 +-
.../streaming/receiver/ReceivedBlockHandler.scala | 2 +-
.../streaming/util/FileBasedWriteAheadLog.scala | 5 +-
.../apache/spark/streaming/CheckpointSuite.scala | 4 +-
.../streaming/ReceivedBlockHandlerSuite.scala | 3 +-
.../streaming/ReceivedBlockTrackerSuite.scala | 4 +-
.../org/apache/spark/streaming/ReceiverSuite.scala | 22 ++++----
.../spark/streaming/StreamingContextSuite.scala | 52 +++++++++---------
.../spark/streaming/StreamingListenerSuite.scala | 4 +-
.../apache/spark/streaming/UISeleniumSuite.scala | 8 +--
.../streaming/receiver/BlockGeneratorSuite.scala | 16 +++---
.../scheduler/ExecutorAllocationManagerSuite.scala | 2 +-
.../streaming/scheduler/JobGeneratorSuite.scala | 5 +-
.../streaming/scheduler/ReceiverTrackerSuite.scala | 8 +--
.../spark/streaming/util/WriteAheadLogSuite.scala | 20 +++----
71 files changed, 413 insertions(+), 518 deletions(-)
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index a530e16..6f90df5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -46,9 +46,9 @@ import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchSt
*/
public class RetryingBlockFetcherSuite {
- ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
- ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
- ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
+ private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
+ private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
+ private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
@Test
public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ public class RetryingBlockFetcherSuite {
}
assertNotNull(stub);
- stub.when(fetchStarter).createAndStart(any(), anyObject());
+ stub.when(fetchStarter).createAndStart(any(), any());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}
diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 2d842b9..a354f44 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.util.{Properties, Timer, TimerTask}
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
barrierEpoch),
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
- timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
+ timeout = new RpcTimeout(365.days, "barrierTimeout"))
barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index a662430..99f8412 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.ListBuffer
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.sys.process._
import org.json4s._
@@ -112,7 +111,7 @@ private object FaultToleranceTest extends App with Logging {
assertValidClusterState()
killLeader()
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
createClient()
assertValidClusterState()
@@ -126,12 +125,12 @@ private object FaultToleranceTest extends App with Logging {
killLeader()
addMasters(1)
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
killLeader()
addMasters(1)
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
}
@@ -156,7 +155,7 @@ private object FaultToleranceTest extends App with Logging {
killLeader()
workers.foreach(_.kill())
workers.clear()
- delay(30 seconds)
+ delay(30.seconds)
addWorkers(2)
assertValidClusterState()
}
@@ -174,7 +173,7 @@ private object FaultToleranceTest extends App with Logging {
(1 to 3).foreach { _ =>
killLeader()
- delay(30 seconds)
+ delay(30.seconds)
assertValidClusterState()
assertTrue(getLeader == masters.head)
addMasters(1)
@@ -264,7 +263,7 @@ private object FaultToleranceTest extends App with Logging {
}
// Avoid waiting indefinitely (e.g., we could register but get no executors).
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
}
/**
@@ -317,7 +316,7 @@ private object FaultToleranceTest extends App with Logging {
}
try {
- assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
+ assertTrue(ThreadUtils.awaitResult(f, 2.minutes))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@@ -421,7 +420,7 @@ private object SparkDocker {
}
dockerCmd.run(ProcessLogger(findIpAndLog _))
- val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
+ val ip = ThreadUtils.awaitResult(ipPromise.future, 30.seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 3dc41f7..770ae2f 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -52,7 +52,7 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
- * val timeout = new RpcTimeout(5 millis, "short timeout")
+ * val timeout = new RpcTimeout(5.milliseconds, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d967d38..524b0c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -27,7 +27,6 @@ import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
-import scala.language.postfixOps
import scala.util.control.NonFatal
import org.apache.commons.lang3.SerializationUtils
@@ -270,7 +269,7 @@ private[spark] class DAGScheduler(
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
Some(executorUpdates)))
blockManagerMaster.driverEndpoint.askSync[Boolean](
- BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
+ BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index d7bda8b..11647c0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -109,12 +109,12 @@ private[spark] object UIUtils extends Logging {
}
}
// if time is more than a year
- return s"$yearString $weekString $dayString"
+ s"$yearString $weekString $dayString"
} catch {
case e: Exception =>
logError("Error converting time to string", e)
// if there is some error, return blank string
- return ""
+ ""
}
}
@@ -336,7 +336,7 @@ private[spark] object UIUtils extends Logging {
def getHeaderContent(header: String): Seq[Node] = {
if (newlinesInHeader) {
<ul class="unstyled">
- { header.split("\n").map { case t => <li> {t} </li> } }
+ { header.split("\n").map(t => <li> {t} </li>) }
</ul>
} else {
Text(header)
@@ -446,7 +446,7 @@ private[spark] object UIUtils extends Logging {
* the whole string will rendered as a simple escaped text.
*
* Note: In terms of security, only anchor tags with root relative links are supported. So any
- * attempts to embed links outside Spark UI, or other tags like {@code <script>} will cause in
+ * attempts to embed links outside Spark UI, or other tags like <script> will cause in
* the whole description to be treated as plain text.
*
* @param desc the original job or stage description string, which may contain html tags.
@@ -458,7 +458,6 @@ private[spark] object UIUtils extends Logging {
* is true, and an Elem otherwise.
*/
def makeDescription(desc: String, basePathUri: String, plainText: Boolean = false): NodeSeq = {
- import scala.language.postfixOps
// If the description can be parsed as HTML and has only relative links, then render
// as HTML, otherwise render as escaped string
@@ -468,9 +467,7 @@ private[spark] object UIUtils extends Logging {
// Verify that this has only anchors and span (we are wrapping in span)
val allowedNodeLabels = Set("a", "span", "br")
- val illegalNodes = xml \\ "_" filterNot { case node: Node =>
- allowedNodeLabels.contains(node.label)
- }
+ val illegalNodes = (xml \\ "_").filterNot(node => allowedNodeLabels.contains(node.label))
if (illegalNodes.nonEmpty) {
throw new IllegalArgumentException(
"Only HTML anchors allowed in job descriptions\n" +
@@ -491,8 +488,8 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
- case e: Elem if e.child isEmpty => Text(e.text)
- case e: Elem if e.child nonEmpty => Text(e.child.flatMap(transform).text)
+ case e: Elem if e.child.isEmpty => Text(e.text)
+ case e: Elem => Text(e.child.flatMap(transform).text)
case _ => n
}
}
@@ -503,7 +500,7 @@ private[spark] object UIUtils extends Logging {
new RewriteRule() {
override def transform(n: Node): Seq[Node] = {
n match {
- case e: Elem if e \ "@href" nonEmpty =>
+ case e: Elem if (e \ "@href").nonEmpty =>
val relativePath = e.attribute("href").get.toString
val fullUri = s"${basePathUri.stripSuffix("/")}/${relativePath.stripPrefix("/")}"
e % Attribute(null, "href", fullUri, Null)
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index ca839d1..435b927 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
@@ -52,7 +51,7 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext
)
val error = intercept[SparkException] {
- ThreadUtils.awaitResult(futureAction, 5 seconds)
+ ThreadUtils.awaitResult(futureAction, 5.seconds)
}.getCause.getMessage
assert(error.contains(message))
}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index d4bf20c..c16e227 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -43,7 +43,7 @@ import org.apache.spark.storage._
abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[SortShuffleManager])
extends SparkFunSuite with BeforeAndAfter with LocalSparkContext
{
- implicit val defaultTimeout = timeout(10000 millis)
+ implicit val defaultTimeout = timeout(10.seconds)
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("ContextCleanerSuite")
@@ -159,7 +159,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC causes RDD cleanup after dereferencing the RDD
@@ -178,7 +178,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
rdd.count() // Defeat early collection by the JVM
@@ -196,7 +196,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
@@ -272,7 +272,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
// Test that RDD going out of scope does cause the checkpoint blocks to be cleaned up
@@ -294,7 +294,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -334,7 +334,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
runGC()
intercept[Exception] {
- preGCTester.assertCleanup()(timeout(1000 millis))
+ preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
@@ -408,7 +408,7 @@ class CleanerTester(
/** Assert that all the stuff has been cleaned up */
def assertCleanup()(implicit waitTimeout: PatienceConfiguration.Timeout) {
try {
- eventually(waitTimeout, interval(100 millis)) {
+ eventually(waitTimeout, interval(100.milliseconds)) {
assert(isAllCleanedUp,
"The following resources were not cleaned up:\n" + uncleanedResourcesToString)
}
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 896cd2e..182f28c 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -33,12 +33,12 @@ class DriverSuite extends SparkFunSuite with TimeLimits {
ignore("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val masters = Table("master", "local", "local-cluster[2,1,1024]")
- forAll(masters) { (master: String) =>
+ forAll(masters) { master =>
val process = Utils.executeCommand(
Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
- failAfter(60 seconds) { process.waitFor() }
+ failAfter(1.minute) { process.waitFor() }
// Ensure we still kill the process in case it timed out
process.destroy()
}
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index 00fecd4..db90c31 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -250,7 +250,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
- assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}
test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
@@ -290,7 +290,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
- assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+ assert(ThreadUtils.awaitResult(jobB, 1.minute) === 100)
}
test("two jobs sharing the same stage") {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 0795790..3a5de8a 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.util.{Random, Try}
import com.esotericsoftware.kryo.Kryo
@@ -279,10 +278,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
assert(RpcUtils.retryWaitMs(conf) === 2L)
conf.set("spark.akka.askTimeout", "3")
- assert(RpcUtils.askRpcTimeout(conf).duration === (3 seconds))
+ assert(RpcUtils.askRpcTimeout(conf).duration === 3.seconds)
conf.set("spark.akka.lookupTimeout", "4")
- assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds))
+ assert(RpcUtils.lookupRpcTimeout(conf).duration === 4.seconds)
}
test("SPARK-13727") {
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index a15ae04..c96db4e 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark
import scala.concurrent.duration._
import scala.language.implicitConversions
-import scala.language.postfixOps
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -31,25 +30,25 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
test("basic status API usage") {
sc = new SparkContext("local", "test", new SparkConf(false))
val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
- val jobId: Int = eventually(timeout(10 seconds)) {
+ val jobId: Int = eventually(timeout(10.seconds)) {
val jobIds = jobFuture.jobIds
jobIds.size should be(1)
jobIds.head
}
- val jobInfo = eventually(timeout(10 seconds)) {
+ val jobInfo = eventually(timeout(10.seconds)) {
sc.statusTracker.getJobInfo(jobId).get
}
jobInfo.status() should not be FAILED
val stageIds = jobInfo.stageIds()
stageIds.size should be(2)
- val firstStageInfo = eventually(timeout(10 seconds)) {
+ val firstStageInfo = eventually(timeout(10.seconds)) {
sc.statusTracker.getStageInfo(stageIds.min).get
}
firstStageInfo.stageId() should be(stageIds.min)
firstStageInfo.currentAttemptId() should be(0)
firstStageInfo.numTasks() should be(2)
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get
updatedFirstStageInfo.numCompletedTasks() should be(2)
updatedFirstStageInfo.numActiveTasks() should be(0)
@@ -61,27 +60,27 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc = new SparkContext("local", "test", new SparkConf(false))
// Passing `null` should return jobs that were not run in a job group:
val defaultJobGroupFuture = sc.parallelize(1 to 1000).countAsync()
- val defaultJobGroupJobId = eventually(timeout(10 seconds)) {
+ val defaultJobGroupJobId = eventually(timeout(10.seconds)) {
defaultJobGroupFuture.jobIds.head
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup(null).toSet should be (Set(defaultJobGroupJobId))
}
// Test jobs submitted in job groups:
sc.setJobGroup("my-job-group", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq.empty)
val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
- val firstJobId = eventually(timeout(10 seconds)) {
+ val firstJobId = eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
}
val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
- val secondJobId = eventually(timeout(10 seconds)) {
+ val secondJobId = eventually(timeout(10.seconds)) {
secondJobFuture.jobIds.head
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group").toSet should be (
Set(firstJobId, secondJobId))
}
@@ -92,10 +91,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc.setJobGroup("my-job-group2", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
val firstJobFuture = sc.parallelize(1 to 1000, 1).takeAsync(1)
- val firstJobId = eventually(timeout(10 seconds)) {
+ val firstJobId = eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group2") should be (Seq(firstJobId))
}
}
@@ -105,10 +104,10 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkCont
sc.setJobGroup("my-job-group2", "description")
sc.statusTracker.getJobIdsForGroup("my-job-group2") shouldBe empty
val firstJobFuture = sc.parallelize(1 to 1000, 2).takeAsync(999)
- val firstJobId = eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
firstJobFuture.jobIds.head
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
sc.statusTracker.getJobIdsForGroup("my-job-group2") should have size 2
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 641751f..2a17245 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1277,7 +1277,7 @@ object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
- val exitCode = failAfter(60 seconds) { process.waitFor() }
+ val exitCode = failAfter(1.minute) { process.waitFor() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 573a496..1ed2a1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -67,7 +67,7 @@ class StandaloneDynamicAllocationSuite
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
- eventually(timeout(60.seconds), interval(10.millis)) {
+ eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(getMasterState.workers.size === numWorkers)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index baeefea..a1d3077 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -65,7 +65,7 @@ class AppClientSuite
master = makeMaster()
workers = makeWorkers(10, 2048)
// Wait until all workers register with master successfully
- eventually(timeout(60.seconds), interval(10.millis)) {
+ eventually(timeout(1.minute), interval(10.milliseconds)) {
assert(getMasterState.workers.size === numWorkers)
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index d183170..1a326a8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -25,7 +25,6 @@ import java.util.zip.{ZipInputStream, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.FileUtils
@@ -142,7 +141,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
clock.getTimeMillis(), "test", false))
// Make sure the UI can be rendered.
- list.foreach { case info =>
+ list.foreach { info =>
val appUi = provider.getAppUI(info.id, None)
appUi should not be null
appUi should not be None
@@ -281,7 +280,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))
- list.foreach { case app =>
+ list.foreach { app =>
app.attempts.foreach { attempt =>
val appUi = provider.getAppUI(app.id, attempt.attemptId)
appUi should not be null
@@ -734,7 +733,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)
- eventually(timeout(1 second), interval(10 millis)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
provider.getConfig().keys should not contain ("HDFS State")
}
} finally {
@@ -747,12 +746,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
- val initThread = provider.startSafeModeCheckThread(Some(errorHandler))
+ provider.startSafeModeCheckThread(Some(errorHandler))
try {
provider.inSafeMode = false
clock.setTime(10000)
- eventually(timeout(1 second), interval(10 millis)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
verify(errorHandler).uncaughtException(any(), any())
}
} finally {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 6665a89..c99bcf2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -25,9 +25,7 @@ import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpSe
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
-import com.codahale.metrics.Counter
import com.google.common.io.{ByteStreams, Files}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -457,16 +455,6 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val port = server.boundPort
val metrics = server.cacheMetrics
- // assert that a metric has a value; if not dump the whole metrics instance
- def assertMetric(name: String, counter: Counter, expected: Long): Unit = {
- val actual = counter.getCount
- if (actual != expected) {
- // this is here because Scalatest loses stack depth
- fail(s"Wrong $name value - expected $expected but got $actual" +
- s" in metrics\n$metrics")
- }
- }
-
// build a URL for an app or app/attempt plus a page underneath
def buildURL(appId: String, suffix: String): URL = {
new URL(s"http://localhost:$port/history/$appId$suffix")
@@ -477,13 +465,11 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
new URL(s"http://localhost:$port/api/v1/applications/$appId$suffix")
}
- val historyServerRoot = new URL(s"http://localhost:$port/")
-
// start initial job
val d = sc.parallelize(1 to 10)
d.count()
- val stdInterval = interval(100 milliseconds)
- val appId = eventually(timeout(20 seconds), stdInterval) {
+ val stdInterval = interval(100.milliseconds)
+ val appId = eventually(timeout(20.seconds), stdInterval) {
val json = getContentAndCode("applications", port)._2.get
val apps = parse(json).asInstanceOf[JArray].arr
apps should have size 1
@@ -567,7 +553,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
d2.count()
dumpLogDir("After second job")
- val stdTimeout = timeout(10 seconds)
+ val stdTimeout = timeout(10.seconds)
logDebug("waiting for UI to update")
eventually(stdTimeout, stdInterval) {
assert(2 === getNumJobs(""),
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index fbf2acc..61aaaa5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable
import scala.collection.mutable.{HashMap, HashSet}
import scala.concurrent.duration._
import scala.io.Source
-import scala.language.postfixOps
import scala.reflect.ClassTag
import org.json4s._
@@ -216,7 +215,7 @@ class MasterSuite extends SparkFunSuite
master = makeMaster(conf)
master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
// Wait until Master recover from checkpoint data.
- eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
master.workers.size should be(1)
}
@@ -234,7 +233,7 @@ class MasterSuite extends SparkFunSuite
fakeWorkerInfo.coresUsed should be(0)
master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
- eventually(timeout(1 second), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
// Application state should be WAITING when "MasterChangeAcknowledged" event executed.
fakeAppInfo.state should be(ApplicationState.WAITING)
}
@@ -242,7 +241,7 @@ class MasterSuite extends SparkFunSuite
master.self.send(
WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))
- eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
getState(master) should be(RecoveryState.ALIVE)
}
@@ -267,7 +266,7 @@ class MasterSuite extends SparkFunSuite
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
- eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
val JArray(workers) = (parse(json) \ "workers")
@@ -293,7 +292,7 @@ class MasterSuite extends SparkFunSuite
val localCluster = new LocalSparkCluster(2, 2, 512, conf)
localCluster.start()
try {
- eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
val json = Source.fromURL(s"http://localhost:${localCluster.masterWebUIPort}/json")
.getLines().mkString("\n")
val JArray(workers) = (parse(json) \ "workers")
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 5e8b363..d899ee0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -228,7 +228,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
testCleanupFilesWithConfig(false)
}
- private def testCleanupFilesWithConfig(value: Boolean) = {
+ private def testCleanupFilesWithConfig(value: Boolean): Unit = {
val conf = new SparkConf().set(config.STORAGE_CLEANUP_FILES_AFTER_EXECUTOR_EXIT, value)
val cleanupCalled = new AtomicBoolean(false)
@@ -257,7 +257,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}
- private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
+ private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean): Unit = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")
@@ -282,7 +282,7 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
- eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 63a72e2..8a4f7a3 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, eq => meq}
diff --git a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
index 548949e..edec968 100644
--- a/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/launcher/LauncherBackendSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.launcher
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -57,13 +56,13 @@ class LauncherBackendSuite extends SparkFunSuite with Matchers {
.startApplication()
try {
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getAppId() should not be (null)
}
handle.stop()
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index 24b0144..a7eb0ec 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -142,7 +142,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
}
assert(f.get() === 10)
- failAfter(10 seconds) {
+ failAfter(10.seconds) {
sem.acquire(2)
}
}
@@ -178,7 +178,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
f.get()
}
- failAfter(10 seconds) {
+ failAfter(10.seconds) {
sem.acquire(2)
}
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 478f069..c942328 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.rdd
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -174,7 +173,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
val blockId = RDDBlockId(rdd.id, numPartitions - 1)
bmm.removeBlock(blockId)
// Wait until the block has been removed successfully.
- eventually(timeout(1 seconds), interval(100 milliseconds)) {
+ eventually(timeout(1.second), interval(100.milliseconds)) {
assert(bmm.getBlockStatus(blockId).isEmpty)
}
try {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 178d420..99b4e8f 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.language.postfixOps
import com.google.common.io.Files
import org.mockito.ArgumentMatchers.any
@@ -80,7 +79,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
rpcEndpointRef.send("hello")
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
}
@@ -101,7 +100,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
try {
rpcEndpointRef.send("hello")
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
} finally {
@@ -180,7 +179,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
try {
val e = intercept[RpcTimeoutException] {
- rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1 millis, shortProp))
+ rpcEndpointRef.askSync[String]("hello", new RpcTimeout(1.millisecond, shortProp))
}
// The SparkException cause should be a RpcTimeoutException with message indicating the
// controlling timeout property
@@ -236,7 +235,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@@ -261,7 +260,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@@ -282,7 +281,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
endpointRef.send("Foo")
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(e.getMessage === "Oops!")
}
}
@@ -303,7 +302,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `onStart` is fine
assert(callSelfSuccessfully)
}
@@ -324,7 +323,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
endpointRef.send("Foo")
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `receive` is fine
assert(callSelfSuccessfully)
}
@@ -347,9 +346,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Calling `self` in `onStop` will return null, so selfOption will be None
- assert(selfOption == None)
+ assert(selfOption.isEmpty)
}
}
@@ -376,7 +375,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}.start()
}
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(result == 1000)
}
@@ -401,7 +400,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
env.stop(endpointRef)
env.stop(endpointRef)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
// Calling stop twice should only trigger onStop once.
assert(onStopCount == 1)
}
@@ -417,7 +416,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val f = endpointRef.ask[String]("Hi")
- val ack = ThreadUtils.awaitResult(f, 5 seconds)
+ val ack = ThreadUtils.awaitResult(f, 5.seconds)
assert("ack" === ack)
env.stop(endpointRef)
@@ -437,7 +436,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely")
try {
val f = rpcEndpointRef.ask[String]("hello")
- val ack = ThreadUtils.awaitResult(f, 5 seconds)
+ val ack = ThreadUtils.awaitResult(f, 5.seconds)
assert("ack" === ack)
} finally {
anotherEnv.shutdown()
@@ -456,7 +455,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val f = endpointRef.ask[String]("Hi")
val e = intercept[SparkException] {
- ThreadUtils.awaitResult(f, 5 seconds)
+ ThreadUtils.awaitResult(f, 5.seconds)
}
assert("Oops" === e.getCause.getMessage)
@@ -478,7 +477,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
- ThreadUtils.awaitResult(f, 5 seconds)
+ ThreadUtils.awaitResult(f, 5.seconds)
}
assert("Oops" === e.getCause.getMessage)
} finally {
@@ -530,14 +529,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInServer2.send("hello")
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv2.address)))
}
serverEnv2.shutdown()
serverEnv2.awaitTermination()
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv2.address)))
assert(events.contains(("onDisconnected", serverEnv2.address)))
}
@@ -558,7 +557,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInClient.send("hello")
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
}
@@ -566,7 +565,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
clientEnv.shutdown()
clientEnv.awaitTermination()
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
// We don't know the exact client address but at least we can verify the message type
assert(events.asScala.map(_._1).exists(_ == "onConnected"))
assert(events.asScala.map(_._1).exists(_ == "onDisconnected"))
@@ -589,14 +588,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// Send a message to set up the connection
serverRefInClient.send("hello")
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv.address)))
}
serverEnv.shutdown()
serverEnv.awaitTermination()
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(events.contains(("onConnected", serverEnv.address)))
assert(events.contains(("onDisconnected", serverEnv.address)))
}
@@ -624,7 +623,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
- ThreadUtils.awaitResult(f, 1 seconds)
+ ThreadUtils.awaitResult(f, 1.second)
}
assert(e.getCause.isInstanceOf[NotSerializableException])
} finally {
@@ -658,7 +657,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication")
rpcEndpointRef.send("hello")
- eventually(timeout(5 seconds), interval(10 millis)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert("hello" === message)
}
} finally {
@@ -778,8 +777,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
})
- val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
- val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
+ val longTimeout = new RpcTimeout(1.second, "spark.rpc.long.timeout")
+ val shortTimeout = new RpcTimeout(10.milliseconds, "spark.rpc.short.timeout")
// Ask with immediate response, should complete successfully
val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
@@ -804,7 +803,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// once the future is complete to verify addMessageIfTimeout was invoked
val reply3 =
intercept[RpcTimeoutException] {
- Await.result(fut3, 2000 millis)
+ Await.result(fut3, 2.seconds)
}.getMessage
// scalastyle:on awaitresult
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index ae306d3..33f48b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
with Eventually {
- private val executorUpTimeout = 60.seconds
+ private val executorUpTimeout = 1.minute
test("serialized task larger than max RPC message size") {
val conf = new SparkConf
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index e74f462..72c20a8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,7 +594,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
var rdd: RDD[_] = new MyRDD(sc, 1, Nil)
(1 to 30).foreach(_ => rdd = rdd.zip(rdd))
// getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided.
- failAfter(10 seconds) {
+ failAfter(10.seconds) {
val preferredLocs = scheduler.getPreferredLocs(rdd, 0)
// No preferred locations are returned.
assert(preferredLocs.length === 0)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 2891dd6..f582ef5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -22,7 +22,6 @@ import java.util.Date
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.TaskType
@@ -159,7 +158,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
intercept[TimeoutException] {
- ThreadUtils.awaitResult(futureAction, 5 seconds)
+ ThreadUtils.awaitResult(futureAction, 5.seconds)
}
assert(tempDir.list().size === 0)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 43d6ec1..76be693 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -170,7 +170,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
// and notifies the job waiter before our original thread in the task scheduler finishes
// handling the event and marks the taskset as complete. So its ok if we need to wait a
// *little* bit longer for the original taskscheduler thread to finish up to deal w/ the race.
- eventually(timeout(1 second), interval(10 millis)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(taskScheduler.runningTaskSets.isEmpty)
}
assert(!backend.hasTasks)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index 5b17dbf..ae46435 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.util.control.NonFatal
import com.google.common.util.concurrent.MoreExecutors
@@ -58,18 +57,18 @@ private class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: Task
// Only remove the result once, since we'd like to test the case where the task eventually
// succeeds.
serializer.get().deserialize[TaskResult[_]](serializedData) match {
- case IndirectTaskResult(blockId, size) =>
+ case IndirectTaskResult(blockId, _) =>
sparkEnv.blockManager.master.removeBlock(blockId)
// removeBlock is asynchronous. Need to wait it's removed successfully
try {
- eventually(timeout(3 seconds), interval(200 milliseconds)) {
+ eventually(timeout(3.seconds), interval(200.milliseconds)) {
assert(!sparkEnv.blockManager.master.contains(blockId))
}
removeBlockSuccessfully = true
} catch {
case NonFatal(e) => removeBlockSuccessfully = false
}
- case directResult: DirectTaskResult[_] =>
+ case _: DirectTaskResult[_] =>
taskSetManager.abort("Internal error: expect only indirect results")
}
serializedData.rewind()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b158b6c..a739701 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -22,7 +22,6 @@ import java.util.Locale
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.implicitConversions
-import scala.language.postfixOps
import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers}
@@ -247,7 +246,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Add another normal block manager and test that 2x replication works
makeBlockManager(10000, "anotherStore")
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a2") === 2)
}
}
@@ -272,14 +271,14 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Add another store, 3x replication should work now, 4x replication should only replicate 3x
val newStore1 = makeBlockManager(storeSize, s"newstore1")
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a3", 3) === 3)
}
assert(replicateAndGetNumCopies("a4", 4) === 3)
// Add another store, 4x replication should work now
val newStore2 = makeBlockManager(storeSize, s"newstore2")
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a5", 4) === 4)
}
@@ -295,7 +294,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
val newStores = (3 to 5).map {
i => makeBlockManager(storeSize, s"newstore$i")
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(replicateAndGetNumCopies("a7", 3) === 3)
}
}
@@ -454,13 +453,13 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
master.removeExecutor(bm.blockManagerId.executorId)
bm.stop()
// giving enough time for replication to happen and new block be reported to master
- eventually(timeout(5 seconds), interval(100 millis)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
val newLocations = master.getLocations(blockId).toSet
assert(newLocations.size === replicationFactor)
}
}
- val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
+ val newLocations = eventually(timeout(5.seconds), interval(100.milliseconds)) {
val _newLocations = master.getLocations(blockId).toSet
assert(_newLocations.size === replicationFactor)
_newLocations
@@ -472,7 +471,7 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
"New locations contain stopped block managers.")
// Make sure all locks have been released.
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 576e5f5..9f3d8f2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
import scala.reflect.ClassTag
import org.apache.commons.lang3.RandomUtils
@@ -275,19 +275,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeBlock("a2-to-remove")
master.removeBlock("a3-to-remove")
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!store.hasLocalBlock("a1-to-remove"))
master.getLocations("a1-to-remove") should have size 0
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!store.hasLocalBlock("a2-to-remove"))
master.getLocations("a2-to-remove") should have size 0
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(store.hasLocalBlock("a3-to-remove"))
master.getLocations("a3-to-remove") should have size 0
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
memStatus._1 should equal (40000L)
memStatus._2 should equal (40000L)
@@ -305,15 +305,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock(rdd(0, 1)) should be (None)
master.getLocations(rdd(0, 1)) should have size 0
}
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
store.getSingleAndReleaseLock("nonrddblock") should not be (None)
master.getLocations("nonrddblock") should have size (1)
}
@@ -378,7 +378,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// remove broadcast 1 block from both the stores asynchronously
// and verify all broadcast 1 blocks have been removed
master.removeBroadcast(1, removeFromMaster = true, blocking = false)
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!driverStore.hasLocalBlock(broadcast1BlockId))
assert(!executorStore.hasLocalBlock(broadcast1BlockId))
}
@@ -386,7 +386,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// remove broadcast 2 from both the stores asynchronously
// and verify all broadcast 2 blocks have been removed
master.removeBroadcast(2, removeFromMaster = true, blocking = false)
- eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(!driverStore.hasLocalBlock(broadcast2BlockId))
assert(!driverStore.hasLocalBlock(broadcast2BlockId2))
assert(!executorStore.hasLocalBlock(broadcast2BlockId))
@@ -905,7 +905,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
// Make sure get a1 doesn't hang and returns None.
- failAfter(1 second) {
+ failAfter(1.second) {
assert(store.getSingleAndReleaseLock("a1").isEmpty, "a1 should not be in store")
}
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index a9f03eb..1913b8d 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -123,12 +123,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val ui = sc.ui.get
val rdd = sc.parallelize(Seq(1, 2, 3))
rdd.persist(StorageLevels.DISK_ONLY).count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
}
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.DISK_ONLY.description)
@@ -143,12 +143,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.unpersist(blocking = true)
rdd.persist(StorageLevels.MEMORY_ONLY).count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage")
val tableRowText = findAll(cssSelector("#storage-by-rdd-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
}
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(ui, "/storage/rdd/?id=0")
val tableRowText = findAll(cssSelector("#rdd-storage-by-block-table td")).map(_.text).toSeq
tableRowText should contain (StorageLevels.MEMORY_ONLY.description)
@@ -203,7 +203,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
}
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
find(id("failed")).get.text should be("Failed Stages (1)")
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
intercept[SparkException] {
sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
}
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("active")) should be(None) // Since we hide empty tables
// The failure occurs before the stage becomes active, hence we should still show only one
@@ -239,7 +239,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
assert(hasKillLink)
}
@@ -247,7 +247,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
assert(!hasKillLink)
}
@@ -255,7 +255,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = true)) { sc =>
runSlowJob(sc)
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
assert(hasKillLink)
}
@@ -263,7 +263,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext(killEnabled = false)) { sc =>
runSlowJob(sc)
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
assert(!hasKillLink)
}
@@ -274,7 +274,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
withSpark(newSparkContext()) { sc =>
// If no job has been run in a job group, then "(Job Group)" should not appear in the header
sc.parallelize(Seq(1, 2, 3)).count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders(0) should not startWith "Job Id (Job Group)"
@@ -282,7 +282,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
sc.parallelize(Seq(1, 2, 3)).count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
// Can suffix up/down arrow in the header
@@ -325,7 +325,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
mappedData.count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
@@ -373,7 +373,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}.groupBy(identity).map(identity).groupBy(identity).map(identity)
// Start the job:
rdd.countAsync()
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=0")
find(id("active")).get.text should be ("Active Stages (1)")
find(id("pending")).get.text should be ("Pending Stages (2)")
@@ -399,7 +399,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
val firstRow = find(cssSelector("tbody tr")).get.underlying
@@ -426,7 +426,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs/job/?id=1")
find(id("pending")) should be (None)
find(id("active")) should be (None)
@@ -454,7 +454,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
// mentioned in its job start event but which were never actually executed:
rdd.count()
rdd.count()
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
findAll(cssSelector("tbody tr a")).foreach { link =>
link.text.toLowerCase(Locale.ROOT) should include ("count")
@@ -476,7 +476,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
})
}
sparkUI.attachTab(newTab)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@@ -484,13 +484,13 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether new page exists
goToUi(sc, "/foo")
find(cssSelector("b")).get.text should include ("html magic")
}
sparkUI.detachTab(newTab)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
goToUi(sc, "")
find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
@@ -498,7 +498,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check new page not exist
goToUi(sc, "/foo")
find(cssSelector("b")) should be(None)
@@ -509,7 +509,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
test("kill stage POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/stages/stage/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@@ -522,7 +522,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
test("kill job POST/GET response is correct") {
withSpark(newSparkContext(killEnabled = true)) { sc =>
sc.parallelize(1 to 10).map{x => Thread.sleep(10000); x}.countAsync()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
val url = new URL(
sc.ui.get.webUrl.stripSuffix("/") + "/jobs/job/kill/?id=0")
// SPARK-6846: should be POST only but YARN AM doesn't proxy POST
@@ -560,7 +560,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
("8", "count")
)
- eventually(timeout(1 second), interval(50 milliseconds)) {
+ eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/jobs")
// The completed jobs table should have two rows. The first row will be the most recent job:
find("completed-summary").get.text should be ("Completed Jobs: 10, only showing 2")
@@ -606,7 +606,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
("17", "groupBy")
)
- eventually(timeout(1 second), interval(50 milliseconds)) {
+ eventually(timeout(1.second), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find("completed-summary").get.text should be ("Completed Stages: 20, only showing 3")
find("completed").get.text should be ("Completed Stages (20, only showing 3)")
@@ -679,7 +679,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
rdd.count()
- eventually(timeout(5 seconds), interval(100 milliseconds)) {
+ eventually(timeout(5.seconds), interval(100.milliseconds)) {
val stage0 = Source.fromURL(sc.ui.get.webUrl +
"/stages/stage/?id=0&attempt=0&expandDagViz=true").mkString
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
@@ -718,7 +718,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
rdd.count()
rdd.count()
- eventually(timeout(5 seconds), interval(50 milliseconds)) {
+ eventually(timeout(5.seconds), interval(50.milliseconds)) {
goToUi(sc, "/stages")
find(id("skipped")).get.text should be("Skipped Stages (1)")
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 1bd7aed..34fd218 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -75,7 +75,7 @@ class UISuite extends SparkFunSuite {
ignore("basic ui visibility") {
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL(sc.ui.get.webUrl).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
@@ -89,7 +89,7 @@ class UISuite extends SparkFunSuite {
ignore("visibility at localhost:4040") {
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
assert(html.toLowerCase(Locale.ROOT).contains("stages"))
}
diff --git a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
index 5507457..45aad3f 100644
--- a/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch}
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.Eventually._
@@ -45,7 +44,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
(1 to 100).foreach(eventLoop.post)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert((1 to 100) === buffer.asScala.toSeq)
}
eventLoop.stop()
@@ -80,7 +79,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(e === receivedError)
}
eventLoop.stop()
@@ -102,7 +101,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(e === receivedError)
assert(eventLoop.isActive)
}
@@ -157,7 +156,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}.start()
}
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(threadNum * eventsFromEachThread === receivedEventsCount)
}
eventLoop.stop()
@@ -182,7 +181,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- failAfter(5 seconds) {
+ failAfter(5.seconds) {
// Wait until we enter `onReceive`
onReceiveLatch.await()
eventLoop.stop()
@@ -203,7 +202,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
}
@@ -227,7 +226,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
}
eventLoop.start()
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
@@ -250,7 +249,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
@@ -274,7 +273,7 @@ class EventLoopSuite extends SparkFunSuite with TimeLimits {
}
eventLoop.start()
eventLoop.post(1)
- eventually(timeout(5 seconds), interval(5 millis)) {
+ eventually(timeout(5.seconds), interval(5.milliseconds)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index d5a20cc..2b5993a 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.ref.WeakReference
import org.scalatest.Matchers
@@ -460,7 +459,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
// https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
// (lines 69-89)
// assert(map.currentMap == null)
- eventually(timeout(5 seconds), interval(200 milliseconds)) {
+ eventually(timeout(5.seconds), interval(200.milliseconds)) {
System.gc()
// direct asserts introduced some macro generated code that held a reference to the map
val tmpIsNull = null == underlyingMapRef.get.orNull
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
index 609696b..e9e547e 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/DockerJDBCIntegrationSuite.scala
@@ -129,7 +129,7 @@ abstract class DockerJDBCIntegrationSuite
// Start the container and wait until the database can accept JDBC connections:
docker.startContainer(containerId)
jdbcUrl = db.getJdbcUrl(dockerIp, externalPort)
- eventually(timeout(60.seconds), interval(1.seconds)) {
+ eventually(timeout(1.minute), interval(1.second)) {
val conn = java.sql.DriverManager.getConnection(jdbcUrl)
conn.close()
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
index d9edf3c..cabc8e1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -99,7 +99,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
- eventually(timeout(60.seconds)) {
+ eventually(timeout(1.minute)) {
assert(
testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
"Kafka didn't delete records after 1 minute")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 70b6e67..f2e4ee7 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -24,7 +24,6 @@ import java.util.{Collections, Map => JMap, Properties, UUID}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
-import scala.language.postfixOps
import scala.util.Random
import kafka.api.Request
@@ -138,7 +137,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
setupEmbeddedZookeeper()
setupEmbeddedKafkaServer()
- eventually(timeout(60.seconds)) {
+ eventually(timeout(1.minute)) {
assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds")
}
}
@@ -414,7 +413,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
topic: String,
numPartitions: Int,
servers: Seq[KafkaServer]) {
- eventually(timeout(60.seconds), interval(200.millis)) {
+ eventually(timeout(1.minute), interval(200.milliseconds)) {
try {
verifyTopicDeletion(topic, numPartitions, servers)
} catch {
@@ -439,7 +438,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
case _ =>
false
}
- eventually(timeout(60.seconds)) {
+ eventually(timeout(1.minute)) {
assert(isPropagated, s"Partition [$topic, $partition] metadata not propagated after timeout")
}
}
@@ -448,7 +447,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
* Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
*/
def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
- eventually(timeout(60.seconds)) {
+ eventually(timeout(1.minute)) {
val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
assert(currentOffset.nonEmpty && currentOffset.get >= offset)
}
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index e042ae0..422f53d 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.util.Random
import org.apache.kafka.clients.consumer._
@@ -153,7 +152,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
- eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
+ eventually(timeout(100.seconds), interval(1.second)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
@@ -219,7 +218,7 @@ class DirectKafkaStreamSuite
allReceived.addAll(Arrays.asList(rdd.map(r => (r.key, r.value)).collect(): _*))
}
ssc.start()
- eventually(timeout(100000.milliseconds), interval(1000.milliseconds)) {
+ eventually(timeout(100.seconds), interval(1.second)) {
assert(allReceived.size === expectedTotal,
"didn't get expected number of messages, messages:\n" +
allReceived.asScala.mkString("\n"))
@@ -243,7 +242,7 @@ class DirectKafkaStreamSuite
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
- eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ eventually(timeout(10.seconds), interval(20.milliseconds)) {
assert(getLatestOffset() > 3)
}
val offsetBeforeStart = getLatestOffset()
@@ -272,7 +271,7 @@ class DirectKafkaStreamSuite
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
@@ -295,7 +294,7 @@ class DirectKafkaStreamSuite
// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
- eventually(timeout(10 seconds), interval(20 milliseconds)) {
+ eventually(timeout(10.seconds), interval(20.milliseconds)) {
assert(getLatestOffset() >= 10)
}
val offsetBeforeStart = getLatestOffset()
@@ -326,7 +325,7 @@ class DirectKafkaStreamSuite
ssc.start()
val newData = Map("b" -> 10)
kafkaTestUtils.sendMessages(topic, newData)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
collectedData.contains("b")
}
assert(!collectedData.contains("a"))
@@ -375,7 +374,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(20 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20.seconds), interval(50.milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
}
@@ -414,7 +413,7 @@ class DirectKafkaStreamSuite
sendData(i)
}
- eventually(timeout(20 seconds), interval(50 milliseconds)) {
+ eventually(timeout(20.seconds), interval(50.milliseconds)) {
assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
}
ssc.stop()
@@ -437,7 +436,7 @@ class DirectKafkaStreamSuite
def sendDataAndWaitForReceive(data: Seq[Int]) {
val strings = data.map { _.toString}
kafkaTestUtils.sendMessages(topic, strings.map { _ -> 1}.toMap)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
assert(strings.forall { collectedData.contains })
}
}
@@ -612,7 +611,7 @@ class DirectKafkaStreamSuite
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
- eventually(timeout(5.seconds), interval(10 milliseconds)) {
+ eventually(timeout(5.seconds), interval(10.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
assert(collectedData.asScala.exists(_.size == expectedSize),
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
index 9ea7bfc..ac0e6a8 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointerSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
-import scala.language.postfixOps
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import org.mockito.ArgumentMatchers._
@@ -87,7 +86,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(5 * checkpointInterval.milliseconds)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(seqNum)
verify(checkpointerMock, times(1)).checkpoint(otherSeqNum)
}
@@ -108,7 +107,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(checkpointInterval.milliseconds * 5)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
verify(checkpointerMock, atMost(1)).checkpoint(anyString())
}
}
@@ -129,7 +128,7 @@ class KinesisCheckpointerSuite extends TestSuiteBase
kinesisCheckpointer.setCheckpointer(shardId, checkpointerMock)
clock.advance(checkpointInterval.milliseconds)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(anyString())
}
// don't block test thread
@@ -138,12 +137,12 @@ class KinesisCheckpointerSuite extends TestSuiteBase
intercept[TimeoutException] {
// scalastyle:off awaitready
- Await.ready(f, 50 millis)
+ Await.ready(f, 50.milliseconds)
// scalastyle:on awaitready
}
clock.advance(checkpointInterval.milliseconds / 2)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
verify(checkpointerMock, times(1)).checkpoint(anyString)
verify(checkpointerMock, times(1)).checkpoint()
}
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 7531a9c..5269084 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -24,7 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions._
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
-import org.mockito.ArgumentMatchers.{anyListOf, anyString, eq => meq}
+import org.mockito.ArgumentMatchers.{anyList, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mockito.MockitoSugar
@@ -95,7 +95,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
- verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
+ verify(receiverMock, never).addRecords(anyString, anyList())
verify(receiverMock, never).setCheckpointer(anyString, meq(checkpointerMock))
}
@@ -103,7 +103,7 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getCurrentLimit).thenReturn(Int.MaxValue)
when(
- receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
+ receiverMock.addRecords(anyString, anyList())
).thenThrow(new RuntimeException())
intercept[RuntimeException] {
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 5733721..51ee7fd 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.kinesis
import scala.collection.mutable
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.util.Random
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
@@ -198,10 +197,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc.start()
val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
+ eventually(timeout(2.minutes), interval(10.seconds)) {
testUtils.pushData(testData, aggregateTestData)
- assert(collected.synchronized { collected === testData.toSet },
- "\nData received does not match data sent")
+ collected.synchronized {
+ assert(collected === testData.toSet, "\nData received does not match data sent")
+ }
}
ssc.stop(stopSparkContext = false)
}
@@ -217,7 +217,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
- .buildWithMessageHandler(addFive(_))
+ .buildWithMessageHandler(addFive)
stream shouldBe a [ReceiverInputDStream[_]]
@@ -231,11 +231,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
ssc.start()
val testData = 1 to 10
- eventually(timeout(120 seconds), interval(10 second)) {
+ eventually(timeout(2.minutes), interval(10.seconds)) {
testUtils.pushData(testData, aggregateTestData)
val modData = testData.map(_ + 5)
- assert(collected.synchronized { collected === modData.toSet },
- "\nData received does not match data sent")
+ collected.synchronized {
+ assert(collected === modData.toSet, "\nData received does not match data sent")
+ }
}
ssc.stop(stopSparkContext = false)
}
@@ -316,10 +317,11 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
val testData2 = 11 to 20
val testData3 = 21 to 30
- eventually(timeout(60 seconds), interval(10 second)) {
+ eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData1, aggregateTestData)
- assert(collected.synchronized { collected === testData1.toSet },
- "\nData received does not match data sent")
+ collected.synchronized {
+ assert(collected === testData1.toSet, "\nData received does not match data sent")
+ }
}
val shardToSplit = localTestUtils.getShards().head
@@ -332,10 +334,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
assert(splitCloseShards.size == 1)
assert(splitOpenShards.size == 2)
- eventually(timeout(60 seconds), interval(10 second)) {
+ eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData2, aggregateTestData)
- assert(collected.synchronized { collected === (testData1 ++ testData2).toSet },
- "\nData received does not match data sent after splitting a shard")
+ collected.synchronized {
+ assert(collected === (testData1 ++ testData2).toSet,
+ "\nData received does not match data sent after splitting a shard")
+ }
}
val Seq(shardToMerge, adjShard) = splitOpenShards
@@ -348,10 +352,12 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
assert(mergedCloseShards.size == 3)
assert(mergedOpenShards.size == 1)
- eventually(timeout(60 seconds), interval(10 second)) {
+ eventually(timeout(1.minute), interval(10.seconds)) {
localTestUtils.pushData(testData3, aggregateTestData)
- assert(collected.synchronized { collected === (testData1 ++ testData2 ++ testData3).toSet },
- "\nData received does not match data sent after merging shards")
+ collected.synchronized {
+ assert(collected === (testData1 ++ testData2 ++ testData3).toSet,
+ "\nData received does not match data sent after merging shards")
+ }
}
} finally {
ssc.stop(stopSparkContext = false)
@@ -399,7 +405,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
// Run until there are at least 10 batches with some data in them
// If this times out because numBatchesWithData is empty, then its likely that foreachRDD
// function failed with exceptions, and nothing got added to `collectedData`
- eventually(timeout(2 minutes), interval(1 seconds)) {
+ eventually(timeout(2.minutes), interval(1.second)) {
testUtils.pushData(1 to 5, aggregateTestData)
assert(isCheckpointPresent && numBatchesWithData > 10)
}
diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index 4fe69b6..e2ee7c0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.ml
import scala.collection.mutable
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.hadoop.fs.Path
import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -142,7 +141,7 @@ class MLEventsSuite
val expected = Seq(
event0, event1, event2, event3, event4, event5, event6, event7, event8, event9)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
assert(events === expected)
}
// Test if they can be ser/de via JSON protocol.
@@ -200,7 +199,7 @@ class MLEventsSuite
event7.output = output
val expected = Seq(event0, event1, event2, event3, event4, event5, event6, event7)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
assert(events === expected)
}
// Test if they can be ser/de via JSON protocol.
@@ -221,7 +220,7 @@ class MLEventsSuite
val pipelineWriter = newPipeline.write
assert(events.isEmpty)
pipelineWriter.save(path)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
assert(e.path.endsWith("writableStage"))
@@ -245,18 +244,18 @@ class MLEventsSuite
val pipelineReader = Pipeline.read
assert(events.isEmpty)
pipelineReader.load(path)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
- case e: LoadInstanceStart[PipelineStage]
- if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+ case e: LoadInstanceStart[_]
+ if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.path.endsWith("writableStage"))
- case e: LoadInstanceEnd[PipelineStage]
- if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+ case e: LoadInstanceEnd[_]
+ if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.instance.isInstanceOf[PipelineStage])
- case e: LoadInstanceStart[Pipeline] =>
+ case e: LoadInstanceStart[_] =>
assert(e.reader === pipelineReader)
- case e: LoadInstanceEnd[Pipeline] =>
- assert(e.instance.uid === newPipeline.uid)
+ case e: LoadInstanceEnd[_] =>
+ assert(e.instance.asInstanceOf[Pipeline].uid === newPipeline.uid)
case e => fail(s"Unexpected event thrown: $e")
}
}
@@ -280,7 +279,7 @@ class MLEventsSuite
val pipelineWriter = pipelineModel.write
assert(events.isEmpty)
pipelineWriter.save(path)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
case e: SaveInstanceStart if e.writer.isInstanceOf[DefaultParamsWriter] =>
assert(e.path.endsWith("writableStage"))
@@ -304,18 +303,18 @@ class MLEventsSuite
val pipelineModelReader = PipelineModel.read
assert(events.isEmpty)
pipelineModelReader.load(path)
- eventually(timeout(10 seconds), interval(1 second)) {
+ eventually(timeout(10.seconds), interval(1.second)) {
events.foreach {
- case e: LoadInstanceStart[PipelineStage]
- if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+ case e: LoadInstanceStart[_]
+ if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.path.endsWith("writableStage"))
- case e: LoadInstanceEnd[PipelineStage]
- if e.reader.isInstanceOf[DefaultParamsReader[PipelineStage]] =>
+ case e: LoadInstanceEnd[_]
+ if e.reader.isInstanceOf[DefaultParamsReader[_]] =>
assert(e.instance.isInstanceOf[PipelineStage])
- case e: LoadInstanceStart[PipelineModel] =>
+ case e: LoadInstanceStart[_] =>
assert(e.reader === pipelineModelReader)
- case e: LoadInstanceEnd[PipelineModel] =>
- assert(e.instance.uid === pipelineModel.uid)
+ case e: LoadInstanceEnd[_] =>
+ assert(e.instance.asInstanceOf[PipelineModel].uid === pipelineModel.uid)
case e => fail(s"Unexpected event thrown: $e")
}
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index b9aeb1c..384a5f4 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import scala.concurrent.duration._
-import scala.language.postfixOps
import com.google.common.io.Files
-import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -169,7 +167,7 @@ abstract class BaseYarnClusterSuite
val handle = launcher.startApplication()
try {
- eventually(timeout(2 minutes), interval(1 second)) {
+ eventually(timeout(2.minutes), interval(1.second)) {
assert(handle.getState().isFinal())
}
} finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index bb75952..b072202 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,18 +18,14 @@
package org.apache.spark.deploy.yarn
import java.io.File
-import java.net.URL
import java.nio.charset.StandardCharsets
import java.util.{HashMap => JHashMap}
import scala.collection.mutable
import scala.concurrent.duration._
import scala.io.Source
-import scala.language.postfixOps
import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.HadoopIllegalArgumentException
-import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.scalatest.Matchers
@@ -209,7 +205,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
.startApplication()
try {
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.RUNNING)
}
@@ -217,7 +213,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
handle.getAppId() should startWith ("application_")
handle.stop()
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(30.seconds), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 756e6d4..381a935 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -24,7 +24,6 @@ import java.util.EnumSet
import scala.annotation.tailrec
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.hadoop.fs.Path
import org.apache.hadoop.service.ServiceStateException
@@ -327,10 +326,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
- eventually(timeout(10 seconds), interval(5 millis)) {
+ eventually(timeout(10.seconds), interval(5.milliseconds)) {
assert(!execStateFile.exists())
}
- eventually(timeout(10 seconds), interval(5 millis)) {
+ eventually(timeout(10.seconds), interval(5.milliseconds)) {
assert(!secretsFile.exists())
}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
index 9b0eb61..64663d5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
@@ -17,17 +17,13 @@
package test.org.apache.spark.sql.sources.v2;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.SupportsRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
-import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
abstract class JavaSimpleBatchTable implements Table, SupportsRead {
@@ -52,53 +48,3 @@ abstract class JavaSimpleBatchTable implements Table, SupportsRead {
}
}
-abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
-
- @Override
- public Scan build() {
- return this;
- }
-
- @Override
- public Batch toBatch() {
- return this;
- }
-
- @Override
- public StructType readSchema() {
- return new StructType().add("i", "int").add("j", "int");
- }
-
- @Override
- public PartitionReaderFactory createReaderFactory() {
- return new JavaSimpleReaderFactory();
- }
-}
-
-class JavaSimpleReaderFactory implements PartitionReaderFactory {
-
- @Override
- public PartitionReader<InternalRow> createReader(InputPartition partition) {
- JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
- return new PartitionReader<InternalRow>() {
- private int current = p.start - 1;
-
- @Override
- public boolean next() throws IOException {
- current += 1;
- return current < p.end;
- }
-
- @Override
- public InternalRow get() {
- return new GenericInternalRow(new Object[] {current, -current});
- }
-
- @Override
- public void close() throws IOException {
-
- }
- };
- }
-}
-
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
similarity index 51%
copy from sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
copy to sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
index 9b0eb61..7402790 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleBatchTable.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReaderFactory.java
@@ -17,63 +17,11 @@
package test.org.apache.spark.sql.sources.v2;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.SupportsRead;
-import org.apache.spark.sql.sources.v2.Table;
-import org.apache.spark.sql.sources.v2.TableCapability;
-import org.apache.spark.sql.sources.v2.reader.*;
-import org.apache.spark.sql.types.StructType;
-
-abstract class JavaSimpleBatchTable implements Table, SupportsRead {
- private static final Set<TableCapability> CAPABILITIES = new HashSet<>(Arrays.asList(
- TableCapability.BATCH_READ,
- TableCapability.BATCH_WRITE,
- TableCapability.TRUNCATE));
-
- @Override
- public StructType schema() {
- return new StructType().add("i", "int").add("j", "int");
- }
-
- @Override
- public String name() {
- return this.getClass().toString();
- }
-
- @Override
- public Set<TableCapability> capabilities() {
- return CAPABILITIES;
- }
-}
-
-abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
-
- @Override
- public Scan build() {
- return this;
- }
-
- @Override
- public Batch toBatch() {
- return this;
- }
-
- @Override
- public StructType readSchema() {
- return new StructType().add("i", "int").add("j", "int");
- }
-
- @Override
- public PartitionReaderFactory createReaderFactory() {
- return new JavaSimpleReaderFactory();
- }
-}
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+import org.apache.spark.sql.sources.v2.reader.PartitionReader;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
class JavaSimpleReaderFactory implements PartitionReaderFactory {
@@ -84,7 +32,7 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
private int current = p.start - 1;
@Override
- public boolean next() throws IOException {
+ public boolean next() {
current += 1;
return current < p.end;
}
@@ -95,10 +43,8 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
}
@Override
- public void close() throws IOException {
-
+ public void close() {
}
};
}
}
-
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java
new file mode 100644
index 0000000..217e669
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleScanBuilder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.Batch;
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+abstract class JavaSimpleScanBuilder implements ScanBuilder, Scan, Batch {
+
+ @Override
+ public Scan build() {
+ return this;
+ }
+
+ @Override
+ public Batch toBatch() {
+ return this;
+ }
+
+ @Override
+ public StructType readSchema() {
+ return new StructType().add("i", "int").add("j", "int");
+ }
+
+ @Override
+ public PartitionReaderFactory createReaderFactory() {
+ return new JavaSimpleReaderFactory();
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 92157d8..76350ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
import scala.collection.mutable.HashSet
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.apache.spark.CleanerListener
import org.apache.spark.executor.DataReadMethod._
@@ -251,7 +250,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
sql("UNCACHE TABLE testData")
assert(!spark.catalog.isCached("testData"), "Table 'testData' should not be cached")
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@@ -267,7 +266,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Eagerly cached in-memory table should have already been materialized")
uncacheTable("testCacheTable")
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@@ -284,7 +283,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Eagerly cached in-memory table should have already been materialized")
uncacheTable("testCacheTable")
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@@ -305,7 +304,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
"Lazily cached in-memory table should have been materialized")
uncacheTable("testData")
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
@@ -446,7 +445,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
System.gc()
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(toBeCleanedAccIds.synchronized { toBeCleanedAccIds.isEmpty },
"batchStats accumulators should be cleared after GC when uncacheTable")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index b3b2cdc..b828b23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -136,7 +136,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
assertCached(df2)
// udf has been evaluated during caching, and thus should not be re-evaluated here
- failAfter(2 seconds) {
+ failAfter(2.seconds) {
df2.collect()
}
@@ -197,7 +197,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
assertCached(df4)
// reuse loaded cache
- failAfter(3 seconds) {
+ failAfter(3.seconds) {
checkDataset(df4, Row(10))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index a5ba4f9..22f8437 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -308,7 +308,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
val offsets = scala.collection.mutable.ListBuffer[Int]()
val readerFactory = stream.createContinuousReaderFactory()
import org.scalatest.time.SpanSugar._
- failAfter(5 seconds) {
+ failAfter(5.seconds) {
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 3dd3210..2a1e7d6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -41,7 +41,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
assert(coordinatorRef.getLocation(id) === None)
coordinatorRef.reportActiveInstance(id, "hostX", "exec1")
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id, "exec1"))
assert(
coordinatorRef.getLocation(id) ===
@@ -50,7 +50,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordinatorRef.reportActiveInstance(id, "hostX", "exec2")
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id, "exec1") === false)
assert(coordinatorRef.verifyIfInstanceActive(id, "exec2"))
@@ -75,7 +75,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordinatorRef.reportActiveInstance(id2, host, exec)
coordinatorRef.reportActiveInstance(id3, host, exec)
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(coordinatorRef.verifyIfInstanceActive(id1, exec))
assert(coordinatorRef.verifyIfInstanceActive(id2, exec))
assert(coordinatorRef.verifyIfInstanceActive(id3, exec))
@@ -107,7 +107,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
coordRef1.reportActiveInstance(id, "hostX", "exec1")
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(coordRef2.verifyIfInstanceActive(id, "exec1"))
assert(
coordRef2.getLocation(id) ===
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 12bc68c..af4369d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -416,7 +416,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
}
- val timeoutDuration = 60 seconds
+ val timeoutDuration = 1.minute
quietly {
withSpark(new SparkContext(conf)) { sc =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 13b8866..b421809 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -93,7 +93,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
testAwaitAnyTermination(ExpectBlocked)
// Stop a query asynchronously and see if it is reported through awaitAnyTermination
- val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+ val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
@@ -106,7 +106,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// Terminate a query asynchronously with exception and see awaitAnyTermination throws
// the exception
- val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+ val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
testAwaitAnyTermination(ExpectException[SparkException])
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
@@ -119,10 +119,10 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
- val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
+ val q3 = stopRandomQueryAsync(10.milliseconds, withError = false)
testAwaitAnyTermination(ExpectNotBlocked)
require(!q3.isActive)
- val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
+ val q4 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
// After q4 terminates with exception, awaitAnyTerm should start throwing exception
testAwaitAnyTermination(ExpectException[SparkException])
@@ -138,81 +138,81 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
// awaitAnyTermination should be blocking or non-blocking depending on timeout values
testAwaitAnyTermination(
ExpectBlocked,
- awaitTimeout = 4 seconds,
+ awaitTimeout = 4.seconds,
expectedReturnedValue = false,
- testBehaviorFor = 2 seconds)
+ testBehaviorFor = 2.seconds)
testAwaitAnyTermination(
ExpectNotBlocked,
- awaitTimeout = 50 milliseconds,
+ awaitTimeout = 50.milliseconds,
expectedReturnedValue = false,
- testBehaviorFor = 1 second)
+ testBehaviorFor = 1.second)
// Stop a query asynchronously within timeout and awaitAnyTerm should be unblocked
- val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+ val q1 = stopRandomQueryAsync(stopAfter = 100.milliseconds, withError = false)
testAwaitAnyTermination(
ExpectNotBlocked,
- awaitTimeout = 2 seconds,
+ awaitTimeout = 2.seconds,
expectedReturnedValue = true,
- testBehaviorFor = 4 seconds)
+ testBehaviorFor = 4.seconds)
require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
testAwaitAnyTermination(
- ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
+ ExpectNotBlocked, awaitTimeout = 4.seconds, expectedReturnedValue = true)
// Resetting termination should make awaitAnyTermination() blocking again
spark.streams.resetTerminated()
testAwaitAnyTermination(
ExpectBlocked,
- awaitTimeout = 4 seconds,
+ awaitTimeout = 4.seconds,
expectedReturnedValue = false,
- testBehaviorFor = 1 second)
+ testBehaviorFor = 1.second)
// Terminate a query asynchronously with exception within timeout, awaitAnyTermination should
// throws the exception
- val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+ val q2 = stopRandomQueryAsync(100.milliseconds, withError = true)
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 4 seconds,
- testBehaviorFor = 6 seconds)
+ awaitTimeout = 4.seconds,
+ testBehaviorFor = 6.seconds)
require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
// All subsequent calls to awaitAnyTermination should throw the exception
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 2 seconds,
- testBehaviorFor = 4 seconds)
+ awaitTimeout = 2.seconds,
+ testBehaviorFor = 4.seconds)
// Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
spark.streams.resetTerminated()
- val q3 = stopRandomQueryAsync(2 seconds, withError = true)
+ val q3 = stopRandomQueryAsync(2.seconds, withError = true)
testAwaitAnyTermination(
ExpectNotBlocked,
- awaitTimeout = 100 milliseconds,
+ awaitTimeout = 100.milliseconds,
expectedReturnedValue = false,
- testBehaviorFor = 4 seconds)
+ testBehaviorFor = 4.seconds)
// After that query is stopped, awaitAnyTerm should throw exception
eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
testAwaitAnyTermination(
ExpectException[SparkException],
- awaitTimeout = 100 milliseconds,
- testBehaviorFor = 4 seconds)
+ awaitTimeout = 100.milliseconds,
+ testBehaviorFor = 4.seconds)
// Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
// the exception
spark.streams.resetTerminated()
- val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
+ val q4 = stopRandomQueryAsync(10.milliseconds, withError = false)
testAwaitAnyTermination(
- ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
+ ExpectNotBlocked, awaitTimeout = 2.seconds, expectedReturnedValue = true)
require(!q4.isActive)
- val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
+ val q5 = stopRandomQueryAsync(10.milliseconds, withError = true)
eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
// After q5 terminates with exception, awaitAnyTerm should start throwing exception
- testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
+ testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2.seconds)
}
}
@@ -276,7 +276,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
expectedBehavior: ExpectedBehavior,
expectedReturnedValue: Boolean = false,
awaitTimeout: Span = null,
- testBehaviorFor: Span = 4 seconds
+ testBehaviorFor: Span = 4.seconds
): Unit = {
def awaitTermFunc(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 2f460b0..e784d31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
import java.util.UUID
import scala.collection.JavaConverters._
-import scala.language.postfixOps
import org.json4s._
import org.json4s.jackson.JsonMethods._
@@ -203,7 +202,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
val progress = query.lastProgress
assert(progress.stateOperators.length > 0)
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
- eventually(timeout(1 minute)) {
+ eventually(timeout(1.minute)) {
val nextProgress = query.lastProgress
assert(nextProgress.timestamp !== progress.timestamp)
assert(nextProgress.numInputRows === 0)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index fef18f1..47cf4f10 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -86,12 +86,12 @@ class UISeleniumSuite
queries.foreach(statement.execute)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to baseURL
find(cssSelector("""ul li a[href*="sql"]""")) should not be None
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (baseURL + "/sql")
find(id("sessionstat")) should not be None
find(id("sqlstat")) should not be None
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 80c0795..f8ddabd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
-import scala.language.{existentials, postfixOps}
+import scala.language.existentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index f0161e1..21f3bbe 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ExecutionContextTaskSupport
import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.language.postfixOps
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -142,7 +141,7 @@ private[streaming] class FileBasedWriteAheadLog(
CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, () => reader.close())
}
if (!closeFileAfterWrite) {
- logFilesToRead.iterator.map(readFile).flatten.asJava
+ logFilesToRead.iterator.flatMap(readFile).asJava
} else {
// For performance gains, it makes sense to parallelize the recovery if
// closeFileAfterWrite = true
@@ -190,7 +189,7 @@ private[streaming] class FileBasedWriteAheadLog(
if (waitForCompletion) {
import scala.concurrent.duration._
// scalastyle:off awaitready
- Await.ready(f, 1 second)
+ Await.ready(f, 1.second)
// scalastyle:on awaitready
}
} catch {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 19da181..55fdd4c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -162,12 +162,12 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
val outputStream = getTestOutputStream[V](ssc.graph.getOutputStreams())
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
ssc.awaitTerminationOrTimeout(10)
assert(batchCounter.getLastCompletedBatchTime === targetBatchTime)
}
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
_.getName.contains(clock.getTimeMillis.toString)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index f0960d0..1506061 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.language.postfixOps
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
@@ -199,7 +198,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
val cleanupThreshTime = 3000L
handler.cleanupOldBlocks(cleanupThreshTime)
- eventually(timeout(10000 millis), interval(10 millis)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 8800f1c..0b15f00 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
import scala.util.Random
import org.apache.hadoop.conf.Configuration
@@ -276,7 +276,7 @@ class ReceivedBlockTrackerSuite
getWrittenLogData(getWriteAheadLogFiles().last) should contain(createBatchCleanup(batchTime1))
// Verify that at least one log file gets deleted
- eventually(timeout(10 seconds), interval(10 millisecond)) {
+ eventually(timeout(10.seconds), interval(10.millisecond)) {
getWriteAheadLogFiles() should not contain oldestLogFile
}
printLogFiles("After clean")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 130e981..48aa9e5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -64,7 +64,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Verify that the receiver
intercept[Exception] {
- failAfter(200 millis) {
+ failAfter(200.milliseconds) {
executingThread.join()
}
}
@@ -78,7 +78,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
assert(receiver.isStarted)
assert(!receiver.isStopped())
assert(receiver.otherThread.isAlive)
- eventually(timeout(100 millis), interval(10 millis)) {
+ eventually(timeout(100.milliseconds), interval(10.milliseconds)) {
assert(receiver.receiving)
}
@@ -107,12 +107,12 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Verify restarting actually stops and starts the receiver
receiver.restart("restarting", null, 600)
- eventually(timeout(300 millis), interval(10 millis)) {
+ eventually(timeout(300.milliseconds), interval(10.milliseconds)) {
// receiver will be stopped async
assert(receiver.isStopped)
assert(receiver.onStopCalled)
}
- eventually(timeout(1000 millis), interval(10 millis)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
// receiver will be started async
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
@@ -122,7 +122,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
}
// Verify that stopping actually stops the thread
- failAfter(100 millis) {
+ failAfter(100.milliseconds) {
receiver.stop("test")
assert(receiver.isStopped)
assert(!receiver.otherThread.isAlive)
@@ -159,7 +159,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
val recordedBlocks = blockGeneratorListener.arrayBuffers
val recordedData = recordedBlocks.flatten
- assert(blockGeneratorListener.arrayBuffers.size > 0, "No blocks received")
+ assert(blockGeneratorListener.arrayBuffers.nonEmpty, "No blocks received")
assert(recordedData.toSet === generatedData.toSet, "Received data not same")
// recordedData size should be close to the expected rate; use an error margin proportional to
@@ -245,15 +245,15 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
// Run until sufficient WAL files have been generated and
// the first WAL files has been deleted
- eventually(timeout(20 seconds), interval(batchDuration.milliseconds millis)) {
+ eventually(timeout(20.seconds), interval(batchDuration.milliseconds.millis)) {
val (logFiles1, logFiles2) = getBothCurrentLogFiles()
allLogFiles1 ++= logFiles1
allLogFiles2 ++= logFiles2
- if (allLogFiles1.size > 0) {
- assert(!logFiles1.contains(allLogFiles1.toSeq.sorted.head))
+ if (allLogFiles1.nonEmpty) {
+ assert(!logFiles1.contains(allLogFiles1.toSeq.min))
}
- if (allLogFiles2.size > 0) {
- assert(!logFiles2.contains(allLogFiles2.toSeq.sorted.head))
+ if (allLogFiles2.nonEmpty) {
+ assert(!logFiles2.contains(allLogFiles2.toSeq.min))
}
assert(allLogFiles1.size >= 7)
assert(allLogFiles2.size >= 7)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 8d07210..5cda6f9 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -211,7 +211,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// Local props set after start should be ignored
ssc.sc.setLocalProperty("customPropKey", "value2")
- eventually(timeout(10 seconds), interval(10 milliseconds)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
assert(allFound)
}
@@ -342,7 +342,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
input.foreachRDD(_ => {})
ssc.start()
// Call `ssc.stop` at once so that it's possible that the receiver will miss "StopReceiver"
- failAfter(30000 millis) {
+ failAfter(30.seconds) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
}
}
@@ -398,18 +398,18 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
inputStream.map(x => x).register()
// test whether start() blocks indefinitely or not
- failAfter(2000 millis) {
+ failAfter(2.seconds) {
ssc.start()
}
// test whether awaitTermination() exits after give amount of time
- failAfter(1000 millis) {
+ failAfter(1.second) {
ssc.awaitTerminationOrTimeout(500)
}
// test whether awaitTermination() does not exit if not time is given
val exception = intercept[Exception] {
- failAfter(1000 millis) {
+ failAfter(1.second) {
ssc.awaitTermination()
throw new Exception("Did not wait for stop")
}
@@ -418,7 +418,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
var t: Thread = null
// test whether wait exits if context is stopped
- failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+ failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
t = new Thread() {
override def run() {
Thread.sleep(500)
@@ -439,7 +439,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
val inputStream = addInputStream(ssc)
inputStream.map(x => x).register()
- failAfter(10000 millis) {
+ failAfter(10.seconds) {
ssc.start()
ssc.stop()
ssc.awaitTermination()
@@ -479,13 +479,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
ssc.start()
// test whether awaitTerminationOrTimeout() return false after give amount of time
- failAfter(1000 millis) {
+ failAfter(1.second) {
assert(ssc.awaitTerminationOrTimeout(500) === false)
}
var t: Thread = null
// test whether awaitTerminationOrTimeout() return true if context is stopped
- failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown
+ failAfter(10.seconds) { // 10 seconds because spark takes a long time to shutdown
t = new Thread() {
override def run() {
Thread.sleep(500)
@@ -528,7 +528,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should create new context with empty path
testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(emptyPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@@ -537,19 +537,19 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
}
// getOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getOrCreate(
- corruptedCheckpointPath, creatingFunction _, createOnError = false)
+ corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
}
// getOrCreate should create new context with fake checkpoint file and createOnError = true
testGetOrCreate {
ssc = StreamingContext.getOrCreate(
- corruptedCheckpointPath, creatingFunction _, createOnError = true)
+ corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
@@ -558,7 +558,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should recover context with checkpoint path, and recover old configuration
testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue", "checkpointed config not recovered")
@@ -567,7 +567,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
// getOrCreate should recover StreamingContext with existing SparkContext
testGetOrCreate {
sc = new SparkContext(conf)
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(!ssc.conf.contains("someKey"), "checkpointed config unexpectedly recovered")
@@ -669,41 +669,41 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
conf.clone.set("spark.streaming.clock", "org.apache.spark.util.ManualClock"), batchDuration)
addInputStream(ssc).register()
ssc.start()
- val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ val returnedSsc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
assert(!newContextCreated, "new context created instead of returning")
assert(returnedSsc.eq(ssc), "returned context is not the activated context")
}
// getActiveOrCreate should create new context with empty path
testGetActiveOrCreate {
- ssc = StreamingContext.getActiveOrCreate(emptyPath, creatingFunction _)
+ ssc = StreamingContext.getActiveOrCreate(emptyPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should throw exception with fake checkpoint file and createOnError = false
intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(corruptedCheckpointPath, () => creatingFunction())
}
// getActiveOrCreate should throw exception with fake checkpoint file
intercept[Exception] {
ssc = StreamingContext.getActiveOrCreate(
- corruptedCheckpointPath, creatingFunction _, createOnError = false)
+ corruptedCheckpointPath, () => creatingFunction(), createOnError = false)
}
// getActiveOrCreate should create new context with fake
// checkpoint file and createOnError = true
testGetActiveOrCreate {
ssc = StreamingContext.getActiveOrCreate(
- corruptedCheckpointPath, creatingFunction _, createOnError = true)
+ corruptedCheckpointPath, () => creatingFunction(), createOnError = true)
assert(ssc != null, "no context created")
assert(newContextCreated, "new context not created")
}
// getActiveOrCreate should recover context with checkpoint path, and recover old configuration
testGetActiveOrCreate {
- ssc = StreamingContext.getActiveOrCreate(checkpointPath, creatingFunction _)
+ ssc = StreamingContext.getActiveOrCreate(checkpointPath, () => creatingFunction())
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
assert(ssc.conf.get("someKey") === "someValue")
@@ -781,14 +781,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
_ssc.queueStream[Int](Queue(rdd)).register()
_ssc
}
- ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
ssc.start()
- eventually(timeout(10000 millis)) {
+ eventually(timeout(10.seconds)) {
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
}
ssc.stop()
val e = intercept[SparkException] {
- ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+ ssc = StreamingContext.getOrCreate(checkpointDirectory, () => creatingFunction())
}
// StreamingContext.validate changes the message, so use "contains" here
assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
@@ -855,7 +855,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL
ssc.textFileStream(testDirectory).foreachRDD { rdd => rdd.count() }
ssc.start()
try {
- eventually(timeout(30000 millis)) {
+ eventually(timeout(30.seconds)) {
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
}
} finally {
@@ -967,7 +967,7 @@ package object testPackage extends Assertions {
}
ssc.start()
- eventually(timeout(10000 millis), interval(10 millis)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 0f957a1..62fd433 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -130,7 +130,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
- eventually(timeout(30 seconds), interval(20 millis)) {
+ eventually(timeout(30.seconds), interval(20.milliseconds)) {
collector.startedReceiverStreamIds.size should equal (1)
collector.startedReceiverStreamIds.peek() should equal (0)
collector.stoppedReceiverStreamIds.size should equal (1)
@@ -157,7 +157,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
- eventually(timeout(30 seconds), interval(20 millis)) {
+ eventually(timeout(30.seconds), interval(20.milliseconds)) {
collector.startedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
collector.completedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 29e4513..483a751 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -97,12 +97,12 @@ class UISeleniumSuite
val sparkUI = ssc.sparkContext.ui.get
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/"))
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether streaming page exists
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -196,12 +196,12 @@ class UISeleniumSuite
ssc.stop(false)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/"))
find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
}
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should not contain("Streaming Statistics")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index 7b839ae..4c0dd0d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -84,7 +84,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
}
clock.advance(blockIntervalMs) // advance clock to generate blocks
withClue("blocks not generated or pushed") {
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(listener.onGenerateBlockCalled)
assert(listener.onPushBlockCalled)
}
@@ -100,7 +100,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
listener.addedData.asScala.toSeq should contain theSameElementsInOrderAs (data2)
listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (metadata2)
clock.advance(blockIntervalMs) // advance clock to generate blocks
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
val combined = data1 ++ data2
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
}
@@ -112,7 +112,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
val combinedMetadata = metadata2 :+ metadata3
listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (combinedMetadata)
clock.advance(blockIntervalMs) // advance clock to generate blocks
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
val combinedData = data1 ++ data2 ++ data3
listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (combinedData)
}
@@ -120,7 +120,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// Stop the block generator by starting the stop on a different thread and
// then advancing the manual clock for the stopping to proceed.
val thread = stopBlockGenerator(blockGenerator)
- eventually(timeout(1 second), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
clock.advance(blockIntervalMs)
assert(blockGenerator.isStopped())
}
@@ -160,7 +160,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// - Finally, wait for all blocks to be pushed
clock.advance(1) // to make sure that the timer for another interval to complete
val thread = stopBlockGenerator(blockGenerator)
- eventually(timeout(1 second), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(blockGenerator.isActive() === false)
}
assert(blockGenerator.isStopped() === false)
@@ -181,7 +181,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// (expected as stop() should never complete) or a SparkException (unexpected as stop()
// completed and thread terminated).
val exception = intercept[Exception] {
- failAfter(200 milliseconds) {
+ failAfter(200.milliseconds) {
thread.join()
throw new SparkException(
"BlockGenerator.stop() completed before generating timer was stopped")
@@ -193,7 +193,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
// Verify that the final data is present in the final generated block and
// pushed before complete stop
assert(blockGenerator.isStopped() === false) // generator has not stopped yet
- eventually(timeout(10 seconds), interval(10 milliseconds)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
// Keep calling `advance` to avoid blocking forever in `clock.waitTillTime`
clock.advance(blockIntervalMs)
assert(thread.isAlive === false)
@@ -213,7 +213,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter with TimeLim
blockGenerator.start()
assert(listener.onErrorCalled === false)
blockGenerator.addData(1)
- eventually(timeout(1 second), interval(10 milliseconds)) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
assert(listener.onErrorCalled)
}
blockGenerator.stop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 22d027d..d072b99 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -61,7 +61,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
val expectedWaitTime = clock.getTimeMillis() + advancedTime
clock.advance(advancedTime)
// Make sure ExecutorAllocationManager.manageAllocation is called
- eventually(timeout(10 seconds)) {
+ eventually(timeout(10.seconds)) {
assert(clock.isStreamWaitingAt(expectedWaitTime))
}
body
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
index 5f7f7fa..f0e5027 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/JobGeneratorSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.scheduler
import java.util.concurrent.CountDownLatch
import scala.concurrent.duration._
-import scala.language.postfixOps
import org.scalatest.concurrent.Eventually._
@@ -69,10 +68,10 @@ class JobGeneratorSuite extends TestSuiteBase {
val longBatchNumber = 3 // 3rd batch will take a long time
val longBatchTime = longBatchNumber * batchDuration.milliseconds
- val testTimeout = timeout(10 seconds)
+ val testTimeout = timeout(10.seconds)
val inputStream = ssc.receiverStream(new TestReceiver)
- inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
+ inputStream.foreachRDD((_: RDD[Int], time: Time) => {
if (time.milliseconds == longBatchTime) {
while (waitLatch.getCount() > 0) {
waitLatch.await()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index c206d31..fec20f0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -41,7 +41,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
try {
// we wait until the Receiver has registered with the tracker,
// otherwise our rate update is lost
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(RateTestReceiver.getActive().nonEmpty)
}
@@ -49,7 +49,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
// Verify that the rate of the block generator in the receiver get updated
val activeReceiver = RateTestReceiver.getActive().get
tracker.sendRateUpdate(inputDStream.id, newRateLimit)
- eventually(timeout(5 seconds)) {
+ eventually(timeout(5.seconds)) {
assert(activeReceiver.getDefaultBlockGeneratorRateLimit() === newRateLimit,
"default block generator did not receive rate update")
assert(activeReceiver.getCustomBlockGeneratorRateLimit() === newRateLimit,
@@ -76,7 +76,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
output.register()
ssc.start()
StoppableReceiver.shouldStop = true
- eventually(timeout(10 seconds), interval(10 millis)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
// The receiver is stopped once, so if it's restarted, it should be started twice.
assert(startTimes === 2)
}
@@ -98,7 +98,7 @@ class ReceiverTrackerSuite extends TestSuiteBase {
val output = new TestOutputStream(input)
output.register()
ssc.start()
- eventually(timeout(10 seconds), interval(10 millis)) {
+ eventually(timeout(10.seconds), interval(10.milliseconds)) {
// If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index dc9305c..8d2fa7d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
-import scala.language.{implicitConversions, postfixOps}
+import scala.language.implicitConversions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -135,7 +135,7 @@ abstract class CommonWriteAheadLogTests(
if (waitForCompletion) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
} else {
- eventually(Eventually.timeout(1 second), interval(10 milliseconds)) {
+ eventually(Eventually.timeout(1.second), interval(10.milliseconds)) {
assert(getLogFilesInDirectory(testDir).size < logFiles.size)
}
}
@@ -504,7 +504,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// The queue.take() immediately takes the 3, and there is nothing left in the queue at that
// moment. Then the promise blocks the writing of 3. The rest get queued.
writeAsync(batchedWal, event1, 3L)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 0)
}
@@ -514,12 +514,12 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// we would like event 5 to be written before event 4 in order to test that they get
// sorted before being aggregated
writeAsync(batchedWal, event5, 12L)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 3)
}
writeAsync(batchedWal, event4, 10L)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(walBatchingThreadPool.getActiveCount === 5)
assert(batchedWal.invokePrivate(queueLength()) === 4)
}
@@ -528,7 +528,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val buffer = wrapArrayArrayByte(Array(event1))
val queuedEvents = Set(event2, event3, event4, event5)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(batchedWal.invokePrivate(queueLength()) === 0)
verify(wal, times(1)).write(meq(buffer), meq(3L))
// the file name should be the timestamp of the last record, as events should be naturally
@@ -559,7 +559,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// The queue.take() immediately takes the 3, and there is nothing left in the queue at that
// moment. Then the promise blocks the writing of 3. The rest get queued.
val promise1 = writeAsync(batchedWal, event1, 3L)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 0)
}
@@ -567,7 +567,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val promise2 = writeAsync(batchedWal, event2, 5L)
val promise3 = writeAsync(batchedWal, event3, 8L)
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(walBatchingThreadPool.getActiveCount === 3)
assert(blockingWal.isBlocked)
assert(batchedWal.invokePrivate(queueLength()) === 2) // event1 is being written
@@ -576,7 +576,7 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
val writePromises = Seq(promise1, promise2, promise3)
batchedWal.close()
- eventually(timeout(1 second)) {
+ eventually(timeout(1.second)) {
assert(writePromises.forall(_.isCompleted))
assert(writePromises.forall(_.future.value.get.isFailure)) // all should have failed
}
@@ -772,7 +772,7 @@ object WriteAheadLogSuite {
override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = {
isWriteCalled = true
- eventually(Eventually.timeout(2 second)) {
+ eventually(Eventually.timeout(2.second)) {
assert(!blockWrite)
}
wal.write(record, time)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org