Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/15 03:13:13 UTC

[spark] branch master updated: [SPARK-34118][CORE][SQL] Replaces filter and check for emptiness with exists or forall

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ed23ed  [SPARK-34118][CORE][SQL] Replaces filter and check for emptiness with exists or forall
8ed23ed is described below

commit 8ed23ed499ec7745a8e9bdc4c4fb3200fdb6c3c8
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Fri Jan 15 12:12:33 2021 +0900

    [SPARK-34118][CORE][SQL] Replaces filter and check for emptiness with exists or forall
    
    ### What changes were proposed in this pull request?
    This PR uses `exists` or `forall` to simplify `filter` + emptiness checks; the result is semantically equivalent but reads more simply. The rewrite rules are as follows (a brief equivalence sketch is shown after the list):
    
    - `seq.filter(p).size == 0` -> `!seq.exists(p)`
    - `seq.filter(p).length > 0` -> `seq.exists(p)`
    - `seq.filterNot(p).isEmpty` -> `seq.forall(p)`
    - `seq.filterNot(p).nonEmpty` -> `!seq.forall(p)`
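    
    For illustration, here is a minimal, self-contained sketch of why each pair above is equivalent. The `names` sequence and predicate `p` are made-up examples, not values from the changed code:
    
    ```scala
    // Hypothetical sample data, used only to demonstrate the equivalences above.
    val names = Seq("alpha", "beta", "gamma")
    val p: String => Boolean = _.startsWith("a")
    
    assert((names.filter(p).size == 0) == !names.exists(p))    // both mean "no element matches p"
    assert((names.filter(p).length > 0) == names.exists(p))    // both mean "at least one element matches p"
    assert(names.filterNot(p).isEmpty == names.forall(p))      // both mean "every element satisfies p"
    assert(names.filterNot(p).nonEmpty == !names.forall(p))    // both mean "some element fails p"
    ```
    
    Beyond readability, `exists` and `forall` can short-circuit on the first deciding element and avoid materializing the intermediate collection that `filter`/`filterNot` would build.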
    
    ### Why are the changes needed?
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes the existing Jenkins or GitHub Actions builds.
    
    Closes #31184 from LuciferYang/SPARK-34118.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 core/src/main/scala/org/apache/spark/api/r/RUtils.scala               | 4 ++--
 .../spark/storage/BlockManagerDecommissionIntegrationSuite.scala      | 2 +-
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala     | 2 +-
 .../scala/org/apache/spark/sql/execution/DataSourceScanExec.scala     | 4 ++--
 .../scala/org/apache/spark/sql/connector/DataSourceV2UtilsSuite.scala | 4 ++--
 sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala     | 2 +-
 .../org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala  | 2 +-
 7 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 311fade..784a57e 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -43,9 +43,9 @@ private[spark] object RUtils {
    * Check if SparkR is installed before running tests that use SparkR.
    */
   def isSparkRInstalled: Boolean = {
-    localSparkRPackagePath.filter { pkgDir =>
+    localSparkRPackagePath.exists { pkgDir =>
       new File(Seq(pkgDir, "SparkR").mkString(File.separator)).exists
-    }.isDefined
+    }
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index e6267fa..672bd8c 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -281,7 +281,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           (update.blockUpdatedInfo.blockId.name,
             update.blockUpdatedInfo.blockManagerId)}
         val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
-        assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
+        assert(blocksToManagers.exists(_._2 > 1),
           s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
           s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
       }
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 21e6955..12d9757 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -337,7 +337,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     assert(generatedFiles.size > 1)
     if (isCompressed) {
       assert(
-        generatedFiles.filter(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)).size > 0)
+        generatedFiles.exists(_.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)))
     }
     val allText = generatedFiles.map { file =>
       if (file.getName.endsWith(RollingFileAppender.GZIP_LOG_SUFFIX)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index df3b9f2..02fb73e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -421,7 +421,7 @@ case class FileSourceScanExec(
   }
 
   /** SQL metrics generated only for scans using dynamic partition pruning. */
-  private lazy val staticMetrics = if (partitionFilters.filter(isDynamicPruningFilter).nonEmpty) {
+  private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
     Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
       "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))
   } else {
@@ -434,7 +434,7 @@ case class FileSourceScanExec(
       static: Boolean): Unit = {
     val filesNum = partitions.map(_.files.size.toLong).sum
     val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
-    if (!static || partitionFilters.filter(isDynamicPruningFilter).isEmpty) {
+    if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
       driverMetrics("numFiles") = filesNum
       driverMetrics("filesSize") = filesSize
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2UtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2UtilsSuite.scala
index 01fcced..a58bab2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2UtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2UtilsSuite.scala
@@ -37,8 +37,8 @@ class DataSourceV2UtilsSuite extends SparkFunSuite {
     val source = new DataSourceV2WithSessionConfig
     val confs = DataSourceV2Utils.extractSessionConfigs(source, conf)
     assert(confs.size == 2)
-    assert(confs.keySet.filter(_.startsWith("spark.datasource")).size == 0)
-    assert(confs.keySet.filter(_.startsWith("not.exist.prefix")).size == 0)
+    assert(!confs.keySet.exists(_.startsWith("spark.datasource")))
+    assert(!confs.keySet.exists(_.startsWith("not.exist.prefix")))
     assert(confs.keySet.contains("foo.bar"))
     assert(confs.keySet.contains("whateverConfigName"))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 639fd0e..730eb78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -722,7 +722,7 @@ class JDBCSuite extends QueryTest
   test("Remap types via JdbcDialects") {
     JdbcDialects.registerDialect(testH2Dialect)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties())
-    assert(df.schema.filter(_.dataType != org.apache.spark.sql.types.StringType).isEmpty)
+    assert(!df.schema.exists(_.dataType != org.apache.spark.sql.types.StringType))
     val rows = df.collect()
     assert(rows(0).get(0).isInstanceOf[String])
     assert(rows(0).get(1).isInstanceOf[String])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 26158f4..1565287 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -49,7 +49,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     // Skip check default `StreamingQueryStatusListener` which is for streaming UI.
     assert(spark.streams.listListeners()
-      .filterNot(_.isInstanceOf[StreamingQueryStatusListener]).isEmpty)
+      .forall(_.isInstanceOf[StreamingQueryStatusListener]))
     // Make sure we don't leak any events to the next test
     spark.sparkContext.listenerBus.waitUntilEmpty()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org