You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/16 19:44:58 UTC

git commit: Tightening visibility for various Broadcast related classes.

Repository: spark
Updated Branches:
  refs/heads/master 33e64ecac -> efe2a8b12


Tightening visibility for various Broadcast related classes.

In preparation for SPARK-2521.

Author: Reynold Xin <rx...@apache.org>

Closes #1438 from rxin/broadcast and squashes the following commits:

432f1cc [Reynold Xin] Tightening visibility for various Broadcast related classes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efe2a8b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efe2a8b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efe2a8b1

Branch: refs/heads/master
Commit: efe2a8b1262a371471f52ca7d47dc34789e80558
Parents: 33e64ec
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Jul 16 10:44:54 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Jul 16 10:44:54 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/broadcast/Broadcast.scala  |  8 ++---
 .../apache/spark/broadcast/HttpBroadcast.scala  | 14 ++++-----
 .../spark/broadcast/HttpBroadcastFactory.scala  |  8 ++---
 .../spark/broadcast/TorrentBroadcast.scala      | 33 ++++++++++----------
 .../broadcast/TorrentBroadcastFactory.scala     |  8 ++---
 5 files changed, 36 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efe2a8b1/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 76956f6..15fd30e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -106,23 +106,23 @@ abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable {
    * Actually get the broadcasted value. Concrete implementations of Broadcast class must
    * define their own way to get the value.
    */
-  private[spark] def getValue(): T
+  protected def getValue(): T
 
   /**
    * Actually unpersist the broadcasted value on the executors. Concrete implementations of
    * Broadcast class must define their own logic to unpersist their own data.
    */
-  private[spark] def doUnpersist(blocking: Boolean)
+  protected def doUnpersist(blocking: Boolean)
 
   /**
    * Actually destroy all data and metadata related to this broadcast variable.
    * Implementation of Broadcast class must define their own logic to destroy their own
    * state.
    */
-  private[spark] def doDestroy(blocking: Boolean)
+  protected def doDestroy(blocking: Boolean)
 
   /** Check if this broadcast is valid. If not valid, exception is thrown. */
-  private[spark] def assertValid() {
+  protected def assertValid() {
     if (!_isValid) {
       throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/efe2a8b1/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4f6caba..4874564 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -40,9 +40,9 @@ private[spark] class HttpBroadcast[T: ClassTag](
     @transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
-  def getValue = value_
+  override protected def getValue() = value_
 
-  val blockId = BroadcastBlockId(id)
+  private val blockId = BroadcastBlockId(id)
 
   /*
    * Broadcasted data is also stored in the BlockManager of the driver. The BlockManagerMaster
@@ -60,14 +60,14 @@ private[spark] class HttpBroadcast[T: ClassTag](
   /**
    * Remove all persisted state associated with this HTTP broadcast on the executors.
    */
-  def doUnpersist(blocking: Boolean) {
+  override protected def doUnpersist(blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
   }
 
   /**
    * Remove all persisted state associated with this HTTP broadcast on the executors and driver.
    */
-  def doDestroy(blocking: Boolean) {
+  override protected def doDestroy(blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
   }
 
@@ -102,7 +102,7 @@ private[spark] class HttpBroadcast[T: ClassTag](
   }
 }
 
-private[spark] object HttpBroadcast extends Logging {
+private[broadcast] object HttpBroadcast extends Logging {
   private var initialized = false
   private var broadcastDir: File = null
   private var compress: Boolean = false
@@ -160,7 +160,7 @@ private[spark] object HttpBroadcast extends Logging {
 
   def getFile(id: Long) = new File(broadcastDir, BroadcastBlockId(id).name)
 
-  def write(id: Long, value: Any) {
+  private def write(id: Long, value: Any) {
     val file = getFile(id)
     val out: OutputStream = {
       if (compress) {
@@ -176,7 +176,7 @@ private[spark] object HttpBroadcast extends Logging {
     files += file
   }
 
-  def read[T: ClassTag](id: Long): T = {
+  private def read[T: ClassTag](id: Long): T = {
     logDebug("broadcast read server: " +  serverUri + " id: broadcast-" + id)
     val url = serverUri + "/" + BroadcastBlockId(id).name
 

http://git-wip-us.apache.org/repos/asf/spark/blob/efe2a8b1/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
index d5a031e..c7ef02d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala
@@ -27,21 +27,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
  * [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
  */
 class HttpBroadcastFactory extends BroadcastFactory {
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     HttpBroadcast.initialize(isDriver, conf, securityMgr)
   }
 
-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
     new HttpBroadcast[T](value_, isLocal, id)
 
-  def stop() { HttpBroadcast.stop() }
+  override def stop() { HttpBroadcast.stop() }
 
   /**
    * Remove all persisted state associated with the HTTP broadcast with the given ID.
    * @param removeFromDriver Whether to remove state from the driver
    * @param blocking Whether to block until unbroadcasted
    */
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     HttpBroadcast.unpersist(id, removeFromDriver, blocking)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/efe2a8b1/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 734de37..86731b6 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -20,7 +20,6 @@ package org.apache.spark.broadcast
 import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
 
 import scala.reflect.ClassTag
-import scala.math
 import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
@@ -49,19 +48,19 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     @transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id) with Logging with Serializable {
 
-  def getValue = value_
+  override protected def getValue() = value_
 
-  val broadcastId = BroadcastBlockId(id)
+  private val broadcastId = BroadcastBlockId(id)
 
   TorrentBroadcast.synchronized {
     SparkEnv.get.blockManager.putSingle(
       broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
   }
 
-  @transient var arrayOfBlocks: Array[TorrentBlock] = null
-  @transient var totalBlocks = -1
-  @transient var totalBytes = -1
-  @transient var hasBlocks = 0
+  @transient private var arrayOfBlocks: Array[TorrentBlock] = null
+  @transient private var totalBlocks = -1
+  @transient private var totalBytes = -1
+  @transient private var hasBlocks = 0
 
   if (!isLocal) {
     sendBroadcast()
@@ -70,7 +69,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   /**
    * Remove all persisted state associated with this Torrent broadcast on the executors.
    */
-  def doUnpersist(blocking: Boolean) {
+  override protected def doUnpersist(blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
   }
 
@@ -78,11 +77,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](
    * Remove all persisted state associated with this Torrent broadcast on the executors
    * and driver.
    */
-  def doDestroy(blocking: Boolean) {
+  override protected def doDestroy(blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
   }
 
-  def sendBroadcast() {
+  private def sendBroadcast() {
     val tInfo = TorrentBroadcast.blockifyObject(value_)
     totalBlocks = tInfo.totalBlocks
     totalBytes = tInfo.totalBytes
@@ -159,7 +158,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
     hasBlocks = 0
   }
 
-  def receiveBroadcast(): Boolean = {
+  private def receiveBroadcast(): Boolean = {
     // Receive meta-info about the size of broadcast data,
     // the number of chunks it is divided into, etc.
     val metaId = BroadcastBlockId(id, "meta")
@@ -211,7 +210,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
 
 }
 
-private[spark] object TorrentBroadcast extends Logging {
+private[broadcast] object TorrentBroadcast extends Logging {
   private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
@@ -272,17 +271,19 @@ private[spark] object TorrentBroadcast extends Logging {
    * Remove all persisted blocks associated with this torrent broadcast on the executors.
    * If removeFromDriver is true, also remove these persisted blocks on the driver.
    */
-  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
-    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
+    synchronized {
+      SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
+    }
   }
 }
 
-private[spark] case class TorrentBlock(
+private[broadcast] case class TorrentBlock(
     blockID: Int,
     byteArray: Array[Byte])
   extends Serializable
 
-private[spark] case class TorrentInfo(
+private[broadcast] case class TorrentInfo(
     @transient arrayOfBlocks: Array[TorrentBlock],
     totalBlocks: Int,
     totalBytes: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/efe2a8b1/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
index 1de8396..ad0f701 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala
@@ -28,21 +28,21 @@ import org.apache.spark.{SecurityManager, SparkConf}
  */
 class TorrentBroadcastFactory extends BroadcastFactory {
 
-  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
+  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
     TorrentBroadcast.initialize(isDriver, conf)
   }
 
-  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
+  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
     new TorrentBroadcast[T](value_, isLocal, id)
 
-  def stop() { TorrentBroadcast.stop() }
+  override def stop() { TorrentBroadcast.stop() }
 
   /**
    * Remove all persisted state associated with the torrent broadcast with the given ID.
    * @param removeFromDriver Whether to remove state from the driver.
    * @param blocking Whether to block until unbroadcasted
    */
-  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
+  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
   }
 }