Posted to commits@spark.apache.org by hv...@apache.org on 2017/04/20 16:49:45 UTC

spark git commit: [SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test

Repository: spark
Updated Branches:
  refs/heads/master b91873db0 -> c5a31d160


[SPARK-20407][TESTS] ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test

## What changes were proposed in this pull request?

`SharedSQLContext.afterEach` now calls `DebugFilesystem.assertNoOpenStreams` inside `eventually`, because files can be closed asynchronously by other threads.
`SQLTestUtils.withTempDir` now calls `waitForTasksToFinish` before deleting the directory, so no task is still writing into it when it is removed.
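
For illustration, the retry mechanism both changes rely on is ScalaTest's `Eventually`. A minimal standalone sketch, not Spark code (`EventuallySketch` and the 2-second delay are made up for the example):

```scala
import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually._

// `eventually` re-runs the block until the assertion stops throwing or the
// timeout expires, instead of failing on the first attempt like a bare assert.
object EventuallySketch extends App {
  // Simulate a condition that only becomes true ~2 seconds from now,
  // like open streams being closed by another thread.
  val readyAt = System.nanoTime + 2.seconds.toNanos

  eventually(timeout(10.seconds)) {
    assert(System.nanoTime >= readyAt, "condition not yet true")
  }
  println("condition became true within the timeout")
}
```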

## How was this patch tested?
Added a new test to ParquetQuerySuite, based on the flaky test, that runs the original scenario in a loop to increase the chance of reproducing the failure. It is checked in with `ignore` so it does not run by default.
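
To show how the fixes apply, a hypothetical suite mixing in `SharedSQLContext` might look like this (illustrative only; `MyQuerySuite` and its test are not part of this patch):

```scala
import java.io.File

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSQLContext

class MyQuerySuite extends QueryTest with SharedSQLContext {
  test("round-trip through a temp directory") {
    withTempDir { dir =>
      // Write into a subdirectory: the save path must not already exist.
      val path = new File(dir, "data").getCanonicalPath
      spark.range(10).write.parquet(path)
      checkAnswer(spark.read.parquet(path), spark.range(10).toDF())
      // On exit, withTempDir now calls waitForTasksToFinish() before deleting
      // `dir`, and afterEach retries DebugFilesystem.assertNoOpenStreams()
      // for up to 10 seconds instead of failing on the first check.
    }
  }
}
```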

Author: Bogdan Raducanu <bo...@databricks.com>

Closes #17701 from bogdanrdc/SPARK-20407.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a31d16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a31d16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a31d16

Branch: refs/heads/master
Commit: c5a31d160f47ba51bb9f8a4f3141851034640fc7
Parents: b91873d
Author: Bogdan Raducanu <bo...@databricks.com>
Authored: Thu Apr 20 18:49:39 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Apr 20 18:49:39 2017 +0200

----------------------------------------------------------------------
 .../datasources/parquet/ParquetQuerySuite.scala | 35 +++++++++++++++++++-
 .../apache/spark/sql/test/SQLTestUtils.scala    | 19 +++++++++--
 .../spark/sql/test/SharedSQLContext.scala       | 13 +++++---
 3 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index c366095..2efff3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
-import org.apache.spark.SparkException
+import org.apache.spark.{DebugFilesystem, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -316,6 +316,39 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  /**
+   * This is part of the test 'Enabling/disabling ignoreCorruptFiles', but run in a loop
+   * to increase the chance of failure.
+   */
+  ignore("SPARK-20407 ParquetQuerySuite 'Enabling/disabling ignoreCorruptFiles' flaky test") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    for (i <- 1 to 100) {
+      DebugFilesystem.clearOpenStreams()
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val exception = intercept[SparkException] {
+          testIgnoreCorruptFiles()
+        }
+        assert(exception.getMessage().contains("is not a Parquet file"))
+      }
+      DebugFilesystem.assertNoOpenStreams()
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath

http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index b5ad73b7..44c0fc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -22,11 +22,13 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.{Locale, UUID}
 
+import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
@@ -49,7 +51,7 @@ import org.apache.spark.util.{UninterruptibleThread, Utils}
  * prone to leaving multiple overlapping [[org.apache.spark.SparkContext]]s in the same JVM.
  */
 private[sql] trait SQLTestUtils
-  extends SparkFunSuite
+  extends SparkFunSuite with Eventually
   with BeforeAndAfterAll
   with SQLTestData { self =>
 
@@ -139,6 +141,15 @@ private[sql] trait SQLTestUtils
   }
 
   /**
+   * Waits for all tasks on all executors to be finished.
+   */
+  protected def waitForTasksToFinish(): Unit = {
+    eventually(timeout(10.seconds)) {
+      assert(spark.sparkContext.statusTracker
+        .getExecutorInfos.map(_.numRunningTasks()).sum == 0)
+    }
+  }
+  /**
    * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
    * returns.
    *
@@ -146,7 +157,11 @@ private[sql] trait SQLTestUtils
    */
   protected def withTempDir(f: File => Unit): Unit = {
     val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
+    try f(dir) finally {
+      // wait for all tasks to finish before deleting files
+      waitForTasksToFinish()
+      Utils.deleteRecursively(dir)
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c5a31d16/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index e122b39..3d76e05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.test
 
+import scala.concurrent.duration._
+
 import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.SQLConf
-
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventually {
 
   protected val sparkConf = new SparkConf()
 
@@ -84,6 +85,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected override def afterEach(): Unit = {
     super.afterEach()
-    DebugFilesystem.assertNoOpenStreams()
+    // Files can be closed from other threads, so wait a bit;
+    // normally this doesn't take more than 1 second.
+    eventually(timeout(10.seconds)) {
+      DebugFilesystem.assertNoOpenStreams()
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org