You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/04 08:46:27 UTC
git commit: Merge pull request #117 from
stephenh/avoid_concurrent_modification_exception
Updated Branches:
refs/heads/branch-0.8 ec0e4f057 -> 57fdb3feb
Merge pull request #117 from stephenh/avoid_concurrent_modification_exception
Handle ConcurrentModificationExceptions in SparkContext init.
System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during its initialization.
Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hashtable.entrySet)
have undefined behavior under concurrent modification.
(cherry picked from commit 8f1098a3f0de8c9b2eb9ede91a1b01da10a525ea)
Signed-off-by: Reynold Xin <rx...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/57fdb3fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/57fdb3fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/57fdb3fe
Branch: refs/heads/branch-0.8
Commit: 57fdb3febf71871fbb1bfd9c05f4c700d37367e4
Parents: ec0e4f0
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Wed Oct 30 20:11:48 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 3 23:46:18 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/SparkContext.scala | 8 +++++---
core/src/main/scala/org/apache/spark/util/Utils.scala | 7 +++++++
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57fdb3fe/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b67d02f..d22795d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -258,8 +258,10 @@ class SparkContext(
conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
+ Utils.getSystemProperties.foreach { case (key, value) =>
+ if (key.startsWith("spark.hadoop.")) {
+ conf.set(key.substring("spark.hadoop.".length), value)
+ }
}
val bufferSize = System.getProperty("spark.buffer.size", "65536")
conf.set("io.file.buffer.size", bufferSize)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/57fdb3fe/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fd2811e..0c5c12b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -37,6 +37,7 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
import org.apache.spark.deploy.SparkHadoopUtil
import java.nio.ByteBuffer
import org.apache.spark.{SparkEnv, SparkException, Logging}
+import java.util.ConcurrentModificationException
/**
@@ -818,4 +819,10 @@ private[spark] object Utils extends Logging {
// Nothing else to guard against ?
hashAbs
}
+
+ /** Returns a copy of the system properties that is thread-safe to iterate over. */
+ def getSystemProperties(): Map[String, String] = {
+ return System.getProperties().clone()
+ .asInstanceOf[java.util.Properties].toMap[String, String]
+ }
}