You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/09/07 00:55:52 UTC

[spark] branch branch-2.4 updated: [SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2654c33  [SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()
2654c33 is described below

commit 2654c33fd6a7a09e2b2fa9fc1c2ea6224ab292e6
Author: avk <nu...@gmail.com>
AuthorDate: Fri Sep 6 17:55:09 2019 -0700

    [SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()
    
    ### What changes were proposed in this pull request?
    
    This change fixes issue SPARK-28912.
    
    ### Why are the changes needed?
    
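    If the checkpoint directory's name matches the regex pattern used for checkpoint files, the logs are flooded with MatchError exceptions and old checkpoint files are not removed. This happens because the pattern was matched against the full path string rather than the file name alone, so a matching parent directory made unrelated entries look like checkpoint files.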
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    1. Start Hadoop in pseudo-distributed mode.
    
    2. In another terminal, run the command `nc -lk 9999`.
    
    3. In the Spark shell execute the following statements:
    
        ```scala
        val ssc = new StreamingContext(sc, Seconds(30))
        ssc.checkpoint("hdfs://localhost:9000/checkpoint-01")
        val lines = ssc.socketTextStream("localhost", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(word => (word, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
        ```
    
    Closes #25654 from avkgh/SPARK-28912.
    
    Authored-by: avk <nu...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 723faadf80da91a6e5514fc16b7af3ca4900eda8)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/streaming/Checkpoint.scala   |  4 ++--
 .../org/apache/spark/streaming/CheckpointSuite.scala    | 17 +++++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index a882558..b081287 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,8 +128,8 @@ object Checkpoint extends Logging {
     try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
-        val paths = statuses.map(_.getPath)
-        val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
+        val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
+        val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
         filtered.sortWith(sortFunc)
       } else {
         logWarning(s"Listing $path returned null")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 19b621f..43e3cdd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -846,6 +846,23 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     checkpointWriter.stop()
   }
 
+  test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
+    withTempDir { tempDir =>
+      val fs = FileSystem.get(tempDir.toURI, new Configuration)
+      val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
+
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+
+      // Ignore files whose parent path match.
+      fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+
+      // Ignore directories whose names match.
+      fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
+      assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
+    }
+  }
+
   test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
     // In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
     //


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org