You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/14 19:24:29 UTC

spark git commit: [SPARK-18842][TESTS][LAUNCHER] De-duplicate paths in classpaths in commands for local-cluster mode to work around the path length limitation on Windows

Repository: spark
Updated Branches:
  refs/heads/master ba4aab9b8 -> c6b8eb71a


[SPARK-18842][TESTS][LAUNCHER] De-duplicate paths in classpaths in commands for local-cluster mode to work around the path length limitation on Windows

## What changes were proposed in this pull request?

Currently, some tests fail or hang on Windows due to this problem. As described in SPARK-18718, some tests using `local-cluster` mode were disabled on Windows because the paths given as classpaths exceed the command length limitation.

The limitation seems to be roughly 32K (see the [blog in MS](https://blogs.msdn.microsoft.com/oldnewthing/20031210-00/?p=41553/) and [another reference](https://support.thoughtworks.com/hc/en-us/articles/213248526-Getting-around-maximum-command-line-length-is-32767-characters-on-Windows)), but in `local-cluster` mode, executors were being launched as processes with a command such as the one [here](https://gist.github.com/HyukjinKwon/5bc81061c250d4af5a180869b59d42ea) (in tests only).

The length of this command is roughly 40K because of the classpaths given to the `java` command. However, almost half of those paths seem to be duplicates. So, if we deduplicate the paths, the command seems to be reduced to roughly 20K, as shown [here](https://gist.github.com/HyukjinKwon/dad0c8db897e5e094684a2dc6a417790).

We may need to reconsider this if more paths are added in the future, but for now it seems better than disabling all the tests, and it requires only minimal changes.

Therefore, this PR proposes to deduplicate the paths in classpaths in case of launching executors as processes in `local-cluster` mode.

## How was this patch tested?

Existing tests in `ShuffleSuite` and `BroadcastJoinSuite` were run manually via AppVeyor.

Author: hyukjinkwon <gu...@gmail.com>

Closes #16266 from HyukjinKwon/disable-local-cluster-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6b8eb71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6b8eb71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6b8eb71

Branch: refs/heads/master
Commit: c6b8eb71a9638c9a8ce02d11d5fe26f4c5be531e
Parents: ba4aab9
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Dec 14 19:24:24 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 14 19:24:24 2016 +0000

----------------------------------------------------------------------
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 12 ------------
 .../apache/spark/launcher/AbstractCommandBuilder.java   |  8 +++++---
 project/SparkBuild.scala                                |  3 ++-
 .../spark/sql/execution/joins/BroadcastJoinSuite.scala  | 10 ----------
 4 files changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index dc3a28e..e626ed3 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -51,10 +51,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
     assert(valuesFor2.toList.sorted === List(1))
   }
 
-  // Some tests using `local-cluster` here are failed on Windows due to the failure of initiating
-  // executors by the path length limitation. See SPARK-18718.
   test("shuffle non-zero block size") {
-    assume(!Utils.isWindows)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val NUM_BLOCKS = 3
 
@@ -80,7 +77,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("shuffle serializer") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)
@@ -97,7 +93,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("zero sized blocks") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -125,7 +120,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("zero sized blocks without kryo") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
@@ -151,7 +145,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("shuffle on mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -164,7 +157,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sorting on mutable pairs") {
-    assume(!Utils.isWindows)
     // This is not in SortingSuite because of the local cluster setup.
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -180,7 +172,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("cogroup using mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -208,7 +199,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("subtract mutable pairs") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     def p[T1, T2](_1: T1, _2: T2): MutablePair[T1, T2] = MutablePair(_1, _2)
@@ -223,7 +213,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Kryo") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     val myConf = conf.clone().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", myConf)
@@ -238,7 +227,6 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   }
 
   test("sort with Java non serializable class - Java") {
-    assume(!Utils.isWindows)
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     val a = sc.parallelize(1 to 10, 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index ba43659..0622fef 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -26,9 +26,11 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
@@ -135,7 +137,7 @@ abstract class AbstractCommandBuilder {
   List<String> buildClassPath(String appClassPath) throws IOException {
     String sparkHome = getSparkHome();
 
-    List<String> cp = new ArrayList<>();
+    Set<String> cp = new LinkedHashSet<>();
     addToClassPath(cp, getenv("SPARK_CLASSPATH"));
     addToClassPath(cp, appClassPath);
 
@@ -201,7 +203,7 @@ abstract class AbstractCommandBuilder {
     addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
     addToClassPath(cp, getenv("YARN_CONF_DIR"));
     addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
-    return cp;
+    return new ArrayList<>(cp);
   }
 
   /**
@@ -210,7 +212,7 @@ abstract class AbstractCommandBuilder {
    * @param cp List to which the new entries are appended.
    * @param entries New classpath entries (separated by File.pathSeparator).
    */
-  private void addToClassPath(List<String> cp, String entries) {
+  private void addToClassPath(Set<String> cp, String entries) {
     if (isEmpty(entries)) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index fdc33c7..74edd53 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -824,7 +824,8 @@ object TestSettings {
     // launched by the tests have access to the correct test-time classpath.
     envVars in Test ++= Map(
       "SPARK_DIST_CLASSPATH" ->
-        (fullClasspath in Test).value.files.map(_.getAbsolutePath).mkString(":").stripSuffix(":"),
+        (fullClasspath in Test).value.files.map(_.getAbsolutePath)
+          .mkString(File.pathSeparator).stripSuffix(File.pathSeparator),
       "SPARK_PREPEND_CLASSES" -> "1",
       "SPARK_SCALA_VERSION" -> scalaBinaryVersion,
       "SPARK_TESTING" -> "1",

http://git-wip-us.apache.org/repos/asf/spark/blob/c6b8eb71/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 0783935..119d6e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -86,39 +86,31 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
     plan
   }
 
-  // The tests here are failed on Windows due to the failure of initiating executors
-  // by the path length limitation. See SPARK-18718.
   test("unsafe broadcast hash join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash join", "inner")
   }
 
   test("unsafe broadcast hash outer join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast hash outer join", "left_outer")
   }
 
   test("unsafe broadcast left semi join updates peak execution memory") {
-    assume(!Utils.isWindows)
     testBroadcastJoinPeak[BroadcastHashJoinExec]("unsafe broadcast left semi join", "leftsemi")
   }
 
   test("broadcast hint isn't bothered by authBroadcastJoinThreshold set to low values") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
     }
   }
 
   test("broadcast hint isn't bothered by a disabled authBroadcastJoinThreshold") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       testBroadcastJoin[BroadcastHashJoinExec]("inner", true)
     }
   }
 
   test("broadcast hint isn't propagated after a join") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
       val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
@@ -146,7 +138,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
   }
 
   test("broadcast hint is propagated correctly") {
-    assume(!Utils.isWindows)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "2"))).toDF("key", "value")
       val broadcasted = broadcast(df2)
@@ -167,7 +158,6 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
   }
 
   test("join key rewritten") {
-    assume(!Utils.isWindows)
     val l = Literal(1L)
     val i = Literal(2)
     val s = Literal.create(3, ShortType)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org