You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/09/07 06:38:55 UTC
[spark] branch branch-2.4 updated: Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 0a4b356 Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
0a4b356 is described below
commit 0a4b35642ffa3020ec0fcae2cca59376e2095636
Author: Xiao Li <ga...@gmail.com>
AuthorDate: Fri Sep 6 23:37:36 2019 -0700
Revert "[SPARK-28912][STREAMING] Fixed MatchError in getCheckpointFiles()"
This reverts commit 2654c33fd6a7a09e2b2fa9fc1c2ea6224ab292e6.
---
.../scala/org/apache/spark/streaming/Checkpoint.scala | 4 ++--
.../org/apache/spark/streaming/CheckpointSuite.scala | 17 -----------------
2 files changed, 2 insertions(+), 19 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b081287..a882558 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -128,8 +128,8 @@ object Checkpoint extends Logging {
try {
val statuses = fs.listStatus(path)
if (statuses != null) {
- val paths = statuses.filterNot(_.isDirectory).map(_.getPath)
- val filtered = paths.filter(p => REGEX.findFirstIn(p.getName).nonEmpty)
+ val paths = statuses.map(_.getPath)
+ val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
filtered.sortWith(sortFunc)
} else {
logWarning(s"Listing $path returned null")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 43e3cdd..19b621f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -846,23 +846,6 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
checkpointWriter.stop()
}
- test("SPARK-28912: Fix MatchError in getCheckpointFiles") {
- withTempDir { tempDir =>
- val fs = FileSystem.get(tempDir.toURI, new Configuration)
- val checkpointDir = tempDir.getAbsolutePath + "/checkpoint-01"
-
- assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
- // Ignore files whose parent path match.
- fs.create(new Path(checkpointDir, "this-is-matched-before-due-to-parent-path")).close()
- assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
-
- // Ignore directories whose names match.
- fs.mkdirs(new Path(checkpointDir, "checkpoint-1000000000"))
- assert(Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).length === 0)
- }
- }
-
test("SPARK-6847: stack overflow when updateStateByKey is followed by a checkpointed dstream") {
// In this test, there are two updateStateByKey operators. The RDD DAG is as follows:
//
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org