You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/29 16:17:03 UTC

[spark] branch master updated: [SPARK-27571][CORE][YARN][EXAMPLES] Avoid scala.language.reflectiveCalls

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a6716d3  [SPARK-27571][CORE][YARN][EXAMPLES] Avoid scala.language.reflectiveCalls
a6716d3 is described below

commit a6716d3f033825aa8a7a153674f29a2e2b673273
Author: Sean Owen <se...@databricks.com>
AuthorDate: Mon Apr 29 11:16:45 2019 -0500

    [SPARK-27571][CORE][YARN][EXAMPLES] Avoid scala.language.reflectiveCalls
    
    ## What changes were proposed in this pull request?
    
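    This PR avoids usage of reflective calls in Scala. It removes the `scala.language.reflectiveCalls` import that suppresses the warnings and rewrites code in small ways to avoid accessing members that are declared only on anonymous subclasses or structural types, and are therefore reachable only through reflection.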
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #24463 from srowen/SPARK-27571.
    
    Authored-by: Sean Owen <se...@databricks.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 83 +++++++++++-----------
 .../apache/spark/storage/MemoryStoreSuite.scala    | 13 ++--
 .../apache/spark/util/ClosureCleanerSuite.scala    |  2 -
 .../spark/examples/mllib/DecisionTreeRunner.scala  | 20 +++---
 .../mllib/GradientBoostedTreesRunner.scala         | 14 +++-
 .../cluster/mesos/MesosSchedulerUtilsSuite.scala   | 26 ++++---
 .../cluster/YarnSchedulerBackendSuite.scala        | 31 ++++----
 .../sql/execution/datasources/FileIndexSuite.scala | 30 ++++----
 .../streaming/StreamingQueryListenerSuite.scala    |  5 +-
 .../StreamingQueryListenersConfSuite.scala         |  2 -
 10 files changed, 109 insertions(+), 117 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c8ae834..749e47c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
-import scala.language.reflectiveCalls
 import scala.util.control.NonFatal
 
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
@@ -175,13 +174,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
-  val sparkListener = new SparkListener() {
-    val submittedStageInfos = new HashSet[StageInfo]
-    val successfulStages = new HashSet[Int]
-    val failedStages = new ArrayBuffer[Int]
-    val stageByOrderOfExecution = new ArrayBuffer[Int]
-    val endedTasks = new HashSet[Long]
 
+  val submittedStageInfos = new HashSet[StageInfo]
+  val successfulStages = new HashSet[Int]
+  val failedStages = new ArrayBuffer[Int]
+  val stageByOrderOfExecution = new ArrayBuffer[Int]
+  val endedTasks = new HashSet[Long]
+  val sparkListener = new SparkListener() {
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
       submittedStageInfos += stageSubmitted.stageInfo
     }
@@ -249,10 +248,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    sparkListener.submittedStageInfos.clear()
-    sparkListener.successfulStages.clear()
-    sparkListener.failedStages.clear()
-    sparkListener.endedTasks.clear()
+    submittedStageInfos.clear()
+    successfulStages.clear()
+    failedStages.clear()
+    endedTasks.clear()
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -375,11 +374,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   test("[SPARK-3353] parent stage should have lower stage id") {
-    sparkListener.stageByOrderOfExecution.clear()
+    stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.stageByOrderOfExecution.length === 2)
-    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
+    assert(stageByOrderOfExecution.length === 2)
+    assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
   }
 
   /**
@@ -621,8 +620,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(failedStages.contains(0))
+    assert(failedStages.size === 1)
     assertDataStructuresEmpty()
   }
 
@@ -631,8 +630,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(failedStages.contains(0))
+    assert(failedStages.size === 1)
     assertDataStructuresEmpty()
   }
 
@@ -642,8 +641,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(failedStages.contains(0))
+    assert(failedStages.size === 1)
     assertDataStructuresEmpty()
   }
 
@@ -703,8 +702,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
 
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.isEmpty)
-    assert(sparkListener.successfulStages.contains(0))
+    assert(failedStages.isEmpty)
+    assert(successfulStages.contains(0))
   }
 
   test("run trivial shuffle") {
@@ -1088,7 +1087,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(1))
+    assert(failedStages.contains(1))
 
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(makeCompletionEvent(
@@ -1097,7 +1096,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       null))
     // The SparkListener should not receive redundant failure events.
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.size == 1)
+    assert(failedStages.size == 1)
   }
 
   test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
@@ -1144,7 +1143,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(1),
       TaskKilled("test"),
       null))
-    assert(sparkListener.failedStages === Seq(0))
+    assert(failedStages === Seq(0))
     assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1)))
 
     scheduler.resubmitFailedStages()
@@ -1198,7 +1197,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+      submittedStageInfos.count(_.stageId == mapStageId)
     }
 
     // The map stage should have been submitted.
@@ -1220,7 +1219,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(1))
+    assert(failedStages.contains(1))
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
@@ -1259,10 +1258,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(reduceRdd, Array(0, 1))
 
     def countSubmittedReduceStageAttempts(): Int = {
-      sparkListener.submittedStageInfos.count(_.stageId == 1)
+      submittedStageInfos.count(_.stageId == 1)
     }
     def countSubmittedMapStageAttempts(): Int = {
-      sparkListener.submittedStageInfos.count(_.stageId == 0)
+      submittedStageInfos.count(_.stageId == 0)
     }
 
     // The map stage should have been submitted.
@@ -1323,7 +1322,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(sparkListener.endedTasks.size == 2)
+    assert(endedTasks.size == 2)
 
     // finish other 2 tasks
     runEvent(makeCompletionEvent(
@@ -1333,7 +1332,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(3)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.endedTasks.size == 4)
+    assert(endedTasks.size == 4)
 
     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1344,14 +1343,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(5)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.endedTasks.size == 5)
+    assert(endedTasks.size == 5)
 
     // make sure non successful tasks also send out event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
       Seq.empty, createFakeTaskInfoWithId(6)))
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.endedTasks.size == 6)
+    assert(endedTasks.size == 6)
   }
 
   test("ignore late map task completions") {
@@ -1425,7 +1424,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.toSet === Set(0))
+    assert(failedStages.toSet === Set(0))
 
     assertDataStructuresEmpty()
   }
@@ -1669,8 +1668,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // Make sure the listeners got told about both failed stages.
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.successfulStages.isEmpty)
-    assert(sparkListener.failedStages.toSet === Set(0, 2))
+    assert(successfulStages.isEmpty)
+    assert(failedStages.toSet === Set(0, 2))
 
     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
@@ -2455,7 +2454,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    *
    * A <------------s---------,
    *                           \
-   * B <--s-- C <--s-- D <--n---`-- E
+   * B <--s-- C <--s-- D <--n------ E
    *
    * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct
    * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
@@ -2639,13 +2638,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   test("Barrier task failures from the same stage attempt don't trigger multiple stage retries") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
-    val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0, 1))
 
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+      submittedStageInfos.count(_.stageId == mapStageId)
     }
 
     // The map stage should have been submitted.
@@ -2657,7 +2655,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(sparkListener.failedStages === Seq(0))
+    assert(failedStages === Seq(0))
 
     // The second map task fails with TaskKilled.
     runEvent(makeCompletionEvent(
@@ -2676,13 +2674,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   test("Barrier task failures from a previous stage attempt don't trigger stage retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil).barrier().mapPartitions(iter => iter)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
-    val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0, 1))
 
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
+      submittedStageInfos.count(_.stageId == mapStageId)
     }
 
     // The map stage should have been submitted.
@@ -2694,7 +2691,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(sparkListener.failedStages === Seq(0))
+    assert(failedStages === Seq(0))
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 3052ed9..958d57d 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.storage
 import java.nio.ByteBuffer
 
 import scala.language.implicitConversions
-import scala.language.reflectiveCalls
 import scala.reflect.ClassTag
 
 import org.scalatest._
@@ -62,8 +61,8 @@ class MemoryStoreSuite
   def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
     val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1)
     val blockInfoManager = new BlockInfoManager
+    var memoryStore: MemoryStore = null
     val blockEvictionHandler = new BlockEvictionHandler {
-      var memoryStore: MemoryStore = _
       override private[storage] def dropFromMemory[T: ClassTag](
           blockId: BlockId,
           data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
@@ -71,10 +70,9 @@ class MemoryStoreSuite
         StorageLevel.NONE
       }
     }
-    val memoryStore =
+    memoryStore =
       new MemoryStore(conf, blockInfoManager, serializerManager, memManager, blockEvictionHandler)
     memManager.setMemoryStore(memoryStore)
-    blockEvictionHandler.memoryStore = memoryStore
     (memoryStore, blockInfoManager)
   }
 
@@ -421,8 +419,8 @@ class MemoryStoreSuite
       val blockInfoManager = new BlockInfoManager
       blockInfoManager.registerTask(tc.taskAttemptId)
       var droppedSoFar = 0
+      var memoryStore: MemoryStore = null
       val blockEvictionHandler = new BlockEvictionHandler {
-        var memoryStore: MemoryStore = _
 
         override private[storage] def dropFromMemory[T: ClassTag](
             blockId: BlockId,
@@ -442,7 +440,7 @@ class MemoryStoreSuite
           }
         }
       }
-      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+      memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
           blockEvictionHandler) {
         override def afterDropAction(blockId: BlockId): Unit = {
           if (readLockAfterDrop) {
@@ -454,11 +452,10 @@ class MemoryStoreSuite
         }
       }
 
-      blockEvictionHandler.memoryStore = memoryStore
       memManager.setMemoryStore(memoryStore)
 
       // Put in some small blocks to fill up the memory store
-      val initialBlocks = (1 to numInitialBlocks).map { id =>
+      (1 to numInitialBlocks).foreach { id =>
         val blockId = BlockId(s"rdd_1_$id")
         val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
         val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index b7032e8..5e08a3d 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.util
 
 import java.io.NotSerializableException
 
-import scala.language.reflectiveCalls
-
 import org.apache.spark.{SparkContext, SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.partial.CountEvaluator
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
index dd363f5..b5d1b02f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRunner.scala
@@ -18,17 +18,15 @@
 // scalastyle:off println
 package org.apache.spark.examples.mllib
 
-import scala.language.reflectiveCalls
-
 import scopt.OptionParser
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.mllib.evaluation.MulticlassMetrics
-import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.{impurity, DecisionTree, RandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo, Strategy}
 import org.apache.spark.mllib.tree.configuration.Algo._
+import org.apache.spark.mllib.tree.model.{DecisionTreeModel, RandomForestModel}
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
@@ -349,19 +347,17 @@ object DecisionTreeRunner {
 
   /**
    * Calculates the mean squared error for regression.
-   *
-   * This is just for demo purpose. In general, don't copy this code because it is NOT efficient
-   * due to the use of structural types, which leads to one reflection call per record.
    */
-  // scalastyle:off structural.type
-  private[mllib] def meanSquaredError(
-      model: { def predict(features: Vector): Double },
-      data: RDD[LabeledPoint]): Double = {
+  private[mllib] def meanSquaredError(model: RandomForestModel, data: RDD[LabeledPoint]): Double =
+    data.map { y =>
+      val err = model.predict(y.features) - y.label
+      err * err
+    }.mean()
+
+  private[mllib] def meanSquaredError(model: DecisionTreeModel, data: RDD[LabeledPoint]): Double =
     data.map { y =>
       val err = model.predict(y.features) - y.label
       err * err
     }.mean()
-  }
-  // scalastyle:on structural.type
 }
 // scalastyle:on println
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala
index 4020c6b..3f26493 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/GradientBoostedTreesRunner.scala
@@ -22,8 +22,11 @@ import scopt.OptionParser
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.mllib.evaluation.MulticlassMetrics
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.GradientBoostedTrees
 import org.apache.spark.mllib.tree.configuration.{Algo, BoostingStrategy}
+import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
+import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
 /**
@@ -134,13 +137,20 @@ object GradientBoostedTreesRunner {
       } else {
         println(model) // Print model summary.
       }
-      val trainMSE = DecisionTreeRunner.meanSquaredError(model, training)
+      val trainMSE = meanSquaredError(model, training)
       println(s"Train mean squared error = $trainMSE")
-      val testMSE = DecisionTreeRunner.meanSquaredError(model, test)
+      val testMSE = meanSquaredError(model, test)
       println(s"Test mean squared error = $testMSE")
     }
 
     sc.stop()
   }
+
+  private[mllib] def meanSquaredError(
+      model: GradientBoostedTreesModel, data: RDD[LabeledPoint]): Double =
+    data.map { y =>
+      val err = model.predict(y.features) - y.label
+      err * err
+    }.mean()
 }
 // scalastyle:on println
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index dfbdae1..6a9ebfb 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.io.{File, FileNotFoundException}
 
 import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
 
 import com.google.common.io.Files
 import org.apache.mesos.Protos.{FrameworkInfo, Resource, Value}
@@ -35,14 +34,14 @@ import org.apache.spark.util.SparkConfWithEnv
 
 class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
 
-  // scalastyle:off structural.type
-  // this is the documented way of generating fixtures in scalatest
-  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+  class SparkConfFixture {
     val sparkConf = new SparkConf
-    val sc = mock[SparkContext]
+    val sc: SparkContext = mock[SparkContext]
     when(sc.conf).thenReturn(sparkConf)
   }
 
+  def fixture: SparkConfFixture = new SparkConfFixture()
+
   private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
     val rangeValue = Value.Range.newBuilder()
     rangeValue.setBegin(range._1)
@@ -78,7 +77,6 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   }
 
   val utils = new MesosSchedulerUtils { }
-  // scalastyle:on structural.type
 
   test("use at-least minimum overhead") {
     val f = fixture
@@ -253,9 +251,9 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   }
 
   test("Principal specified via spark.mesos.principal.file") {
-    val pFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt");
+    val pFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt")
     pFile.deleteOnExit()
-    Files.write("test-principal".getBytes("UTF-8"), pFile);
+    Files.write("test-principal".getBytes("UTF-8"), pFile)
     val conf = new SparkConf()
     conf.set(mesosConfig.CREDENTIAL_PRINCIPAL_FILE, pFile.getAbsolutePath())
 
@@ -282,9 +280,9 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   }
 
   test("Principal specified via SPARK_MESOS_PRINCIPAL_FILE") {
-    val pFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt");
+    val pFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt")
     pFile.deleteOnExit()
-    Files.write("test-principal".getBytes("UTF-8"), pFile);
+    Files.write("test-principal".getBytes("UTF-8"), pFile)
     val conf = new SparkConfWithEnv(Map("SPARK_MESOS_PRINCIPAL_FILE" -> pFile.getAbsolutePath()))
 
     val credBuilder = utils.buildCredentials(conf, FrameworkInfo.newBuilder())
@@ -313,9 +311,9 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   }
 
   test("Principal specified via spark.mesos.secret.file") {
-    val sFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt");
+    val sFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt")
     sFile.deleteOnExit()
-    Files.write("my-secret".getBytes("UTF-8"), sFile);
+    Files.write("my-secret".getBytes("UTF-8"), sFile)
     val conf = new SparkConf()
     conf.set(mesosConfig.CREDENTIAL_PRINCIPAL, "test-principal")
     conf.set(mesosConfig.CREDENTIAL_SECRET_FILE, sFile.getAbsolutePath())
@@ -350,9 +348,9 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   }
 
   test("Principal specified via SPARK_MESOS_SECRET_FILE") {
-    val sFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt");
+    val sFile = File.createTempFile("MesosSchedulerUtilsSuite", ".txt")
     sFile.deleteOnExit()
-    Files.write("my-secret".getBytes("UTF-8"), sFile);
+    Files.write("my-secret".getBytes("UTF-8"), sFile)
 
     val sFilePath = sFile.getAbsolutePath()
     val env = Map("SPARK_MESOS_SECRET_FILE" -> sFilePath)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
index 70f86aa..835f073 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -20,9 +20,6 @@ import java.net.URL
 import java.util.concurrent.atomic.AtomicReference
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
-import scala.language.reflectiveCalls
-
-import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 import org.mockito.Mockito.when
 import org.scalatest.mockito.MockitoSugar
 
@@ -45,22 +42,24 @@ class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with Loc
     }
   }
 
-  test("RequestExecutors reflects node blacklist and is serializable") {
-    sc = new SparkContext("local", "YarnSchedulerBackendSuite")
-    // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891.
-    val sched = new TaskSchedulerImpl(sc) {
-      val blacklistedNodes = new AtomicReference[Set[String]]()
-
-      def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist)
+  private class TestTaskSchedulerImpl(sc: SparkContext) extends TaskSchedulerImpl(sc) {
+    val blacklistedNodes = new AtomicReference[Set[String]]()
+    def setNodeBlacklist(nodeBlacklist: Set[String]): Unit = blacklistedNodes.set(nodeBlacklist)
+    override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
+  }
 
-      override def nodeBlacklist(): Set[String] = blacklistedNodes.get()
+  private class TestYarnSchedulerBackend(scheduler: TaskSchedulerImpl, sc: SparkContext)
+      extends YarnSchedulerBackend(scheduler, sc) {
+    def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
+      this.hostToLocalTaskCount = hostToLocalTaskCount
     }
+  }
 
-    val yarnSchedulerBackendExtended = new YarnSchedulerBackend(sched, sc) {
-      def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
-        this.hostToLocalTaskCount = hostToLocalTaskCount
-      }
-    }
+  test("RequestExecutors reflects node blacklist and is serializable") {
+    sc = new SparkContext("local", "YarnSchedulerBackendSuite")
+    // Subclassing the TaskSchedulerImpl here instead of using Mockito. For details see SPARK-26891.
+    val sched = new TestTaskSchedulerImpl(sc)
+    val yarnSchedulerBackendExtended = new TestYarnSchedulerBackend(sched, sc)
     yarnSchedulerBackend = yarnSchedulerBackendExtended
     val ser = new JavaSerializer(sc.conf).newInstance()
     for {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index e0a3641..bc83f3d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -21,30 +21,37 @@ import java.io.File
 import java.net.URI
 
 import scala.collection.mutable
-import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.util.{KnownSizeEstimation, SizeEstimator}
+import org.apache.spark.util.KnownSizeEstimation
 
 class FileIndexSuite extends SharedSQLContext {
 
+  private class TestInMemoryFileIndex(
+      spark: SparkSession,
+      path: Path,
+      fileStatusCache: FileStatusCache = NoopCache)
+      extends InMemoryFileIndex(spark, Seq(path), Map.empty, None, fileStatusCache) {
+    def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
+    def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
+    def leafFileStatuses: Iterable[FileStatus] = leafFiles.values
+  }
+
   test("InMemoryFileIndex: leaf files are qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
 
       val path = new Path(file.getCanonicalPath)
-      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
-        def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
-        def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
-      }
+      val catalog = new TestInMemoryFileIndex(spark, path)
       assert(catalog.leafFilePaths.forall(p => p.toString.startsWith("file:/")))
       assert(catalog.leafDirPaths.forall(p => p.toString.startsWith("file:/")))
     }
@@ -288,11 +295,7 @@ class FileIndexSuite extends SharedSQLContext {
       val fileStatusCache = FileStatusCache.getOrCreate(spark)
       val dirPath = new Path(dir.getAbsolutePath)
       val fs = dirPath.getFileSystem(spark.sessionState.newHadoopConf())
-      val catalog =
-        new InMemoryFileIndex(spark, Seq(dirPath), Map.empty, None, fileStatusCache) {
-          def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
-          def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
-        }
+      val catalog = new TestInMemoryFileIndex(spark, dirPath, fileStatusCache)
 
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
@@ -342,10 +345,7 @@ class FileIndexSuite extends SharedSQLContext {
         val file = new File(dir, "text.txt")
         stringToFile(file, "text")
 
-        val inMemoryFileIndex = new InMemoryFileIndex(
-          spark, Seq(new Path(file.getCanonicalPath)), Map.empty, None) {
-          def leafFileStatuses = leafFiles.values
-        }
+        val inMemoryFileIndex = new TestInMemoryFileIndex(spark, new Path(file.getCanonicalPath))
         val blockLocations = inMemoryFileIndex.leafFileStatuses.flatMap(
           _.asInstanceOf[LocatedFileStatus].getBlockLocations)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 10b2966..6b711f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.streaming
 import java.util.UUID
 
 import scala.collection.mutable
-import scala.language.reflectiveCalls
 
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
@@ -297,8 +296,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       }
       spark.streams.addListener(listener)
       try {
+        var numTriggers = 0
         val input = new MemoryStream[Int](0, sqlContext) {
-          @volatile var numTriggers = 0
           override def latestOffset(): OffsetV2 = {
             numTriggers += 1
             super.latestOffset()
@@ -312,7 +311,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         }
         actions += AssertOnQuery { _ =>
           eventually(timeout(streamingTimeout)) {
-            assert(input.numTriggers > 100) // at least 100 triggers have occurred
+            assert(numTriggers > 100) // at least 100 triggers have occurred
           }
           true
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
index ddbc175..88f510c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenersConfSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.language.reflectiveCalls
-
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org