You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/09/07 06:38:55 UTC

[spark] branch branch-2.4 updated: Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0a4b356  Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
0a4b356 is described below

commit 0a4b35642ffa3020ec0fcae2cca59376e2095636
Author: Xiao Li <ga...@gmail.com>
AuthorDate: Fri Sep 6 23:37:36 2019 -0700

    Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
    
    This reverts commit 2654c33fd6a7a09e2b2fa9fc1c2ea6224ab292e6.
---
 .../scala/org/apache/spark/streaming/Checkpoint.scala   |  4 ++--
 .../org/apache/spark/streaming/CheckpointSuite.scala    | 17 -----------------
 2 files changed, 2 insertions(+), 19 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b081287..a882558 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,8 +128,8 @@ object Checkpoint extends Logging {
     try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
-        val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
-        val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
+        val paths = statuses.map(_.getPath)
+        val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
         filtered.sortWith(sortFunc)
       } else {
         logWarning(s"Listing $path returned null")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 43e3cdd..19b621f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -846,23 +846,6 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
-  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
-    withTempDir { tempDir =>
-      val fs = FileSystem.get(tempDir.toURI, new Configuration)
-      val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
-
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
-      // Ignore files whose parent path match.
-      fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
-      // Ignore directories whose names match.
-      fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
-      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-    }
-  }
-
   test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
     // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org