You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/14 19:27:34 UTC

spark git commit: [SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows

Repository: spark
Updated Branches:
  refs/heads/master c6b8eb71a -> 169b9d73e


[SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows

## What changes were proposed in this pull request?

This PR proposes to fix the tests failed on Windows as below:

```
[info] - pipe with empty partition *** FAILED *** (672 milliseconds)
[info]   Set(0, 4, 5) did not equal Set(0, 5, 6) (PipedRDDSuite.scala:145)
[info]   org.scalatest.exceptions.TestFailedException:
...
```

In this case, `wc -c` counts the characters on both Windows and Linux but the newlines characters on Windows are `\r\n` which are two. So, the counts ends up one more for each.

```
[info] - test pipe exports map_input_file *** FAILED *** (62 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

```
[info] - test pipe exports mapreduce_map_input_file *** FAILED *** (172 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv mapreduce_map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

`printenv` command prints the environment variables; however, when environment variables are set to `ProcessBuilder` as lower-cased keys, `printenv` in Windows ignores and does not print this although it is actually set and accessible. (this was tested in [here](https://ci.appveyor.com/project/spark-test/spark/build/208-PipedRDDSuite) for upper-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:74d39da) and [here](https://ci.appveyor.com/project/spark-test/spark/build/203-PipedRDDSuite) for lower-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:fde5e37f28032c15a8d8693ba033a8a779a26317). It seems a bug in `printenv`.
(BTW, note that environment variables on Windows are case-insensitive).

This is (I believe) a thirdparty tool on Windows that resembles `printenv` on Linux (installed in AppVeyor environment or Windows Server 2012 R2). This command does not exist, at least, for Windows 7 and 10 (manually tested).

On Windows, we can use `cmd.exe /C set [varname]` officially for this purpose. We could fix the tests with this in order to test if the environment variable is set.

## How was this patch tested?

Manually tested via AppVeyor.

**Before**
https://ci.appveyor.com/project/spark-test/spark/build/194-PipedRDDSuite

**After**
https://ci.appveyor.com/project/spark-test/spark/build/226-PipedRDDSuite

Author: hyukjinkwon <gu...@gmail.com>

Closes #16254 from HyukjinKwon/pipe-errors.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/169b9d73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/169b9d73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/169b9d73

Branch: refs/heads/master
Commit: 169b9d73ee2136194df42c8deaaa95572b4ae56c
Parents: c6b8eb7
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Dec 14 19:27:29 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 14 19:27:29 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PipedRDDSuite.scala    | 308 +++++++++----------
 1 file changed, 151 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/169b9d73/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 7293aa9..287ae6f 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,109 +32,104 @@ import org.apache.spark._
 import org.apache.spark.util.Utils
 
 class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
+  val envCommand = if (Utils.isWindows) {
+    "cmd.exe /C set"
+  } else {
+    "printenv"
+  }
 
   test("basic pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
-      val piped = nums.pipe(Seq("cat"))
+    val piped = nums.pipe(Seq("cat"))
 
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-    } else {
-      assert(true)
-    }
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
   }
 
   test("basic pipe with tokenization") {
-    if (testCommandAvailable("wc")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-
-      // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
-      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
-        val c = piped.collect()
-        assert(c.size === 2)
-        assert(c(0).trim === "2")
-        assert(c(1).trim === "2")
-      }
-    } else {
-      assert(true)
+    assume(testCommandAvailable("wc"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+    // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
+    for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+      val c = piped.collect()
+      assert(c.size === 2)
+      assert(c(0).trim === "2")
+      assert(c(1).trim === "2")
     }
   }
 
   test("failure in iterating over pipe input") {
-    if (testCommandAvailable("cat")) {
-      val nums =
-        sc.makeRDD(Array(1, 2, 3, 4), 2)
-          .mapPartitionsWithIndex((index, iterator) => {
-            new Iterator[Int] {
-              def hasNext = true
-              def next() = {
-                throw new SparkException("Exception to simulate bad scenario")
-              }
-            }
-          })
-
-      val piped = nums.pipe(Seq("cat"))
-
-      intercept[SparkException] {
-        piped.collect()
-      }
+    assume(testCommandAvailable("cat"))
+    val nums =
+      sc.makeRDD(Array(1, 2, 3, 4), 2)
+        .mapPartitionsWithIndex((index, iterator) => {
+        new Iterator[Int] {
+          def hasNext = true
+          def next() = {
+            throw new SparkException("Exception to simulate bad scenario")
+          }
+        }
+      })
+
+    val piped = nums.pipe(Seq("cat"))
+
+    intercept[SparkException] {
+      piped.collect()
     }
   }
 
   test("advanced pipe") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val bl = sc.broadcast(List("0"))
-
-      val piped = nums.pipe(Seq("cat"),
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val bl = sc.broadcast(List("0"))
+
+    val piped = nums.pipe(Seq("cat"),
+      Map[String, String](),
+      (f: String => Unit) => {
+        bl.value.foreach(f); f("\u0001")
+      },
+      (i: Int, f: String => Unit) => f(i + "_"))
+
+    val c = piped.collect()
+
+    assert(c.size === 8)
+    assert(c(0) === "0")
+    assert(c(1) === "\u0001")
+    assert(c(2) === "1_")
+    assert(c(3) === "2_")
+    assert(c(4) === "0")
+    assert(c(5) === "\u0001")
+    assert(c(6) === "3_")
+    assert(c(7) === "4_")
+
+    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val d = nums1.groupBy(str => str.split("\t")(0)).
+      pipe(Seq("cat"),
         Map[String, String](),
         (f: String => Unit) => {
           bl.value.foreach(f); f("\u0001")
         },
-        (i: Int, f: String => Unit) => f(i + "_"))
-
-      val c = piped.collect()
-
-      assert(c.size === 8)
-      assert(c(0) === "0")
-      assert(c(1) === "\u0001")
-      assert(c(2) === "1_")
-      assert(c(3) === "2_")
-      assert(c(4) === "0")
-      assert(c(5) === "\u0001")
-      assert(c(6) === "3_")
-      assert(c(7) === "4_")
-
-      val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-      val d = nums1.groupBy(str => str.split("\t")(0)).
-        pipe(Seq("cat"),
-          Map[String, String](),
-          (f: String => Unit) => {
-            bl.value.foreach(f); f("\u0001")
-          },
-          (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
-            for (e <- i._2) {
-              f(e + "_")
-            }
-          }).collect()
-      assert(d.size === 8)
-      assert(d(0) === "0")
-      assert(d(1) === "\u0001")
-      assert(d(2) === "b\t2_")
-      assert(d(3) === "b\t4_")
-      assert(d(4) === "0")
-      assert(d(5) === "\u0001")
-      assert(d(6) === "a\t1_")
-      assert(d(7) === "a\t3_")
-    } else {
-      assert(true)
-    }
+        (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
+          for (e <- i._2) {
+            f(e + "_")
+          }
+        }).collect()
+    assert(d.size === 8)
+    assert(d(0) === "0")
+    assert(d(1) === "\u0001")
+    assert(d(2) === "b\t2_")
+    assert(d(3) === "b\t4_")
+    assert(d(4) === "0")
+    assert(d(5) === "\u0001")
+    assert(d(6) === "a\t1_")
+    assert(d(7) === "a\t3_")
   }
 
   test("pipe with empty partition") {
@@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     val piped = data.pipe("wc -c")
     assert(piped.count == 8)
     val charCounts = piped.map(_.trim.toInt).collect().toSet
-    assert(Set(0, 4, 5) == charCounts)
+    val expected = if (Utils.isWindows) {
+      // Note that newline character on Windows is \r\n which are two.
+      Set(0, 5, 6)
+    } else {
+      Set(0, 4, 5)
+    }
+    assert(expected == charCounts)
   }
 
   test("pipe with env variable") {
-    if (testCommandAvailable("printenv")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-      val c = piped.collect()
-      assert(c.size === 2)
-      assert(c(0) === "LALALA")
-      assert(c(1) === "LALALA")
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable(envCommand))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA"))
+    val c = piped.collect()
+    assert(c.length === 2)
+    // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format
+    // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both.
+    assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA")
+    assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA")
   }
 
   test("pipe with process which cannot be launched due to bad command") {
-    if (!testCommandAvailable("some_nonexistent_command")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("some_nonexistent_command")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
+    assume(!testCommandAvailable("some_nonexistent_command"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("some_nonexistent_command")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
     }
+    assert(exception.getMessage.contains(command.mkString(" ")))
   }
 
   test("pipe with process which is launched but fails with non-zero exit status") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val command = Seq("cat", "nonexistent_file")
-      val piped = nums.pipe(command)
-      val exception = intercept[SparkException] {
-        piped.collect()
-      }
-      assert(exception.getMessage.contains(command.mkString(" ")))
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val command = Seq("cat", "nonexistent_file")
+    val piped = nums.pipe(command)
+    val exception = intercept[SparkException] {
+      piped.collect()
     }
+    assert(exception.getMessage.contains(command.mkString(" ")))
   }
 
   test("basic pipe with separate working directory") {
-    if (testCommandAvailable("cat")) {
-      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
-      val c = piped.collect()
-      assert(c.size === 4)
-      assert(c(0) === "1")
-      assert(c(1) === "2")
-      assert(c(2) === "3")
-      assert(c(3) === "4")
-      val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
-      val collectPwd = pipedPwd.collect()
-      assert(collectPwd(0).contains("tasks/"))
-      val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
-      // make sure symlinks were created
-      assert(pipedLs.length > 0)
-      // clean up top level tasks directory
-      Utils.deleteRecursively(new File("tasks"))
-    } else {
-      assert(true)
-    }
+    assume(testCommandAvailable("cat"))
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+    val c = piped.collect()
+    assert(c.size === 4)
+    assert(c(0) === "1")
+    assert(c(1) === "2")
+    assert(c(2) === "3")
+    assert(c(3) === "4")
+    val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+    val collectPwd = pipedPwd.collect()
+    assert(collectPwd(0).contains("tasks/"))
+    val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
+    // make sure symlinks were created
+    assert(pipedLs.length > 0)
+    // clean up top level tasks directory
+    Utils.deleteRecursively(new File("tasks"))
   }
 
   test("test pipe exports map_input_file") {
@@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   def testExportInputFile(varName: String) {
-    if (testCommandAvailable("printenv")) {
-      val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
-        classOf[Text], 2) {
-        override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
+    assume(testCommandAvailable(envCommand))
+    val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
+      classOf[Text], 2) {
+      override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
 
-        override val getDependencies = List[Dependency[_]]()
+      override val getDependencies = List[Dependency[_]]()
 
-        override def compute(theSplit: Partition, context: TaskContext) = {
-          new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
-            new Text("b"))))
-        }
+      override def compute(theSplit: Partition, context: TaskContext) = {
+        new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
+          new Text("b"))))
       }
-      val hadoopPart1 = generateFakeHadoopPartition()
-      val pipedRdd =
-        new PipedRDD(
-          nums,
-          PipedRDD.tokenize("printenv " + varName),
-          Map(),
-          null,
-          null,
-          false,
-          4092,
-          Codec.defaultCharsetCodec.name)
-      val tContext = TaskContext.empty()
-      val rddIter = pipedRdd.compute(hadoopPart1, tContext)
-      val arr = rddIter.toArray
-      assert(arr(0) == "/some/path")
-    } else {
-      // printenv isn't available so just pass the test
     }
+    val hadoopPart1 = generateFakeHadoopPartition()
+    val pipedRdd =
+      new PipedRDD(
+        nums,
+        PipedRDD.tokenize(s"$envCommand $varName"),
+        Map(),
+        null,
+        null,
+        false,
+        4092,
+        Codec.defaultCharsetCodec.name)
+    val tContext = TaskContext.empty()
+    val rddIter = pipedRdd.compute(hadoopPart1, tContext)
+    val arr = rddIter.toArray
+    // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format
+    // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both.
+    assert(arr(0).stripPrefix(s"$varName=") === "/some/path")
   }
 
   def generateFakeHadoopPartition(): HadoopPartition = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org