You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/04/20 16:49:45 UTC
spark git commit: [SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test
Repository: spark
Updated Branches:
refs/heads/master b91873db0 -> c5a31d160
[SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test
## What changes were proposed in this pull request?
SharedSQLContext.afterEach now calls DebugFilesystem.assertNoOpenStreams inside eventually.
SQLTestUtils.withTempDir now calls waitForTasksToFinish before deleting the directory.
## How was this patch tested?
Added a new test in ParquetQuerySuite based on the flaky test.
Author: Bogdan Raducanu <bo...@databricks.com>
Closes #17701 from bogdanrdc/SPARK-20407.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a31d16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a31d16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a31d16
Branch: refs/heads/master
Commit: c5a31d160f47ba51bb9f8a4f3141851034640fc7
Parents: b91873d
Author: Bogdan Raducanu <bo...@databricks.com>
Authored: Thu Apr 20 18:49:39 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 20 18:49:39 2017 +0200
----------------------------------------------------------------------
.../datasources/parquet/ParquetQuerySuite.scala | 35 +++++++++++++++++++-
.../apache/spark/sql/test/SQLTestUtils.scala | 19 +++++++++--
.../spark/sql/test/SharedSQLContext.scala | 13 +++++---
3 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index c366095..2efff3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
+ /**
+ * this is part of test 'Enabling/disabling ignoreCorruptFiles' but run in a loop
+ * to increase the chance of failure
+ */
+ ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+ def testIgnoreCorruptFiles(): Unit = {
+ withTempDir { dir =>
+ val basePath = dir.getCanonicalPath
+ spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+ spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+ spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+ val df = spark.read.parquet(
+ new Path(basePath, "first").toString,
+ new Path(basePath, "second").toString,
+ new Path(basePath, "third").toString)
+ checkAnswer(
+ df,
+ Seq(Row(0), Row(1)))
+ }
+ }
+
+ for (i <- 1 to 100) {
+ DebugFilesystem.clearOpenStreams()
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val exception = intercept[SparkException] {
+ testIgnoreCorruptFiles()
+ }
+ assert(exception.getMessage().contains("is not a Parquet file"))
+ }
+ DebugFilesystem.assertNoOpenStreams()
+ }
+ }
+
test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
withTempPath { dir =>
val basePath = dir.getCanonicalPath
http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index b5ad73b7..44c0fc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,11 +22,13 @@ import java.net.URI
import java.nio.file.Files
import java.util.{Locale, UUID}
+import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
* prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
*/
private[sql] trait SQLTestUtils
- extends SparkFunSuite
+ extends SparkFunSuite with Eventually
with BeforeAndAfterAll
with SQLTestData { self =>
@@ -139,6 +141,15 @@ private[sql] trait SQLTestUtils
}
/**
+ * Waits for all tasks on all executors to be finished.
+ */
+ protected def waitForTasksToFinish(): Unit = {
+ eventually(timeout(10.seconds)) {
+ assert(spark.sparkContext.statusTracker
+ .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+ }
+ }
+ /**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
*
@@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils
*/
protected def withTempDir(f: File => Unit): Unit = {
val dir = Utils.createTempDir().getCanonicalFile
- try f(dir) finally Utils.deleteRecursively(dir)
+ try f(dir) finally {
+ // wait for all tasks to finish before deleting files
+ waitForTasksToFinish()
+ Utils.deleteRecursively(dir)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e122b39..3d76e05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,17 +17,18 @@
package org.apache.spark.sql.test
+import scala.concurrent.duration._
+
import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
/**
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
*/
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
protected val sparkConf = new SparkConf()
@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
protected override def afterEach(): Unit = {
super.afterEach()
- DebugFilesystem.assertNoOpenStreams()
+ // files can be closed from other threads, so wait a bit
+ // normally this doesn't take more than 1s
+ eventually(timeout(10.seconds)) {
+ DebugFilesystem.assertNoOpenStreams()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org