Posted to commits@spark.apache.org by rx...@apache.org on 2015/12/31 03:07:11 UTC

spark git commit: [SPARK-12588] Remove HttpBroadcast in Spark 2.0.

Repository: spark
Updated Branches:
  refs/heads/master f76ee109d -> ee8f8d318


[SPARK-12588] Remove HttpBroadcast in Spark 2.0.

We switched to TorrentBroadcast in Spark 1.1, and HttpBroadcast has been undocumented since then. It's time to remove it in Spark 2.0.
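
For context, the user-facing API is unaffected by this removal: SparkContext.broadcast keeps working as before and now always returns a TorrentBroadcast-backed variable. A minimal sketch (standalone, local master):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("broadcast-example").setMaster("local[2]"))
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))  // small read-only table shipped with tasks
    val named = sc.parallelize(Seq(1, 2)).map(i => lookup.value(i))
    println(named.collect().mkString(", "))                 // prints: one, two
    sc.stop()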

Author: Reynold Xin <rx...@databricks.com>

Closes #10531 from rxin/SPARK-12588.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee8f8d31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee8f8d31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee8f8d31

Branch: refs/heads/master
Commit: ee8f8d318417c514fbb26e57157483d466ddbfae
Parents: f76ee10
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Dec 30 18:07:07 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Dec 30 18:07:07 2015 -0800

----------------------------------------------------------------------
 .../spark/broadcast/BroadcastFactory.scala      |   4 +-
 .../spark/broadcast/BroadcastManager.scala      |  13 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  | 269 -------------------
 .../spark/broadcast/HttpBroadcastFactory.scala  |  47 ----
 .../spark/broadcast/TorrentBroadcast.scala      |   2 +-
 .../broadcast/TorrentBroadcastFactory.scala     |   2 +-
 .../spark/serializer/KryoSerializer.scala       |   2 -
 .../apache/spark/broadcast/BroadcastSuite.scala | 131 +--------
 docs/configuration.md                           |  19 +-
 docs/security.md                                |  13 +-
 .../apache/spark/examples/BroadcastTest.scala   |   8 +-
 project/MimaExcludes.scala                      |   3 +-
 12 files changed, 22 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 6a187b4..7f35ac4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -24,14 +24,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 
 /**
- * :: DeveloperApi ::
  * An interface for all the broadcast implementations in Spark (to allow
  * multiple broadcast implementations). SparkContext uses a user-specified
  * BroadcastFactory implementation to instantiate a particular broadcast for the
  * entire Spark job.
  */
-@DeveloperApi
-trait BroadcastFactory {
+private[spark] trait BroadcastFactory {
 
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
 

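Piecing together the signatures visible in this diff (initialize above; newBroadcast and unbroadcast in HttpBroadcastFactory below; stop in both factories), the now package-private trait has roughly this shape. This is a reconstruction from the hunks, not a verbatim copy of the file:

    private[spark] trait BroadcastFactory {
      def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
      def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T]
      def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
      def stop(): Unit
    }
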
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index fac6666..6134360 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
-import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf, SecurityManager}
+
 
 private[spark] class BroadcastManager(
     val isDriver: Boolean,
@@ -39,15 +39,8 @@ private[spark] class BroadcastManager(
   private def initialize() {
     synchronized {
       if (!initialized) {
-        val broadcastFactoryClass =
-          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
-
-        broadcastFactory =
-          Utils.classForName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
-
-        // Initialize appropriate BroadcastFactory and BroadcastObject
+        broadcastFactory = new TorrentBroadcastFactory
         broadcastFactory.initialize(isDriver, conf, securityManager)
-
         initialized = true
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
deleted file mode 100644
index b69af63..0000000
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.broadcast
-
-import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
-import java.io.{BufferedInputStream, BufferedOutputStream}
-import java.net.{URL, URLConnection, URI}
-import java.util.concurrent.TimeUnit
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
-import org.apache.spark.io.CompressionCodec
-import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
-
-/**
- * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses an HTTP server
- * as a broadcast mechanism. The first time an HTTP broadcast variable (sent as part of a
- * task) is deserialized in the executor, the broadcasted data is fetched from the driver
- * (through a HTTP server running at the driver) and stored in the BlockManager of the
- * executor to speed up future accesses.
- */
-private[spark] class HttpBroadcast[T: ClassTag](
-    @transient var value_ : T, isLocal: Boolean, id: Long)
-  extends Broadcast[T](id) with Logging with Serializable {
-
-  override protected def getValue() = value_
-
-  private val blockId = BroadcastBlockId(id)
-
-  /*
-   * Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
-   * does not need to be told about this block, as no other node needs to know about this data block.
-   */
-  HttpBroadcast.synchronized {
-    SparkEnv.get.blockManager.putSingle(
-      blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-  }
-
-  if (!isLocal) {
-    HttpBroadcast.write(id, value_)
-  }
-
-  /**
-   * Remove all persisted state associated with this HTTP broadcast on the executors.
-   */
-  override protected def doUnpersist(blocking: Boolean) {
-    HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
-  }
-
-  /**
-   * Remove all persisted state associated with this HTTP broadcast on the executors and driver.
-   */
-  override protected def doDestroy(blocking: Boolean) {
-    HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
-  }
-
-  /** Used by the JVM when serializing this object. */
-  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
-    assertValid()
-    out.defaultWriteObject()
-  }
-
-  /** Used by the JVM when deserializing this object. */
-  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
-    in.defaultReadObject()
-    HttpBroadcast.synchronized {
-      SparkEnv.get.blockManager.getSingle(blockId) match {
-        case Some(x) => value_ = x.asInstanceOf[T]
-        case None => {
-          logInfo("Started reading broadcast variable " + id)
-          val start = System.nanoTime
-          value_ = HttpBroadcast.read[T](id)
-          /*
-           * We cache broadcast data in the BlockManager so that subsequent tasks using it
-           * do not need to re-fetch. This data is only used locally and no other node
-           * needs to fetch this block, so we don't notify the master.
-           */
-          SparkEnv.get.blockManager.putSingle(
-            blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
-          val time = (System.nanoTime - start) / 1e9
-          logInfo("Reading broadcast variable " + id + " took " + time + " s")
-        }
-      }
-    }
-  }
-}
-
-private[broadcast] object HttpBroadcast extends Logging {
-  private var initialized = false
-  private var broadcastDir: File = null
-  private var compress: Boolean = false
-  private var bufferSize: Int = 65536
-  private var serverUri: String = null
-  private var server: HttpServer = null
-  private var securityManager: SecurityManager = null
-
-  // TODO: This shouldn't be a global variable so that multiple SparkContexts can coexist
-  private val files = new TimeStampedHashSet[File]
-  private val httpReadTimeout = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES).toInt
-  private var compressionCodec: CompressionCodec = null
-  private var cleaner: MetadataCleaner = null
-
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
-    synchronized {
-      if (!initialized) {
-        bufferSize = conf.getInt("spark.buffer.size", 65536)
-        compress = conf.getBoolean("spark.broadcast.compress", true)
-        securityManager = securityMgr
-        if (isDriver) {
-          createServer(conf)
-          conf.set("spark.httpBroadcast.uri", serverUri)
-        }
-        serverUri = conf.get("spark.httpBroadcast.uri")
-        cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
-        compressionCodec = CompressionCodec.createCodec(conf)
-        initialized = true
-      }
-    }
-  }
-
-  def stop() {
-    synchronized {
-      if (server != null) {
-        server.stop()
-        server = null
-      }
-      if (cleaner != null) {
-        cleaner.cancel()
-        cleaner = null
-      }
-      compressionCodec = null
-      initialized = false
-    }
-  }
-
-  private def createServer(conf: SparkConf) {
-    broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
-    val broadcastPort = conf.getInt("spark.broadcast.port", 0)
-    server =
-      new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
-    server.start()
-    serverUri = server.uri
-    logInfo("Broadcast server started at " + serverUri)
-  }
-
-  def getFile(id: Long): File = new File(broadcastDir, BroadcastBlockId(id).name)
-
-  private def write(id: Long, value: Any) {
-    val file = getFile(id)
-    val fileOutputStream = new FileOutputStream(file)
-    Utils.tryWithSafeFinally {
-      val out: OutputStream = {
-        if (compress) {
-          compressionCodec.compressedOutputStream(fileOutputStream)
-        } else {
-          new BufferedOutputStream(fileOutputStream, bufferSize)
-        }
-      }
-      val ser = SparkEnv.get.serializer.newInstance()
-      val serOut = ser.serializeStream(out)
-      Utils.tryWithSafeFinally {
-        serOut.writeObject(value)
-      } {
-        serOut.close()
-      }
-      files += file
-    } {
-      fileOutputStream.close()
-    }
-  }
-
-  private def read[T: ClassTag](id: Long): T = {
-    logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id)
-    val url = serverUri + "/" + BroadcastBlockId(id).name
-
-    var uc: URLConnection = null
-    if (securityManager.isAuthenticationEnabled()) {
-      logDebug("broadcast security enabled")
-      val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
-      uc = newuri.toURL.openConnection()
-      uc.setConnectTimeout(httpReadTimeout)
-      uc.setAllowUserInteraction(false)
-    } else {
-      logDebug("broadcast not using security")
-      uc = new URL(url).openConnection()
-      uc.setConnectTimeout(httpReadTimeout)
-    }
-    Utils.setupSecureURLConnection(uc, securityManager)
-
-    val in = {
-      uc.setReadTimeout(httpReadTimeout)
-      val inputStream = uc.getInputStream
-      if (compress) {
-        compressionCodec.compressedInputStream(inputStream)
-      } else {
-        new BufferedInputStream(inputStream, bufferSize)
-      }
-    }
-    val ser = SparkEnv.get.serializer.newInstance()
-    val serIn = ser.deserializeStream(in)
-    Utils.tryWithSafeFinally {
-      serIn.readObject[T]()
-    } {
-      serIn.close()
-    }
-  }
-
-  /**
-   * Remove all persisted blocks associated with this HTTP broadcast on the executors.
-   * If removeFromDriver is true, also remove these persisted blocks on the driver
-   * and delete the associated broadcast file.
-   */
-  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = synchronized {
-    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
-    if (removeFromDriver) {
-      val file = getFile(id)
-      files.remove(file)
-      deleteBroadcastFile(file)
-    }
-  }
-
-  /**
-   * Periodically clean up old broadcasts by removing the associated map entries and
-   * deleting the associated files.
-   */
-  private def cleanup(cleanupTime: Long) {
-    val iterator = files.internalMap.entrySet().iterator()
-    while(iterator.hasNext) {
-      val entry = iterator.next()
-      val (file, time) = (entry.getKey, entry.getValue)
-      if (time < cleanupTime) {
-        iterator.remove()
-        deleteBroadcastFile(file)
-      }
-    }
-  }
-
-  private def deleteBroadcastFile(file: File) {
-    try {
-      if (file.exists) {
-        if (file.delete()) {
-          logInfo("Deleted broadcast file: %s".format(file))
-        } else {
-          logWarning("Could not delete broadcast file: %s".format(file))
-        }
-      }
-    } catch {
-      case e: Exception =>
-        logError("Exception while deleting broadcast file: %s".format(file), e)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
deleted file mode 100644
index cf3ae36..0000000
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.broadcast
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{SecurityManager, SparkConf}
-
-/**
- * A [[org.apache.spark.broadcast.BroadcastFactory]] implementation that uses an
- * HTTP server as the broadcast mechanism. Refer to
- * [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
- */
-class HttpBroadcastFactory extends BroadcastFactory {
-  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
-    HttpBroadcast.initialize(isDriver, conf, securityMgr)
-  }
-
-  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] =
-    new HttpBroadcast[T](value_, isLocal, id)
-
-  override def stop() { HttpBroadcast.stop() }
-
-  /**
-   * Remove all persisted state associated with the HTTP broadcast with the given ID.
-   * @param removeFromDriver Whether to remove state from the driver
-   * @param blocking Whether to block until unbroadcasted
-   */
-  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
-    HttpBroadcast.unpersist(id, removeFromDriver, blocking)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7e3764d..9bd6972 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.io.ByteArrayChunkOutputStream
  * BlockManager, ready for other executors to fetch from.
  *
  * This prevents the driver from being the bottleneck in sending out multiple copies of the
- * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
+ * broadcast data (one per executor).
  *
  * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
  *

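The comment above describes the mechanism Spark now standardizes on: the serialized value is split into fixed-size blocks, and an executor that has fetched a block can serve it to its peers, so the driver is not the sole source. A toy sketch of the blocking step only (an illustration, not Spark's implementation):

    // Split a payload into fixed-size blocks; any holder of a block can
    // then act as a source for that block.
    def blockify(data: Array[Byte], blockSize: Int): Array[Array[Byte]] =
      data.grouped(blockSize).toArray

    val payload = Array.fill[Byte](16 * 1024 * 1024)(1)
    val blocks = blockify(payload, 4 * 1024 * 1024)  // the real block size is governed by spark.broadcast.blockSize
    assert(blocks.map(_.length).sum == payload.length)
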
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
index 96d8dd7..b11f9ba 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
  * protocol to do a distributed transfer of the broadcasted data to the executors. Refer to
  * [[org.apache.spark.broadcast.TorrentBroadcast]] for more details.
  */
-class TorrentBroadcastFactory extends BroadcastFactory {
+private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
 
   override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index eed9937..1b4538e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -34,7 +34,6 @@ import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
-import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
 import org.apache.spark.storage._
@@ -107,7 +106,6 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
     kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer())
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
     kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))

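The hunk above only removes Spark's internal Kryo registration of HttpBroadcast; applications keep registering their own classes through SparkConf as usual. A sketch, where MyRecord is a hypothetical user class:

    import org.apache.spark.SparkConf

    case class MyRecord(id: Long, name: String)

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MyRecord]))
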
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index ba21075..88fdbbd 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -45,39 +45,8 @@ class DummyBroadcastClass(rdd: RDD[Int]) extends Serializable {
 
 class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
-  private val httpConf = broadcastConf("HttpBroadcastFactory")
-  private val torrentConf = broadcastConf("TorrentBroadcastFactory")
-
-  test("Using HttpBroadcast locally") {
-    sc = new SparkContext("local", "test", httpConf)
-    val list = List[Int](1, 2, 3, 4)
-    val broadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
-    assert(results.collect().toSet === Set((1, 10), (2, 10)))
-  }
-
-  test("Accessing HttpBroadcast variables from multiple threads") {
-    sc = new SparkContext("local[10]", "test", httpConf)
-    val list = List[Int](1, 2, 3, 4)
-    val broadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to 10).map(x => (x, broadcast.value.sum))
-    assert(results.collect().toSet === (1 to 10).map(x => (x, 10)).toSet)
-  }
-
-  test("Accessing HttpBroadcast variables in a local cluster") {
-    val numSlaves = 4
-    val conf = httpConf.clone
-    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.broadcast.compress", "true")
-    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
-    val list = List[Int](1, 2, 3, 4)
-    val broadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))
-    assert(results.collect().toSet === (1 to numSlaves).map(x => (x, 10)).toSet)
-  }
-
   test("Using TorrentBroadcast locally") {
-    sc = new SparkContext("local", "test", torrentConf)
+    sc = new SparkContext("local", "test")
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
@@ -85,7 +54,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("Accessing TorrentBroadcast variables from multiple threads") {
-    sc = new SparkContext("local[10]", "test", torrentConf)
+    sc = new SparkContext("local[10]", "test")
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 10).map(x => (x, broadcast.value.sum))
@@ -94,7 +63,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Accessing TorrentBroadcast variables in a local cluster") {
     val numSlaves = 4
-    val conf = torrentConf.clone
+    val conf = new SparkConf
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.broadcast.compress", "true")
     sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
@@ -124,31 +93,13 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Test Lazy Broadcast variables with TorrentBroadcast") {
     val numSlaves = 2
-    val conf = torrentConf.clone
-    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf)
+    sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
     val rdd = sc.parallelize(1 to numSlaves)
-
     val results = new DummyBroadcastClass(rdd).doSomething()
 
     assert(results.toSet === (1 to numSlaves).map(x => (x, false)).toSet)
   }
 
-  test("Unpersisting HttpBroadcast on executors only in local mode") {
-    testUnpersistHttpBroadcast(distributed = false, removeFromDriver = false)
-  }
-
-  test("Unpersisting HttpBroadcast on executors and driver in local mode") {
-    testUnpersistHttpBroadcast(distributed = false, removeFromDriver = true)
-  }
-
-  test("Unpersisting HttpBroadcast on executors only in distributed mode") {
-    testUnpersistHttpBroadcast(distributed = true, removeFromDriver = false)
-  }
-
-  test("Unpersisting HttpBroadcast on executors and driver in distributed mode") {
-    testUnpersistHttpBroadcast(distributed = true, removeFromDriver = true)
-  }
-
   test("Unpersisting TorrentBroadcast on executors only in local mode") {
     testUnpersistTorrentBroadcast(distributed = false, removeFromDriver = false)
   }
@@ -180,66 +131,6 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   /**
-   * Verify the persistence of state associated with an HttpBroadcast in either local mode or
-   * local-cluster mode (when distributed = true).
-   *
-   * This test creates a broadcast variable, uses it on all executors, and then unpersists it.
-   * In between each step, this test verifies that the broadcast blocks and the broadcast file
-   * are present only on the expected nodes.
-   */
-  private def testUnpersistHttpBroadcast(distributed: Boolean, removeFromDriver: Boolean) {
-    val numSlaves = if (distributed) 2 else 0
-
-    // Verify that the broadcast file is created, and blocks are persisted only on the driver
-    def afterCreation(broadcastId: Long, bmm: BlockManagerMaster) {
-      val blockId = BroadcastBlockId(broadcastId)
-      val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      assert(statuses.size === 1)
-      statuses.head match { case (bm, status) =>
-        assert(bm.isDriver, "Block should only be on the driver")
-        assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
-        assert(status.memSize > 0, "Block should be in memory store on the driver")
-        assert(status.diskSize === 0, "Block should not be in disk store on the driver")
-      }
-      if (distributed) {
-        // this file is only generated in distributed mode
-        assert(HttpBroadcast.getFile(blockId.broadcastId).exists, "Broadcast file not found!")
-      }
-    }
-
-    // Verify that blocks are persisted in both the executors and the driver
-    def afterUsingBroadcast(broadcastId: Long, bmm: BlockManagerMaster) {
-      val blockId = BroadcastBlockId(broadcastId)
-      val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      assert(statuses.size === numSlaves + 1)
-      statuses.foreach { case (_, status) =>
-        assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
-        assert(status.memSize > 0, "Block should be in memory store")
-        assert(status.diskSize === 0, "Block should not be in disk store")
-      }
-    }
-
-    // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver
-    // is true. In the latter case, also verify that the broadcast file is deleted on the driver.
-    def afterUnpersist(broadcastId: Long, bmm: BlockManagerMaster) {
-      val blockId = BroadcastBlockId(broadcastId)
-      val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
-      val expectedNumBlocks = if (removeFromDriver) 0 else 1
-      val possiblyNot = if (removeFromDriver) "" else " not"
-      assert(statuses.size === expectedNumBlocks,
-        "Block should%s be unpersisted on the driver".format(possiblyNot))
-      if (distributed && removeFromDriver) {
-        // this file is only generated in distributed mode
-        assert(!HttpBroadcast.getFile(blockId.broadcastId).exists,
-          "Broadcast file should%s be deleted".format(possiblyNot))
-      }
-    }
-
-    testUnpersistBroadcast(distributed, numSlaves, httpConf, afterCreation,
-      afterUsingBroadcast, afterUnpersist, removeFromDriver)
-  }
-
-  /**
   * Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
    *
    * This test creates a broadcast variable, uses it on all executors, and then unpersists it.
@@ -284,7 +175,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
       assert(statuses.size === expectedNumBlocks)
     }
 
-    testUnpersistBroadcast(distributed, numSlaves, torrentConf, afterCreation,
+    testUnpersistBroadcast(distributed, numSlaves, afterCreation,
       afterUsingBroadcast, afterUnpersist, removeFromDriver)
   }
 
@@ -300,7 +191,6 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
   private def testUnpersistBroadcast(
       distributed: Boolean,
       numSlaves: Int,  // used only when distributed = true
-      broadcastConf: SparkConf,
       afterCreation: (Long, BlockManagerMaster) => Unit,
       afterUsingBroadcast: (Long, BlockManagerMaster) => Unit,
       afterUnpersist: (Long, BlockManagerMaster) => Unit,
@@ -308,7 +198,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
     sc = if (distributed) {
       val _sc =
-        new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", broadcastConf)
+        new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
      // Wait until all slaves are up
       try {
         _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 60000)
@@ -319,7 +209,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
           throw e
       }
     } else {
-      new SparkContext("local", "test", broadcastConf)
+      new SparkContext("local", "test")
     }
     val blockManagerMaster = sc.env.blockManager.master
     val list = List[Int](1, 2, 3, 4)
@@ -356,13 +246,6 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
       assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)
     }
   }
-
-  /** Helper method to create a SparkConf that uses the given broadcast factory. */
-  private def broadcastConf(factoryName: String): SparkConf = {
-    val conf = new SparkConf
-    conf.set("spark.broadcast.factory", "org.apache.spark.broadcast.%s".format(factoryName))
-    conf
-  }
 }
 
 package object testPackage extends Assertions {

http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index a9ef37a..7d743d5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -824,13 +824,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.broadcast.factory</code></td>
-  <td>org.apache.spark.broadcast.<br />TorrentBroadcastFactory</td>
-  <td>
-    Which broadcast implementation to use.
-  </td>
-</tr>
-<tr>
   <td><code>spark.cleaner.ttl</code></td>
   <td>(infinite)</td>
   <td>
@@ -1018,14 +1011,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.broadcast.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the driver's HTTP broadcast server to listen on.
-    This is not relevant for torrent broadcast.
-  </td>
-</tr>
-<tr>
   <td><code>spark.driver.host</code></td>
   <td>(local hostname)</td>
   <td>
@@ -1444,8 +1429,8 @@ Apart from these, the following properties are also available, and may be useful
 
             <p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
             particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
-            either <code>akka</code> for Akka based connections or <code>fs</code> for broadcast and
-            file server.</p>
+            either <code>akka</code> for Akka based connections or <code>fs</code> for file
+            server.</p>
         </td>
     </tr>
     <tr>

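For applications that set the removed key explicitly, migration is just deleting that line; after this commit the property is ignored and TorrentBroadcastFactory is always used:

    // Pre-2.0 (optional, and already the default since Spark 1.1):
    conf.set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
    // Spark 2.0 and later: drop the line above; the factory is no longer configurable.
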
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index 0bfc791..1b7741d 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -23,7 +23,7 @@ If your applications are using event logging, the directory where the event logs
 
 ## Encryption
 
-Spark supports SSL for Akka and HTTP (for broadcast and file server) protocols. SASL encryption is
+Spark supports SSL for Akka and HTTP (for file server) protocols. SASL encryption is
 supported for the block transfer service. Encryption is not yet supported for the WebUI.
 
 Encryption is not yet supported for data stored by Spark in temporary local storage, such as shuffle
@@ -32,7 +32,7 @@ to configure your cluster manager to store application data on encrypted disks.
 
 ### SSL Configuration
 
-Configuration for SSL is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
+Configuration for SSL is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
 
 SSL must be configured on each node and configured for each component involved in communication using the particular protocol.
 
@@ -161,15 +161,6 @@ configure those ports.
     <td>Jetty-based. Only used if Akka RPC backend is configured.</td>
   </tr>
   <tr>
-    <td>Executor</td>
-    <td>Driver</td>
-    <td>(random)</td>
-    <td>HTTP Broadcast</td>
-    <td><code>spark.broadcast.port</code></td>
-    <td>Jetty-based. Not used by TorrentBroadcast, which sends data through the block manager
-    instead.</td>
-  </tr>
-  <tr>
     <td>Executor / Driver</td>
     <td>Executor / Driver</td>
     <td>(random)</td>

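The hierarchical SSL settings referenced above can also be supplied programmatically instead of through spark-defaults.conf; a sketch with placeholder paths and secrets, using the documented spark.ssl.* keys:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.ssl.enabled", "true")                    // global default for all protocols
      .set("spark.ssl.keyStore", "/path/to/keystore.jks")  // placeholder path
      .set("spark.ssl.keyStorePassword", "changeit")       // placeholder secret
      .set("spark.ssl.fs.enabled", "true")                 // file server override (spark.ssl.fs namespace)
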
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index d812262..3da5236 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -21,16 +21,14 @@ package org.apache.spark.examples
 import org.apache.spark.{SparkConf, SparkContext}
 
 /**
-  * Usage: BroadcastTest [slices] [numElem] [broadcastAlgo] [blockSize]
+  * Usage: BroadcastTest [slices] [numElem] [blockSize]
   */
 object BroadcastTest {
   def main(args: Array[String]) {
 
-    val bcName = if (args.length > 2) args(2) else "Http"
-    val blockSize = if (args.length > 3) args(3) else "4096"
+    val blockSize = if (args.length > 2) args(2) else "4096"
 
     val sparkConf = new SparkConf().setAppName("Broadcast Test")
-      .set("spark.broadcast.factory", s"org.apache.spark.broadcast.${bcName}BroadcastFactory")
       .set("spark.broadcast.blockSize", blockSize)
     val sc = new SparkContext(sparkConf)
 
@@ -44,7 +42,7 @@ object BroadcastTest {
       println("===========")
       val startTime = System.nanoTime
       val barr1 = sc.broadcast(arr1)
-      val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size)
+      val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.length)
       // Collect the small RDD so we can print the observed sizes locally.
       observedSizes.collect().foreach(i => println(i))
       println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))

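With the broadcastAlgo argument gone, the example takes at most [slices] [numElem] [blockSize]. A plausible invocation through the bundled run-example launcher:

    ./bin/run-example BroadcastTest 2 1000000 4096
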
http://git-wip-us.apache.org/repos/asf/spark/blob/ee8f8d31/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ad878c1..b7d27c9 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,8 @@ object MimaExcludes {
   def excludes(version: String) = version match {
     case v if v.startsWith("2.0") =>
       Seq(
-        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD")
+        ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
         ) ++
       // When 1.6 is officially released, update this exclusion list.
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org