You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/03 00:15:56 UTC

git commit: [SQL] Fixes race condition in CliSuite

Repository: spark
Updated Branches:
  refs/heads/master e4b80894b -> 495a13203


[SQL] Fixes race condition in CliSuite

`CliSuite` has been flaky for a while; this PR tries to improve the situation by fixing a race condition in `CliSuite`. The `captureOutput` function is used to capture both stdout and stderr output of the forked external process in two background threads and search for expected strings, but it wasn't properly synchronized before.

Author: Cheng Lian <li...@databricks.com>

Closes #3060 from liancheng/fix-cli-suite and squashes the following commits:

a70569c [Cheng Lian] Fixes race condition in CliSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/495a1320
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/495a1320
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/495a1320

Branch: refs/heads/master
Commit: 495a132031ae002c787371f2fd0ba4be2437e7c8
Parents: e4b8089
Author: Cheng Lian <li...@databricks.com>
Authored: Sun Nov 2 15:15:52 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sun Nov 2 15:15:52 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/thriftserver/CliSuite.scala  | 35 +++++++++-----------
 1 file changed, 15 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/495a1320/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8a72e9d..e8ffbc5 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,19 +18,17 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }
 
-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object
 
-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
         }
       }
     }
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
            |=======================
            |Spark SQL CLI command line: ${command.mkString(" ")}
            |
-           |Executed query ${next.get()} "${queries(next.get())}",
-           |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+           |Executed query $next "${queries(next)}",
+           |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
            |
            |${buffer.mkString("\n")}
            |===========================


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org