Posted to commits@spark.apache.org by we...@apache.org on 2021/10/20 12:51:21 UTC

[spark] branch master updated: [SPARK-37058][REPL][TESTS] Add spark-shell command line unit test

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new deda62d  [SPARK-37058][REPL][TESTS] Add spark-shell command line unit test
deda62d is described below

commit deda62d85f7803c1fd92c21b64ec8a399652ad36
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Oct 20 20:50:22 2021 +0800

    [SPARK-37058][REPL][TESTS] Add spark-shell command line unit test
    
    ### What changes were proposed in this pull request?
    
    The current unit tests for `spark-shell` do not include an end-to-end test, so some bugs were not caught. This PR adds an end-to-end unit test for the `spark-shell` command line tool.
    
    ### Why are the changes needed?
    Add an end-to-end unit test for the spark-shell command line tool.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
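
    For example, the new suite can be run locally with the usual `build/sbt`
    single-suite workflow (exact invocation may vary with the build setup):

        build/sbt "repl/testOnly org.apache.spark.repl.SparkShellSuite"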
    
    Closes #34329 from AngersZhuuuu/SPARK-37058.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/repl/SparkShellSuite.scala    | 167 +++++++++++++++++++++
 1 file changed, 167 insertions(+)

diff --git a/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala
new file mode 100644
index 0000000..ba815d7
--- /dev/null
+++ b/repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.repl
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+class SparkShellSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+  /**
+   * Run spark-shell, feed it the given scripts, and expect each of the expected
+   * answers to appear in its output.
+   * This method is modeled on the [[runCliWithin()]] method in [[CliSuite]].
+   *
+   * @param timeout maximum time for the commands to complete
+   * @param extraArgs any extra arguments
+   * @param errorResponses a sequence of strings whose presence in the stdout of the forked process
+   *                       is taken as an immediate error condition. That is: if a line
+   *                       containing one of these strings is found, fail the test immediately.
+   *                       The default value is `Seq("Error:")`.
+   * @param scriptsAndExpectedAnswers one or more tuples of query + answer
+   */
+  def runInterpreter(
+      timeout: FiniteDuration,
+      extraArgs: Seq[String] = Seq.empty,
+      errorResponses: Seq[String] = Seq("Error:"))(
+      scriptsAndExpectedAnswers: (String, String)*): Unit = {
+
+    val scripts = scriptsAndExpectedAnswers.map(_._1 + "\n").mkString
+    val expectedAnswers = scriptsAndExpectedAnswers.map(_._2)
+
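+    // Build the spark-shell launch command. The script path is resolved relative
+    // to the repl module's working directory; a local master and a disabled UI
+    // keep the forked process lightweight.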
+    val command = {
+      val cliScript = "../bin/spark-shell".split("/").mkString(File.separator)
+      s"""$cliScript
+         |  --master local
+         |  --conf spark.ui.enabled=false
+       """.stripMargin.split("\\s+").toSeq ++ extraArgs
+    }
+
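+    // The state below is mutated by the stdout/stderr capture threads, so all
+    // access in captureOutput is serialized through `lock`.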
+    var next = 0
+    val foundMasterAndApplicationIdMessage = Promise[Unit]()
+    val foundAllExpectedAnswers = Promise[Unit]()
+    val buffer = new ArrayBuffer[String]()
+    val lock = new Object
+
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
+      // This test suite sometimes gets extremely slow on Jenkins for unknown reasons. Here we
+      // add a timestamp to each line to provide more diagnostic information.
+      val newLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
+      log.info(newLine)
+      buffer += newLine
+
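+      // spark-shell prints "Spark context available as 'sc' (master = ..., app id = ...)"
+      // once the driver is fully booted.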
+      if (line.startsWith("Spark context available") && line.contains("app id")) {
+        foundMasterAndApplicationIdMessage.trySuccess(())
+      }
+
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.contains(expectedAnswers(next))) {
+        log.info(s"$source> found expected output line $next: '${expectedAnswers(next)}'")
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
+        }
+      } else {
+        errorResponses.foreach { r =>
+          if (line.contains(r)) {
+            foundAllExpectedAnswers.tryFailure(
+              new RuntimeException(s"Failed with error line '$line'"))
+          }
+        }
+      }
+    }
+
+    val process = new ProcessBuilder(command: _*).start()
+
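+    // Feed all scripts to the shell's stdin up front; closing the stream sends
+    // EOF, so the REPL exits once it has evaluated them.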
+    val stdinWriter = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8)
+    stdinWriter.write(scripts)
+    stdinWriter.flush()
+    stdinWriter.close()
+
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
+
+    try {
+      val timeoutForQuery = if (!extraArgs.contains("-e")) {
+        // Wait for spark-shell driver to boot, up to two minutes
+        ThreadUtils.awaitResult(foundMasterAndApplicationIdMessage.future, 2.minutes)
+        log.info("spark-shell driver is booted. Waiting for expected answers.")
+        // Given timeout is applied after the spark-shell driver is ready
+        timeout
+      } else {
+        // There's no boot message if the -e option is provided, so extend the timeout
+        // enough that the boot-up duration is also covered by it
+        2.minutes + timeout
+      }
+      ThreadUtils.awaitResult(foundAllExpectedAnswers.future, timeoutForQuery)
+      log.info("Found all expected output.")
+    } catch { case cause: Throwable =>
+      val message =
+        s"""
+           |=======================
+           |SparkShellSuite failure output
+           |=======================
+           |Spark Shell command line: ${command.mkString(" ")}
+           |Exception: $cause
+           |Failed to capture next expected output "${expectedAnswers(next)}" within $timeout.
+           |
+           |${buffer.mkString("\n")}
+           |===========================
+           |End SparkShellSuite failure output
+           |===========================
+         """.stripMargin
+      logError(message, cause)
+      fail(message, cause)
+    } finally {
+      if (!process.waitFor(1, MINUTES)) {
+        try {
+          log.warn("spark-shell did not exit gracefully.")
+        } finally {
+          process.destroy()
+        }
+      }
+    }
+  }
+
+  test("SPARK-37058: Add command line unit test for spark-shell") {
+    runInterpreter(2.minutes, Seq.empty)(
+      """
+        |spark.sql("drop table if exists t_37058")
+      """.stripMargin -> "res0: org.apache.spark.sql.DataFrame = []")
+  }
+
+  test("SPARK-37058: Add command line unit test for spark-shell with --verbose") {
+    runInterpreter(2.minutes, Seq("--verbose"))(
+      "".stripMargin -> "org.apache.spark.repl.Main")
+  }
+}
