Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:50 UTC

[06/50] [abbrv] Merge branch 'master' into scala-2.10

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0000000,a45bee5..3ccc38d
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,0 -1,238 +1,238 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.scheduler.cluster
+ 
+ import java.util.concurrent.atomic.AtomicInteger
+ 
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
++import scala.concurrent.Await
++import scala.concurrent.duration._
+ 
+ import akka.actor._
 -import akka.dispatch.Await
+ import akka.pattern.ask
 -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
 -import akka.util.Duration
 -import akka.util.duration._
++import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+ 
+ import org.apache.spark.{SparkException, Logging, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.Utils
+ 
+ /**
+  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
+  * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+  * executors whenever a task is done and asking the scheduler to launch a new executor for
+  * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+  * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+  * (spark.deploy.*).
+  */
+ private[spark]
+ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+   extends SchedulerBackend with Logging
+ {
+   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+   var totalCoreCount = new AtomicInteger(0)
+ 
+   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+     private val executorActor = new HashMap[String, ActorRef]
+     private val executorAddress = new HashMap[String, Address]
+     private val executorHost = new HashMap[String, String]
+     private val freeCores = new HashMap[String, Int]
+     private val actorToExecutorId = new HashMap[ActorRef, String]
+     private val addressToExecutorId = new HashMap[Address, String]
+ 
+     override def preStart() {
+       // Listen for remote client disconnection events, since they don't go through Akka's watch()
 -      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
++      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ 
+       // Periodically revive offers to allow delay scheduling to work
+       val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
++      import context.dispatcher
+       context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
+     }
+ 
+     def receive = {
+       case RegisterExecutor(executorId, hostPort, cores) =>
+         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
+         if (executorActor.contains(executorId)) {
+           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
+         } else {
+           logInfo("Registered executor: " + sender + " with ID " + executorId)
+           sender ! RegisteredExecutor(sparkProperties)
+           context.watch(sender)
+           executorActor(executorId) = sender
+           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+           freeCores(executorId) = cores
+           executorAddress(executorId) = sender.path.address
+           actorToExecutorId(sender) = executorId
+           addressToExecutorId(sender.path.address) = executorId
+           totalCoreCount.addAndGet(cores)
+           makeOffers()
+         }
+ 
+       case StatusUpdate(executorId, taskId, state, data) =>
+         scheduler.statusUpdate(taskId, state, data.value)
+         if (TaskState.isFinished(state)) {
+           if (executorActor.contains(executorId)) {
+             freeCores(executorId) += 1
+             makeOffers(executorId)
+           } else {
+             // Ignoring the update since we don't know about the executor.
+             val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
+             logWarning(msg.format(taskId, state, sender, executorId))
+           }
+         }
+ 
+       case ReviveOffers =>
+         makeOffers()
+ 
+       case KillTask(taskId, executorId) =>
+         executorActor(executorId) ! KillTask(taskId, executorId)
+ 
+       case StopDriver =>
+         sender ! true
+         context.stop(self)
+ 
+       case StopExecutors =>
+         logInfo("Asking each executor to shut down")
+         for (executor <- executorActor.values) {
+           executor ! StopExecutor
+         }
+         sender ! true
+ 
+       case RemoveExecutor(executorId, reason) =>
+         removeExecutor(executorId, reason)
+         sender ! true
+ 
+       case Terminated(actor) =>
+         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+ 
 -      case RemoteClientDisconnected(transport, address) =>
 -        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
++      case DisassociatedEvent(_, remoteAddress, _) =>
++        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
+ 
 -      case RemoteClientShutdown(transport, address) =>
 -        addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
++      case AssociationErrorEvent(_, _, remoteAddress, _) =>
++        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
+     }
+ 
+     // Make fake resource offers on all executors
+     def makeOffers() {
+       launchTasks(scheduler.resourceOffers(
+         executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+     }
+ 
+     // Make fake resource offers on just one executor
+     def makeOffers(executorId: String) {
+       launchTasks(scheduler.resourceOffers(
+         Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+     }
+ 
+     // Launch tasks returned by a set of resource offers
+     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+       for (task <- tasks.flatten) {
+         freeCores(task.executorId) -= 1
+         executorActor(task.executorId) ! LaunchTask(task)
+       }
+     }
+ 
+     // Remove a disconnected slave from the cluster
+     def removeExecutor(executorId: String, reason: String) {
+       if (executorActor.contains(executorId)) {
+         logInfo("Executor " + executorId + " disconnected, so removing it")
+         val numCores = freeCores(executorId)
+         actorToExecutorId -= executorActor(executorId)
+         addressToExecutorId -= executorAddress(executorId)
+         executorActor -= executorId
+         executorHost -= executorId
+         freeCores -= executorId
+         totalCoreCount.addAndGet(-numCores)
+         scheduler.executorLost(executorId, SlaveLost(reason))
+       }
+     }
+   }
+ 
+   var driverActor: ActorRef = null
+   val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+ 
+   override def start() {
+     val properties = new ArrayBuffer[(String, String)]
+     val iterator = System.getProperties.entrySet.iterator
+     while (iterator.hasNext) {
+       val entry = iterator.next
+       val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+       if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+         properties += ((key, value))
+       }
+     }
+     driverActor = actorSystem.actorOf(
+       Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+   }
+ 
+   private val timeout = {
+     Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+   }
+ 
+   def stopExecutors() {
+     try {
+       if (driverActor != null) {
+         logInfo("Shutting down all executors")
+         val future = driverActor.ask(StopExecutors)(timeout)
+         Await.ready(future, timeout)
+       }
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+     }
+   }
+ 
+   override def stop() {
+     try {
+       if (driverActor != null) {
+         val future = driverActor.ask(StopDriver)(timeout)
+         Await.ready(future, timeout)
+       }
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+     }
+   }
+ 
+   override def reviveOffers() {
+     driverActor ! ReviveOffers
+   }
+ 
+   override def killTask(taskId: Long, executorId: String) {
+     driverActor ! KillTask(taskId, executorId)
+   }
+ 
+   override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+       .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+ 
+   // Called by subclasses when notified of a lost worker
+   def removeExecutor(executorId: String, reason: String) {
+     try {
+       val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+       Await.ready(future, timeout)
+     } catch {
+       case e: Exception =>
+         throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+     }
+   }
+ }
+ 
+ private[spark] object CoarseGrainedSchedulerBackend {
+   val ACTOR_NAME = "CoarseGrainedScheduler"
+ }

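For reference, a minimal sketch (not part of this commit) of the Akka 2.2 remoting-event pattern the backend above now relies on: instead of Akka 2.0's RemoteClientLifeCycleEvent family, an actor subscribes to RemotingLifecycleEvent and pattern-matches DisassociatedEvent / AssociationErrorEvent. The actor name ConnectionWatcher is made up for illustration and assumes an akka-remote 2.2.x dependency.

    import akka.actor._
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

    class ConnectionWatcher extends Actor with ActorLogging {
      override def preStart() {
        // Replaces the old subscribe(self, classOf[RemoteClientLifeCycleEvent]) call.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) =>
          log.info("Remote system at " + remoteAddress + " disconnected")
        case AssociationErrorEvent(_, _, remoteAddress, _) =>
          log.info("Association with " + remoteAddress + " failed")
        case _ => // other lifecycle events (e.g. AssociatedEvent) are ignored here
      }
    }
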
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index fa83ae1,cefa970..7127a72
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@@ -42,12 -42,12 +42,12 @@@ private[spark] class SparkDeploySchedul
      super.start()
  
      // The endpoint for executors to talk to us
 -    val driverUrl = "akka://spark@%s:%s/user/%s".format(
 +    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
        System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
-       StandaloneSchedulerBackend.ACTOR_NAME)
+       CoarseGrainedSchedulerBackend.ACTOR_NAME)
      val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
      val command = Command(
-       "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
+       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
      val sparkHome = sc.getSparkHome().getOrElse(null)
      val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
          "http://" + sc.ui.appUIAddress)

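A small illustration (not from the commit) of why the URL scheme changes here: Akka 2.2 remoting names its transport explicitly, so remote actor paths use akka.tcp:// rather than plain akka://. The host and port below are placeholders for spark.driver.host / spark.driver.port.

    // Placeholder values; in Spark they come from system properties.
    val driverHost = "192.168.1.10"
    val driverPort = "40123"
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      driverHost, driverPort, "CoarseGrainedScheduler")
    // An executor can then resolve the driver, e.g. via Akka 2.2's actorSelection:
    //   system.actorSelection(driverUrl)
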
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index bf4040f,300fe69..8de9b72
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@@ -119,10 -120,10 +120,10 @@@ private[spark] class CoarseMesosSchedul
      }
      val command = CommandInfo.newBuilder()
        .setEnvironment(environment)
 -    val driverUrl = "akka://spark@%s:%s/user/%s".format(
 +    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
        System.getProperty("spark.driver.host"),
        System.getProperty("spark.driver.port"),
-       StandaloneSchedulerBackend.ACTOR_NAME)
+       CoarseGrainedSchedulerBackend.ACTOR_NAME)
      val uri = System.getProperty("spark.executor.uri")
      if (uri == null) {
        val runScript = new File(sparkHome, "spark-class").getCanonicalPath

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 7852849,a34c95b..252329c
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@@ -17,17 -17,18 +17,18 @@@
  
  package org.apache.spark.storage
  
- import java.io.{InputStream, OutputStream}
+ import java.io.{File, InputStream, OutputStream}
  import java.nio.{ByteBuffer, MappedByteBuffer}
  
- import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
+ import scala.collection.mutable.{HashMap, ArrayBuffer}
+ import scala.util.Random
  
  import akka.actor.{ActorSystem, Cancellable, Props}
 -import akka.dispatch.{Await, Future}
 -import akka.util.Duration
 -import akka.util.duration._
 +import scala.concurrent.{Await, Future}
 +import scala.concurrent.duration.Duration
 +import scala.concurrent.duration._
  
- import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+ import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
  
  import org.apache.spark.{Logging, SparkEnv, SparkException}
  import org.apache.spark.io.CompressionCodec

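The import churn above reflects SIP-14: the futures API that used to live in akka.dispatch (Await, Future) and akka.util (Duration) is part of the standard library in Scala 2.10. A minimal sketch of the replacement idiom, not taken from the commit:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    val f: Future[Int] = Future { 21 * 2 }
    // Await.result / Await.ready now take a scala.concurrent.duration.Duration.
    val answer = Await.result(f, 10.seconds)
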
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/Utils.scala
index 94ce50e,fe932d8..7557dda
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@@ -18,22 -18,14 +18,18 @@@
  package org.apache.spark.util
  
  import java.io._
- import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
  import java.util.{Locale, Random, UUID}
+ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
  
- import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
- import java.util.regex.Pattern
 -import scala.collection.Map
 +
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
- 
- import scala.collection.mutable.{ArrayBuffer, HashMap}
+ import scala.collection.mutable.ArrayBuffer
  import scala.collection.JavaConversions._
 +import scala.collection.Map
  import scala.io.Source
 +import scala.reflect.ClassTag
 +import scala.Some
 +
  
  import com.google.common.io.Files
  import com.google.common.util.concurrent.ThreadFactoryBuilder

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 0000000,80545c9..45849b3
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@@ -1,0 -1,152 +1,154 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.util.collection
+ 
++import scala.reflect.ClassTag
++
+ 
+ /**
+  * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
+  * but not deletions. This map is about 5X faster than java.util.HashMap, while incurring much
+  * less space overhead.
+  *
+  * Under the hood, it uses our OpenHashSet implementation.
+  */
+ private[spark]
 -class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
++class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
+     initialCapacity: Int)
+   extends Iterable[(K, V)]
+   with Serializable {
+ 
+   def this() = this(64)
+ 
+   protected var _keySet = new OpenHashSet[K](initialCapacity)
+ 
+   // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+   // bug that would generate two arrays (one for Object and one for specialized T).
+   private var _values: Array[V] = _
+   _values = new Array[V](_keySet.capacity)
+ 
+   @transient private var _oldValues: Array[V] = null
+ 
+   // Treat the null key differently so we can use nulls in "data" to represent empty items.
+   private var haveNullValue = false
+   private var nullValue: V = null.asInstanceOf[V]
+ 
+   override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
+ 
+   /** Get the value for a given key */
+   def apply(k: K): V = {
+     if (k == null) {
+       nullValue
+     } else {
+       val pos = _keySet.getPos(k)
+       if (pos < 0) {
+         null.asInstanceOf[V]
+       } else {
+         _values(pos)
+       }
+     }
+   }
+ 
+   /** Set the value for a key */
+   def update(k: K, v: V) {
+     if (k == null) {
+       haveNullValue = true
+       nullValue = v
+     } else {
+       val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+       _values(pos) = v
+       _keySet.rehashIfNeeded(k, grow, move)
+       _oldValues = null
+     }
+   }
+ 
+   /**
+    * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+    * set its value to mergeValue(oldValue).
+    *
+    * @return the newly updated value.
+    */
+   def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+     if (k == null) {
+       if (haveNullValue) {
+         nullValue = mergeValue(nullValue)
+       } else {
+         haveNullValue = true
+         nullValue = defaultValue
+       }
+       nullValue
+     } else {
+       val pos = _keySet.addWithoutResize(k)
+       if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+         val newValue = defaultValue
+         _values(pos & OpenHashSet.POSITION_MASK) = newValue
+         _keySet.rehashIfNeeded(k, grow, move)
+         newValue
+       } else {
+         _values(pos) = mergeValue(_values(pos))
+         _values(pos)
+       }
+     }
+   }
+ 
+   override def iterator = new Iterator[(K, V)] {
+     var pos = -1
+     var nextPair: (K, V) = computeNextPair()
+ 
+     /** Get the next value we should return from next(), or null if we're finished iterating */
+     def computeNextPair(): (K, V) = {
+       if (pos == -1) {    // Treat position -1 as looking at the null value
+         if (haveNullValue) {
+           pos += 1
+           return (null.asInstanceOf[K], nullValue)
+         }
+         pos += 1
+       }
+       pos = _keySet.nextPos(pos)
+       if (pos >= 0) {
+         val ret = (_keySet.getValue(pos), _values(pos))
+         pos += 1
+         ret
+       } else {
+         null
+       }
+     }
+ 
+     def hasNext = nextPair != null
+ 
+     def next() = {
+       val pair = nextPair
+       nextPair = computeNextPair()
+       pair
+     }
+   }
+ 
+   // The following member variables are declared as protected instead of private for the
+   // specialization to work (specialized class extends the non-specialized one and needs access
+   // to the "private" variables).
+   // They also should have been val's. We use var's because there is a Scala compiler bug that
+   // would throw illegal access error at runtime if they are declared as val's.
+   protected var grow = (newCapacity: Int) => {
+     _oldValues = _values
+     _values = new Array[V](newCapacity)
+   }
+ 
+   protected var move = (oldPos: Int, newPos: Int) => {
+     _values(newPos) = _oldValues(oldPos)
+   }
+ }

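A hypothetical usage sketch for the OpenHashMap added above (not part of the commit). The class is private[spark], so real callers must sit inside the org.apache.spark package; the values below are made up.

    import org.apache.spark.util.collection.OpenHashMap

    val wordCounts = new OpenHashMap[String, Int](64)
    wordCounts("spark") = 1                        // plain update
    wordCounts.changeValue("spark", 1, _ + 1)      // merge with the existing value
    wordCounts.changeValue("scala", 1, _ + 1)      // inserts the default for a new key
    wordCounts(null) = 7                           // the null key is stored out of band
    assert(wordCounts.size == 3)
    wordCounts.iterator.foreach { case (k, v) => println(k + " -> " + v) }
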
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 0000000,4592e4f..49d95af
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@@ -1,0 -1,271 +1,272 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.util.collection
+ 
++import scala.reflect._
+ 
+ /**
+  * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
+  * removed.
+  *
+  * The underlying implementation uses Scala compiler's specialization to generate optimized
+  * storage for two primitive types (Long and Int). It is much faster than Java's standard HashSet
+  * while incurring much less memory overhead. This can serve as building blocks for higher level
+  * data structures such as an optimized HashMap.
+  *
+  * This OpenHashSet is designed to serve as a building block for higher-level data structures
+  * such as an optimized hash map. Compared with standard hash set implementations, this class
+  * provides various callback interfaces (e.g. allocateFunc, moveFunc) as well as interfaces to
+  * retrieve the position of a key in the underlying array.
+  *
+  * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed
+  * to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+  */
+ private[spark]
 -class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
++class OpenHashSet[@specialized(Long, Int) T: ClassTag](
+     initialCapacity: Int,
+     loadFactor: Double)
+   extends Serializable {
+ 
+   require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+   require(initialCapacity >= 1, "Invalid initial capacity")
+   require(loadFactor < 1.0, "Load factor must be less than 1.0")
+   require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+ 
+   import OpenHashSet._
+ 
+   def this(initialCapacity: Int) = this(initialCapacity, 0.7)
+ 
+   def this() = this(64)
+ 
+   // The following member variables are declared as protected instead of private for the
+   // specialization to work (specialized class extends the non-specialized one and needs access
+   // to the "private" variables).
+ 
+   protected val hasher: Hasher[T] = {
+     // It would've been more natural to write the following using pattern matching. But Scala 2.9.x
+     // compiler has a bug when specialization is used together with this pattern matching, and
+     // throws:
+     // scala.tools.nsc.symtab.Types$TypeError: type mismatch;
+     //  found   : scala.reflect.AnyValManifest[Long]
 -    //  required: scala.reflect.ClassManifest[Int]
++    //  required: scala.reflect.ClassTag[Int]
+     //         at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
+     //         at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
+     //         ...
 -    val mt = classManifest[T]
 -    if (mt == ClassManifest.Long) {
++    val mt = classTag[T]
++    if (mt == ClassTag.Long) {
+       (new LongHasher).asInstanceOf[Hasher[T]]
 -    } else if (mt == ClassManifest.Int) {
++    } else if (mt == ClassTag.Int) {
+       (new IntHasher).asInstanceOf[Hasher[T]]
+     } else {
+       new Hasher[T]
+     }
+   }
+ 
+   protected var _capacity = nextPowerOf2(initialCapacity)
+   protected var _mask = _capacity - 1
+   protected var _size = 0
+ 
+   protected var _bitset = new BitSet(_capacity)
+ 
+   // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+   // specialization bug that would generate two arrays (one for Object and one for specialized T).
+   protected var _data: Array[T] = _
+   _data = new Array[T](_capacity)
+ 
+   /** Number of elements in the set. */
+   def size: Int = _size
+ 
+   /** The capacity of the set (i.e. size of the underlying array). */
+   def capacity: Int = _capacity
+ 
+   /** Return true if this set contains the specified element. */
+   def contains(k: T): Boolean = getPos(k) != INVALID_POS
+ 
+   /**
+    * Add an element to the set. If the set is over capacity after the insertion, grow the set
+    * and rehash all elements.
+    */
+   def add(k: T) {
+     addWithoutResize(k)
+     rehashIfNeeded(k, grow, move)
+   }
+ 
+   /**
+    * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+    * The caller is responsible for calling rehashIfNeeded.
+    *
+    * Use (retval & POSITION_MASK) to get the actual position, and
+    * (retval & NONEXISTENCE_MASK) != 0 for prior existence.
+    *
+    * @return The position where the key is placed, plus the highest order bit is set if the key
+    *         exists previously.
+    */
+   def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
+ 
+   /**
+    * Rehash the set if it is overloaded.
+    * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+    *          this method.
+    * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+    * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
+    *                 to a new position (in the new data array).
+    */
+   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+     if (_size > loadFactor * _capacity) {
+       rehash(k, allocateFunc, moveFunc)
+     }
+   }
+ 
+   /**
+    * Return the position of the element in the underlying array, or INVALID_POS if it is not found.
+    */
+   def getPos(k: T): Int = {
+     var pos = hashcode(hasher.hash(k)) & _mask
+     var i = 1
+     while (true) {
+       if (!_bitset.get(pos)) {
+         return INVALID_POS
+       } else if (k == _data(pos)) {
+         return pos
+       } else {
+         val delta = i
+         pos = (pos + delta) & _mask
+         i += 1
+       }
+     }
+     // Never reached here
+     INVALID_POS
+   }
+ 
+   /** Return the value at the specified position. */
+   def getValue(pos: Int): T = _data(pos)
+ 
+   /**
+    * Return the next position with an element stored, starting from the given position inclusively.
+    */
+   def nextPos(fromPos: Int): Int = _bitset.nextSetBit(fromPos)
+ 
+   /**
+    * Put an entry into the set. Return the position where the key is placed. In addition, the
+    * highest bit in the returned position is set if the key exists prior to this put.
+    *
+    * This function assumes the data array has at least one empty slot.
+    */
+   private def putInto(bitset: BitSet, data: Array[T], k: T): Int = {
+     val mask = data.length - 1
+     var pos = hashcode(hasher.hash(k)) & mask
+     var i = 1
+     while (true) {
+       if (!bitset.get(pos)) {
+         // This is a new key.
+         data(pos) = k
+         bitset.set(pos)
+         _size += 1
+         return pos | NONEXISTENCE_MASK
+       } else if (data(pos) == k) {
+         // Found an existing key.
+         return pos
+       } else {
+         val delta = i
+         pos = (pos + delta) & mask
+         i += 1
+       }
+     }
+     // Never reached here
+     assert(INVALID_POS != INVALID_POS)
+     INVALID_POS
+   }
+ 
+   /**
+    * Double the table's size and re-hash everything. We are not really using k, but it is declared
+    * so Scala compiler can specialize this method (which leads to calling the specialized version
+    * of putInto).
+    *
+    * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+    *          this method.
+    * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+    * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
+    *                 to a new position (in the new data array).
+    */
+   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+     val newCapacity = _capacity * 2
+     require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ 
+     allocateFunc(newCapacity)
+     val newData = new Array[T](newCapacity)
+     val newBitset = new BitSet(newCapacity)
+     var pos = 0
+     _size = 0
+     while (pos < _capacity) {
+       if (_bitset.get(pos)) {
+         val newPos = putInto(newBitset, newData, _data(pos))
+         moveFunc(pos, newPos & POSITION_MASK)
+       }
+       pos += 1
+     }
+     _bitset = newBitset
+     _data = newData
+     _capacity = newCapacity
+     _mask = newCapacity - 1
+   }
+ 
+   /**
+    * Re-hash a value to deal better with hash functions that don't differ
+    * in the lower bits, similar to java.util.HashMap
+    */
+   private def hashcode(h: Int): Int = {
+     val r = h ^ (h >>> 20) ^ (h >>> 12)
+     r ^ (r >>> 7) ^ (r >>> 4)
+   }
+ 
+   private def nextPowerOf2(n: Int): Int = {
+     val highBit = Integer.highestOneBit(n)
+     if (highBit == n) n else highBit << 1
+   }
+ }
+ 
+ 
+ private[spark]
+ object OpenHashSet {
+ 
+   val INVALID_POS = -1
+   val NONEXISTENCE_MASK = 0x80000000
+   val POSITION_MASK = 0x7FFFFFFF
+ 
+   /**
+    * A set of specialized hash function implementation to avoid boxing hash code computation
+    * in the specialized implementation of OpenHashSet.
+    */
+   sealed class Hasher[@specialized(Long, Int) T] {
+     def hash(o: T): Int = o.hashCode()
+   }
+ 
+   class LongHasher extends Hasher[Long] {
+     override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
+   }
+ 
+   class IntHasher extends Hasher[Int] {
+     override def hash(o: Int): Int = o
+   }
+ 
+   private def grow1(newSize: Int) {}
+   private def move1(oldPos: Int, newPos: Int) { }
+ 
+   private val grow = grow1 _
+   private val move = move1 _
+ }

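A hypothetical sketch of how OpenHashSet is meant to be driven (not part of the commit, same private[spark] caveat as above). The second half mirrors how OpenHashMap keeps a parallel value array in sync via the grow/move callbacks, here replaced by no-ops.

    import org.apache.spark.util.collection.OpenHashSet

    val ids = new OpenHashSet[Long](16)
    ids.add(42L)
    ids.add(42L)                                   // duplicate inserts are ignored
    assert(ids.contains(42L) && ids.size == 1)

    // Lower-level API: insert without resizing, record the position, then rehash explicitly.
    val pos = ids.addWithoutResize(7L) & OpenHashSet.POSITION_MASK
    assert(ids.getValue(pos) == 7L)
    ids.rehashIfNeeded(7L, _ => (), (_, _) => ())
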
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 0000000,d76143e..2e1ef06
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@@ -1,0 -1,127 +1,128 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.util.collection
+ 
++import scala.reflect._
+ 
+ /**
+  * A fast hash map implementation for primitive, non-null keys. This hash map supports
+  * insertions and updates, but not deletions. This map is about an order of magnitude
+  * faster than java.util.HashMap, while incurring much less space overhead.
+  *
+  * Under the hood, it uses our OpenHashSet implementation.
+  */
+ private[spark]
 -class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
 -                              @specialized(Long, Int, Double) V: ClassManifest](
++class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
++                              @specialized(Long, Int, Double) V: ClassTag](
+     initialCapacity: Int)
+   extends Iterable[(K, V)]
+   with Serializable {
+ 
+   def this() = this(64)
+ 
 -  require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
++  require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
+ 
+   // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+   // bug that would generate two arrays (one for Object and one for specialized T).
+   protected var _keySet: OpenHashSet[K] = _
+   private var _values: Array[V] = _
+   _keySet = new OpenHashSet[K](initialCapacity)
+   _values = new Array[V](_keySet.capacity)
+ 
+   private var _oldValues: Array[V] = null
+ 
+   override def size = _keySet.size
+ 
+   /** Get the value for a given key */
+   def apply(k: K): V = {
+     val pos = _keySet.getPos(k)
+     _values(pos)
+   }
+ 
+   /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+   def getOrElse(k: K, elseValue: V): V = {
+     val pos = _keySet.getPos(k)
+     if (pos >= 0) _values(pos) else elseValue
+   }
+ 
+   /** Set the value for a key */
+   def update(k: K, v: V) {
+     val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+     _values(pos) = v
+     _keySet.rehashIfNeeded(k, grow, move)
+     _oldValues = null
+   }
+ 
+   /**
+    * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+    * set its value to mergeValue(oldValue).
+    *
+    * @return the newly updated value.
+    */
+   def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+     val pos = _keySet.addWithoutResize(k)
+     if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+       val newValue = defaultValue
+       _values(pos & OpenHashSet.POSITION_MASK) = newValue
+       _keySet.rehashIfNeeded(k, grow, move)
+       newValue
+     } else {
+       _values(pos) = mergeValue(_values(pos))
+       _values(pos)
+     }
+   }
+ 
+   override def iterator = new Iterator[(K, V)] {
+     var pos = 0
+     var nextPair: (K, V) = computeNextPair()
+ 
+     /** Get the next value we should return from next(), or null if we're finished iterating */
+     def computeNextPair(): (K, V) = {
+       pos = _keySet.nextPos(pos)
+       if (pos >= 0) {
+         val ret = (_keySet.getValue(pos), _values(pos))
+         pos += 1
+         ret
+       } else {
+         null
+       }
+     }
+ 
+     def hasNext = nextPair != null
+ 
+     def next() = {
+       val pair = nextPair
+       nextPair = computeNextPair()
+       pair
+     }
+   }
+ 
+   // The following member variables are declared as protected instead of private for the
+   // specialization to work (specialized class extends the unspecialized one and needs access
+   // to the "private" variables).
+   // They also should have been val's. We use var's because there is a Scala compiler bug that
+   // would throw illegal access error at runtime if they are declared as val's.
+   protected var grow = (newCapacity: Int) => {
+     _oldValues = _values
+     _values = new Array[V](newCapacity)
+   }
+ 
+   protected var move = (oldPos: Int, newPos: Int) => {
+     _values(newPos) = _oldValues(oldPos)
+   }
+ }

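A short hypothetical sketch for the primitive-keyed variant above (not from the commit): @specialized keeps Long/Int keys unboxed, and the require rejects any other key type.

    import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap

    val bytesPerBlock = new PrimitiveKeyOpenHashMap[Long, Long](64)
    bytesPerBlock(1L) = 1024L
    bytesPerBlock.changeValue(1L, 512L, _ + 512L)  // existing key: 1024 + 512
    assert(bytesPerBlock.getOrElse(2L, 0L) == 0L)  // missing keys fall back to elseValue
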
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 0000000,369519c..465c221
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@@ -1,0 -1,51 +1,53 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.spark.util.collection
+ 
++import scala.reflect.ClassTag
++
+ /** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+ private[spark]
 -class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
++class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
+   private var numElements = 0
+   private var array: Array[V] = _
+ 
+   // NB: This must be separate from the declaration, otherwise the specialized parent class
+   // will get its own array with the same initial size. TODO: Figure out why...
+   array = new Array[V](initialSize)
+ 
+   def apply(index: Int): V = {
+     require(index < numElements)
+     array(index)
+   }
+ 
+   def +=(value: V) {
+     if (numElements == array.length) { resize(array.length * 2) }
+     array(numElements) = value
+     numElements += 1
+   }
+ 
+   def length = numElements
+ 
+   def getUnderlyingArray = array
+ 
+   /** Resizes the array, dropping elements if the total length decreases. */
+   def resize(newLength: Int) {
+     val newArray = new Array[V](newLength)
+     array.copyToArray(newArray)
+     array = newArray
+   }
+ }

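A hypothetical usage sketch for PrimitiveVector (not part of the commit): an append-only, specialized growable array that doubles its backing array when full.

    import org.apache.spark.util.collection.PrimitiveVector

    val offsets = new PrimitiveVector[Long](initialSize = 4)
    (0 until 10).foreach(i => offsets += i * 100L)  // grows 4 -> 8 -> 16 as needed
    assert(offsets.length == 10 && offsets(9) == 900L)
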
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 0d8742c,1fd7642..2e41438
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@@ -89,13 -134,49 +134,49 @@@ class SparkListenerSuite extends FunSui
      }
    }
  
-   def checkNonZeroAvg(m: Traversable[Long], msg: String) {
-     assert(m.sum / m.size.toDouble > 0.0, msg)
+   test("onTaskGettingResult() called when result fetched remotely") {
+     // Need to use local cluster mode here, because results are not ever returned through the
+     // block manager when using the LocalScheduler.
+     sc = new SparkContext("local-cluster[1,1,512]", "test")
+ 
+     val listener = new SaveTaskEvents
+     sc.addSparkListener(listener)
+  
+     // Make a task whose result is larger than the akka frame size
+     System.setProperty("spark.akka.frameSize", "1")
+     val akkaFrameSize =
 -      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
++      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
+     val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
+     assert(result === 1.to(akkaFrameSize).toArray)
+ 
+     assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+     val TASK_INDEX = 0
+     assert(listener.startedTasks.contains(TASK_INDEX))
+     assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
+     assert(listener.endedTasks.contains(TASK_INDEX))
    }
  
-   def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
-     val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
-     !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
+   test("onTaskGettingResult() not called when result sent directly") {
+     // Need to use local cluster mode here, because results are not ever returned through the
+     // block manager when using the LocalScheduler.
+     sc = new SparkContext("local-cluster[1,1,512]", "test")
+ 
+     val listener = new SaveTaskEvents
+     sc.addSparkListener(listener)
+  
+     // Make a task whose result is larger than the akka frame size
+     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+     assert(result === 2)
+ 
+     assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+     val TASK_INDEX = 0
+     assert(listener.startedTasks.contains(TASK_INDEX))
+     assert(listener.startedGettingResultTasks.isEmpty == true)
+     assert(listener.endedTasks.contains(TASK_INDEX))
+   }
+ 
+   def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+     assert(m.sum / m.size.toDouble > 0.0, msg)
    }
  
    class SaveStageInfo extends SparkListener {

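The test change above tracks an Akka config rename: the Akka 2.0 key akka.remote.netty.message-frame-size becomes akka.remote.netty.tcp.maximum-frame-size under the 2.2 netty.tcp transport. A sketch of reading that key with the Typesafe Config API (values illustrative, not from the commit):

    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString(
      "akka.remote.netty.tcp.maximum-frame-size = 1 MiB")
    val frameBytes = conf.getBytes("akka.remote.netty.tcp.maximum-frame-size")  // 1048576
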
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/examples/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 9278856,53ac82e..72b9549
--- a/pom.xml
+++ b/pom.xml
@@@ -227,13 -259,36 +261,36 @@@
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
 -        <artifactId>akka-actor</artifactId>
++        <artifactId>akka-actor_${scala-short.version}</artifactId>
+         <version>${akka.version}</version>
+         <exclusions>
+           <exclusion>
+             <groupId>org.jboss.netty</groupId>
+             <artifactId>netty</artifactId>
+           </exclusion>
+         </exclusions>
+       </dependency>
+       <dependency>
+         <groupId>com.typesafe.akka</groupId>
 -        <artifactId>akka-remote</artifactId>
 +        <artifactId>akka-remote_${scala-short.version}</artifactId>
          <version>${akka.version}</version>
+         <exclusions>
+           <exclusion>
+             <groupId>org.jboss.netty</groupId>
+             <artifactId>netty</artifactId>
+           </exclusion>
+         </exclusions>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
 -        <artifactId>akka-slf4j</artifactId>
 +        <artifactId>akka-slf4j_${scala-short.version}</artifactId>
          <version>${akka.version}</version>
+         <exclusions>
+           <exclusion>
+             <groupId>org.jboss.netty</groupId>
+             <artifactId>netty</artifactId>
+           </exclusion>
+         </exclusions>
        </dependency>
        <dependency>
          <groupId>it.unimi.dsi</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index 43db97f,45fd30a..b71e1b3
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -74,13 -76,16 +76,16 @@@ object SparkBuild extends Build 
    // Conditionally include the yarn sub-project
    lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
    lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
-   lazy val allProjects = Seq[ProjectReference](
-     core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
+ 
+   // Everything except assembly, tools and examples belongs to packageProjects
+   lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
+ 
+   lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
  
    def sharedSettings = Defaults.defaultSettings ++ Seq(
 -    organization := "org.apache.spark",
 -    version := "0.9.0-incubating-SNAPSHOT",
 -    scalaVersion := "2.9.3",
 +    organization       := "org.apache.spark",
 +    version            := "0.9.0-incubating-SNAPSHOT",
 +    scalaVersion       := "2.10.3",
      scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation",
        "-target:" + SCALAC_JVM_VERSION),
      javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
@@@ -194,34 -201,37 +205,34 @@@
      ),
  
      libraryDependencies ++= Seq(
 -      "com.google.guava" % "guava" % "14.0.1",
 -      "com.google.code.findbugs" % "jsr305" % "1.3.9",
 -      "log4j" % "log4j" % "1.2.17",
 -      "org.slf4j" % "slf4j-api" % slf4jVersion,
 -      "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
 -      "commons-daemon" % "commons-daemon" % "1.0.10",  // workaround for bug HADOOP-9407
 -      "com.ning" % "compress-lzf" % "0.8.4",
 -      "org.xerial.snappy" % "snappy-java" % "1.0.5",
 -      "org.ow2.asm" % "asm" % "4.0",
 -      "com.google.protobuf" % "protobuf-java" % "2.4.1",
 -      "com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
 -      "com.typesafe.akka" % "akka-remote" % "2.0.5" excludeAll(excludeNetty),
 -      "com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
 -      "it.unimi.dsi" % "fastutil" % "6.4.4",
 -      "colt" % "colt" % "1.2.0",
 -      "net.liftweb" % "lift-json_2.9.2" % "2.5",
 -      "org.apache.mesos" % "mesos" % "0.13.0",
 -      "io.netty" % "netty-all" % "4.0.0.Beta2",
 -      "org.apache.derby" % "derby" % "10.4.2.0" % "test",
 -      "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
 -      "net.java.dev.jets3t" % "jets3t" % "0.7.1",
 -      "org.apache.avro" % "avro" % "1.7.4",
 -      "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
 -      "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
 -      "com.codahale.metrics" % "metrics-core" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-json" % "3.0.0",
 -      "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
 -      "com.twitter" % "chill_2.9.3" % "0.3.1",
 -      "com.twitter" % "chill-java" % "0.3.1"
 -    )
 +        "com.google.guava"         % "guava"            % "14.0.1",
 +        "com.google.code.findbugs" % "jsr305"           % "1.3.9",
 +        "log4j"                    % "log4j"            % "1.2.17",
 +        "org.slf4j"                % "slf4j-api"        % slf4jVersion,
 +        "org.slf4j"                % "slf4j-log4j12"    % slf4jVersion,
 +        "com.ning"                 % "compress-lzf"     % "0.8.4",
 +        "org.xerial.snappy"        % "snappy-java"      % "1.0.5",
 +        "commons-daemon"           % "commons-daemon"   % "1.0.10", // workaround for bug HADOOP-9407
 +        "org.ow2.asm"              % "asm"              % "4.0",
 +        "com.google.protobuf"      % "protobuf-java"    % "2.4.1",
 +        "com.typesafe.akka"       %% "akka-remote"      % "2.2.3"  excludeAll(excludeNetty), 
 +        "com.typesafe.akka"       %% "akka-slf4j"       % "2.2.3"  excludeAll(excludeNetty),
 +        "net.liftweb"             %% "lift-json"        % "2.5.1"  excludeAll(excludeNetty),
 +        "it.unimi.dsi"             % "fastutil"         % "6.4.4",
 +        "colt"                     % "colt"             % "1.2.0",
 +        "org.apache.mesos"         % "mesos"            % "0.13.0",
 +        "net.java.dev.jets3t"      % "jets3t"           % "0.7.1",
 +        "org.apache.derby"         % "derby"            % "10.4.2.0"                     % "test",
-         "org.apache.hadoop"        % "hadoop-client"    % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
++        "org.apache.hadoop"        % "hadoop-client"    % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
 +        "org.apache.avro"          % "avro"             % "1.7.4",
 +        "org.apache.avro"          % "avro-ipc"         % "1.7.4" excludeAll(excludeNetty),
 +        "com.codahale.metrics"     % "metrics-core"     % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-jvm"      % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-json"     % "3.0.0",
 +        "com.codahale.metrics"     % "metrics-ganglia"  % "3.0.0",
 +        "com.twitter"             %% "chill"            % "0.3.1",
 +        "com.twitter"              % "chill-java"       % "0.3.1"
 +      )
    )
  
    def rootSettings = sharedSettings ++ Seq(
@@@ -272,11 -280,19 +283,22 @@@
  
    def streamingSettings = sharedSettings ++ Seq(
      name := "spark-streaming",
+     resolvers ++= Seq(
+       "Akka Repository" at "http://repo.akka.io/releases/",
+       "Apache repo" at "https://repository.apache.org/content/repositories/releases"
+     ),
++
      libraryDependencies ++= Seq(
 -      "org.eclipse.paho" % "mqtt-client" % "0.4.0",
 -      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
 -      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
 -      "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),
 -      "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
 +      "org.apache.flume"      % "flume-ng-sdk"     % "1.2.0" % "compile"  excludeAll(excludeNetty, excludeSnappy),
++      "com.sksamuel.kafka"    %% "kafka"            % "0.8.0-beta1"
+         exclude("com.sun.jdmk", "jmxtools")
+         exclude("com.sun.jmx", "jmxri")
+         exclude("net.sf.jopt-simple", "jopt-simple")
++        excludeAll(excludeNetty),
++      "org.eclipse.paho"      % "mqtt-client"      % "0.4.0",
 +      "com.github.sgroschupf" % "zkclient"         % "0.1"                excludeAll(excludeNetty),
 +      "org.twitter4j"         % "twitter4j-stream" % "3.0.3"              excludeAll(excludeNetty),
 +      "com.typesafe.akka"    %%  "akka-zeromq"     % "2.2.3"              excludeAll(excludeNetty)
      )
    )
  

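The build changes above switch the Akka (and related) dependencies to Scala-version-qualified artifacts. In sbt's DSL that is what %% does; a small sketch (not from the commit) showing that, with scalaVersion 2.10.x, the two forms below resolve the same artifact, matching the akka-remote_${scala-short.version} suffix added in the Maven pom:

    libraryDependencies += "com.typesafe.akka" %% "akka-remote"      % "2.2.3"
    libraryDependencies += "com.typesafe.akka" %  "akka-remote_2.10" % "2.2.3"
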
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --cc repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 988b624,0ced284..43e504c
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@@ -848,77 -827,28 +862,77 @@@ class SparkILoop(in0: Option[BufferedRe
      }
    }
  
 -  def initializeSpark() {
 -    intp.beQuietDuring {
 -      command("""
 -        org.apache.spark.repl.Main.interp.out.println("Creating SparkContext...");
 -        org.apache.spark.repl.Main.interp.out.flush();
 -        @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
 -        org.apache.spark.repl.Main.interp.out.println("Spark context available as sc.");
 -        org.apache.spark.repl.Main.interp.out.flush();
 -        """)
 -      command("import org.apache.spark.SparkContext._")
 +  val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
 +  val m = u.runtimeMirror(getClass.getClassLoader)
 +  private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
 +    u.TypeTag[T](
 +      m,
 +      new TypeCreator {
 +        def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type =
 +          m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type]
 +      })
 +
 +  def process(settings: Settings): Boolean = savingContextLoader {
 +    this.settings = settings
 +    createInterpreter()
 +
 +    // sets in to some kind of reader depending on environmental cues
 +    in = in0 match {
 +      case Some(reader) => SimpleReader(reader, out, true)
 +      case None         =>
 +        // some post-initialization
 +        chooseReader(settings) match {
 +          case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x
 +          case x                   => x
 +        }
      }
 -    echo("Type in expressions to have them evaluated.")
 -    echo("Type :help for more information.")
 -  }
 +    lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain]
 +    // Bind intp somewhere out of the regular namespace where
 +    // we can get at it in generated code.
 +    addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain])))
 +    addThunk({
 +      import scala.tools.nsc.io._
 +      import Properties.userHome
 +      import scala.compat.Platform.EOL
 +      val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp())
 +      if (autorun.isDefined) intp.quietRun(autorun.get)
 +    })
 +
 +    addThunk(printWelcome())
 +    addThunk(initializeSpark())
 +
 +    loadFiles(settings)
 +    // it is broken on startup; go ahead and exit
 +    if (intp.reporter.hasErrors)
 +      return false
  
 -  var sparkContext: SparkContext = null
 +    // This is about the illusion of snappiness.  We call initialize()
 +    // which spins off a separate thread, then print the prompt and try
 +    // our best to look ready.  The interlocking lazy vals tend to
 +    // inter-deadlock, so we break the cycle with a single asynchronous
 +    // message to an actor.
 +    if (isAsync) {
 +      intp initialize initializedCallback()
 +      createAsyncListener() // listens for signal to run postInitialization
 +    }
 +    else {
 +      intp.initializeSynchronous()
 +      postInitialization()
 +    }
 +    // printWelcome()
 +
 +    try loop()
 +    catch AbstractOrMissingHandler()
 +    finally closeInterpreter()
 +
 +    true
 +  }
  
    def createSparkContext(): SparkContext = {
-    val uri = System.getenv("SPARK_EXECUTOR_URI")
-    if (uri != null) {
-          System.setProperty("spark.executor.uri", uri)
-    }
+     val uri = System.getenv("SPARK_EXECUTOR_URI")
+     if (uri != null) {
+       System.setProperty("spark.executor.uri", uri)
+     }
      val master = this.master match {
        case Some(m) => m
        case None => {
@@@ -926,9 -856,17 +940,17 @@@
          if (prop != null) prop else "local"
        }
      }
-     val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
-     sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
-     echo("Created spark context..")
+     val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
 -                                                .getOrElse(new Array[String](0))
 -                                                .map(new java.io.File(_).getAbsolutePath)
++      .getOrElse(new Array[String](0))
++      .map(new java.io.File(_).getAbsolutePath)
+     try {
+       sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+     } catch {
+       case e: Exception =>
+         e.printStackTrace()
+         echo("Failed to create SparkContext, exiting...")
+         sys.exit(1)
+     }
      sparkContext
    }
  

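For readers following the REPL changes, the SPARK_EXECUTOR_URI / ADD_JARS handling above reduces to the stand-alone sketch below. The object name is hypothetical and the master resolution is simplified; the real REPL consults additional properties before falling back to "local":

    import org.apache.spark.SparkContext

    object ShellContextSketch {
      def main(args: Array[String]): Unit = {
        // The executor URI is forwarded as a system property, as in createSparkContext().
        Option(System.getenv("SPARK_EXECUTOR_URI")).foreach(System.setProperty("spark.executor.uri", _))

        val master = Option(System.getenv("MASTER")).getOrElse("local")

        // Comma-separated jar paths are normalised to absolute paths before being shipped to executors.
        val jars = Option(System.getenv("ADD_JARS"))
          .map(_.split(','))
          .getOrElse(Array.empty[String])
          .map(new java.io.File(_).getAbsolutePath)

        val sc =
          try new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
          catch { case e: Exception => e.printStackTrace(); sys.exit(1) }

        println("Spark context created with " + jars.length + " extra jar(s)")
        sc.stop()
      }
    }
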
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/spark-class
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/pom.xml
----------------------------------------------------------------------
diff --cc streaming/pom.xml
index 3f2033f,7a9ae6a..fb15681
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@@ -87,8 -111,14 +111,14 @@@
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
 -      <artifactId>akka-zeromq</artifactId>
 -      <version>2.0.3</version>
 +      <artifactId>akka-zeromq_${scala-short.version}</artifactId>
 +      <version>${akka.version}</version>
+       <exclusions>
+         <exclusion>
+           <groupId>org.jboss.netty</groupId>
+           <artifactId>netty</artifactId>
+         </exclusion>
+       </exclusions>
      </dependency>
      <dependency>
        <groupId>org.scalatest</groupId>
@@@ -111,13 -141,14 +141,18 @@@
        <scope>test</scope>
      </dependency>
      <dependency>
 +      <groupId>commons-io</groupId>
 +      <artifactId>commons-io</artifactId>
 +    </dependency>
++    <dependency>
+       <groupId>org.eclipse.paho</groupId>
+       <artifactId>mqtt-client</artifactId>
+        <version>0.4.0</version>
+     </dependency>
    </dependencies>
    <build>
 -    <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
 -    <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
 +    <outputDirectory>target/scala-${scala-short.version}/classes</outputDirectory>
 +    <testOutputDirectory>target/scala-${scala-short.version}/test-classes</testOutputDirectory>
      <plugins>
        <plugin>
          <groupId>org.scalatest</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index cd404fd,9ceff75..329d2b5
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@@ -38,7 -37,7 +38,7 @@@ import org.apache.hadoop.conf.Configura
  
  /**
   * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
-- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
++ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
   * for more details on RDDs). DStreams can either be created from live data (such as data from
   * HDFS, Kafka or Flume), or they can be generated by transforming existing DStreams using operations
   * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
@@@ -488,26 -494,62 +495,60 @@@ abstract class DStream[T: ClassTag] 
  
    /**
     * Apply a function to each RDD in this DStream. This is an output operator, so
-    * this DStream will be registered as an output stream and therefore materialized.
+    * 'this' DStream will be registered as an output stream and therefore materialized.
     */
    def foreach(foreachFunc: (RDD[T], Time) => Unit) {
 -    val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
 -    ssc.registerOutputStream(newStream)
 -    newStream
 +    ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
    }
  
    /**
     * Return a new DStream in which each RDD is generated by applying a function
-    * on each RDD of this DStream.
+    * on each RDD of 'this' DStream.
     */
 -  def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
 +  def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
-     transform((r: RDD[T], t: Time) => transformFunc(r))
+     transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r)))
    }
  
    /**
     * Return a new DStream in which each RDD is generated by applying a function
-    * on each RDD of this DStream.
+    * on each RDD of 'this' DStream.
     */
 -  def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
 +  def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
-     new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+     //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+     val cleanedF = context.sparkContext.clean(transformFunc)
+     val realTransformFunc =  (rdds: Seq[RDD[_]], time: Time) => {
+       assert(rdds.length == 1)
+       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+     }
+     new TransformedDStream[U](Seq(this), realTransformFunc)
+   }
+ 
+   /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
 -  def transformWith[U: ClassManifest, V: ClassManifest](
++  def transformWith[U: ClassTag, V: ClassTag](
+       other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+     ): DStream[V] = {
+     val cleanedF = ssc.sparkContext.clean(transformFunc)
+     transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
+   }
+ 
+   /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
 -  def transformWith[U: ClassManifest, V: ClassManifest](
++  def transformWith[U: ClassTag, V: ClassTag](
+       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+     ): DStream[V] = {
+     val cleanedF = ssc.sparkContext.clean(transformFunc)
+     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+       assert(rdds.length == 2)
+       val rdd1 = rdds(0).asInstanceOf[RDD[T]]
+       val rdd2 = rdds(1).asInstanceOf[RDD[U]]
+       cleanedF(rdd1, rdd2, time)
+     }
+     new TransformedDStream[V](Seq(this, other), realTransformFunc)
    }
  
    /**

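A usage sketch for the transform / transformWith operators introduced above. The StreamingContext, host names and ports are placeholders; any two pair DStreams would serve equally well:

    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    val ssc = new StreamingContext("local[2]", "transformWith-demo", Seconds(2))
    val clicks = ssc.socketTextStream("localhost", 9999).map(url => (url, 1))
    val views  = ssc.socketTextStream("localhost", 9998).map(url => (url, 1))

    // transform: an arbitrary RDD-to-RDD function applied to every batch of one stream.
    val distinctClicks = clicks.transform((rdd: RDD[(String, Int)]) => rdd.distinct())

    // transformWith: the same idea across two streams of the same batch; here it
    // reproduces a per-batch join by hand.
    val joined = clicks.transformWith(views,
      (c: RDD[(String, Int)], v: RDD[(String, Int)], t: Time) => c.join(v))

    distinctClicks.print()
    joined.print()
    ssc.start()
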
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index b761646,b97fb7e..66fe6e7
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@@ -28,7 -28,9 +28,9 @@@ import scala.collection.mutable.Queu
  
  import akka.actor._
  import akka.pattern.ask
 -import akka.util.duration._
 +import scala.concurrent.duration._
+ import akka.dispatch._
+ import org.apache.spark.storage.BlockId
  
  private[streaming] sealed trait NetworkInputTrackerMessage
  private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage

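The import swap above is part of the Akka 2.0 -> 2.2 / Scala 2.10 migration: akka.dispatch.Await and akka.util.duration._ give way to scala.concurrent.Await and scala.concurrent.duration._. A minimal, self-contained sketch of the resulting ask/Await pattern; the echo actor is a stand-in, not the real tracker actor:

    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object AskAwaitSketch extends App {
      class Echo extends Actor {
        def receive = { case msg => sender ! ("ack: " + msg) }
      }

      val system = ActorSystem("sketch")
      val echo = system.actorOf(Props[Echo], "echo")

      implicit val timeout = Timeout(5.seconds)   // 5.seconds now comes from scala.concurrent.duration._
      val reply = Await.result(echo ? "RegisterReceiver(0)", timeout.duration)
      println(reply)                              // ack: RegisterReceiver(0)

      system.shutdown()
    }
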
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index f021e29,8c12fd1..ea5c165
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@@ -18,9 -18,9 +18,7 @@@
  package org.apache.spark.streaming
  
  import org.apache.spark.streaming.StreamingContext._
--import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
- import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
- import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
 -import org.apache.spark.streaming.dstream.{ShuffledDStream}
 -import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
++import org.apache.spark.streaming.dstream._
  
  import org.apache.spark.{Partitioner, HashPartitioner}
  import org.apache.spark.SparkContext._
@@@ -35,8 -34,8 +33,9 @@@ import org.apache.hadoop.mapreduce.{Out
  import org.apache.hadoop.mapred.OutputFormat
  import org.apache.hadoop.security.UserGroupInformation
  import org.apache.hadoop.conf.Configuration
++import scala.Some
  
 -class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
 +class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
  extends Serializable {
  
    private[streaming] def ssc = self.ssc
@@@ -399,33 -398,46 +398,46 @@@
       new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
    }
  
- 
+   /**
+    * Return a new DStream by applying a map function to the value of each key-value pair in
+    * 'this' DStream without changing the key.
+    */
 -  def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
 +  def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
      new MapValuedDStream[K, V, U](self, mapValuesFunc)
    }
  
+   /**
+    * Return a new DStream by applying a flatMap function to the value of each key-value pair in
+    * 'this' DStream without changing the key.
+    */
 -  def flatMapValues[U: ClassManifest](
 +  def flatMapValues[U: ClassTag](
        flatMapValuesFunc: V => TraversableOnce[U]
      ): DStream[(K, U)] = {
      new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
    }
  
    /**
-    * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
-    * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-    * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with Spark's default number
     * of partitions.
     */
 -  def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
 +  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
      cogroup(other, defaultPartitioner())
    }
  
    /**
-    * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
-    * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
-    * key in both RDDs. Partitioner is used to partition each generated RDD.
+    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+    */
 -  def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
++  def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+     cogroup(other, defaultPartitioner(numPartitions))
+   }
+ 
+   /**
+    * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+    * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
     */
 -  def cogroup[W: ClassManifest](
 +  def cogroup[W: ClassTag](
        other: DStream[(K, W)],
        partitioner: Partitioner
      ): DStream[(K, (Seq[V], Seq[W]))] = {
@@@ -441,31 -448,105 +448,105 @@@
    }
  
    /**
-    * Join `this` DStream with `other` DStream. HashPartitioner is used
-    * to partition each generated RDD into default number of partitions.
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
     */
 -  def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
 +  def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
      join[W](other, defaultPartitioner())
    }
  
    /**
-    * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
-    * be generated by joining RDDs from `this` and other DStream. Uses the given
-    * Partitioner to partition each generated RDD.
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+    */
 -  def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
++  def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+     join[W](other, defaultPartitioner(numPartitions))
+   }
+ 
+   /**
+    * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+    * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
     */
 -  def join[W: ClassManifest](
 +  def join[W: ClassTag](
        other: DStream[(K, W)],
        partitioner: Partitioner
      ): DStream[(K, (V, W))] = {
-     this.cogroup(other, partitioner)
-         .flatMapValues{
-       case (vs, ws) =>
-         for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
-     }
+     self.transformWith(
+       other,
+       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+     )
+   }
+ 
+   /**
+    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+    * number of partitions.
+    */
 -  def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
++  def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+     leftOuterJoin[W](other, defaultPartitioner())
+   }
+ 
+   /**
+    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+    * partitions.
+    */
 -  def leftOuterJoin[W: ClassManifest](
++  def leftOuterJoin[W: ClassTag](
+       other: DStream[(K, W)],
+       numPartitions: Int
+     ): DStream[(K, (V, Option[W]))] = {
+     leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+   }
+ 
+   /**
+    * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+    * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+    * the partitioning of each RDD.
+    */
 -  def leftOuterJoin[W: ClassManifest](
++  def leftOuterJoin[W: ClassTag](
+       other: DStream[(K, W)],
+       partitioner: Partitioner
+     ): DStream[(K, (V, Option[W]))] = {
+     self.transformWith(
+       other,
+       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+     )
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+    * number of partitions.
+    */
 -  def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
++  def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+     rightOuterJoin[W](other, defaultPartitioner())
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+    * partitions.
+    */
 -  def rightOuterJoin[W: ClassManifest](
++  def rightOuterJoin[W: ClassTag](
+       other: DStream[(K, W)],
+       numPartitions: Int
+     ): DStream[(K, (Option[V], W))] = {
+     rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+   }
+ 
+   /**
+    * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+    * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+    * the partitioning of each RDD.
+    */
 -  def rightOuterJoin[W: ClassManifest](
++  def rightOuterJoin[W: ClassTag](
+       other: DStream[(K, W)],
+       partitioner: Partitioner
+     ): DStream[(K, (Option[V], W))] = {
+     self.transformWith(
+       other,
+       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+     )
    }
  
    /**

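A usage sketch for the join variants added above; the streams and ports are illustrative, and the pair-DStream implicits come from StreamingContext._ as usual:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "join-demo", Seconds(2))
    val users  = ssc.socketTextStream("localhost", 7777).map(line => (line, "user"))
    val orders = ssc.socketTextStream("localhost", 7778).map(line => (line, line.length))

    // Inner join: only keys present in both RDDs of the current batch survive.
    val matched = users.join(orders)

    // Left outer join: every user is kept; a missing order shows up as None.
    val withOptionalOrder = users.leftOuterJoin(orders)

    // An explicit partitioner controls how each joined RDD is partitioned.
    val custom = users.rightOuterJoin(orders, new HashPartitioner(4))

    matched.print()
    withOptionalOrder.print()
    custom.print()
    ssc.start()
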
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c722aa1,70bf902..d2c4fde
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@@ -268,7 -274,11 +276,11 @@@ class StreamingContext private 
     *               in its own thread.
     * @param storageLevel  Storage level to use for storing the received objects
     */
-   def kafkaStream[T: ClassTag, D <: kafka.serializer.Decoder[_]: Manifest](
+   def kafkaStream[
 -    K: ClassManifest,
 -    V: ClassManifest,
++    K: ClassTag,
++    V: ClassTag,
+     U <: kafka.serializer.Decoder[_]: Manifest,
+     T <: kafka.serializer.Decoder[_]: Manifest](
        kafkaParams: Map[String, String],
        topics: Map[String, Int],
        storageLevel: StorageLevel
@@@ -452,14 -462,40 +464,40 @@@
      inputStream
    }
  
+   /**
+    * Create an input stream that receives messages pushed by an MQTT publisher.
+    * @param brokerUrl URL of the remote MQTT publisher
+    * @param topic topic name to subscribe to
+    * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
+    */
+   def mqttStream(
+     brokerUrl: String,
+     topic: String,
+     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+     val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+     registerInputStream(inputStream)
+     inputStream
+   }
    /**
-    * Create a unified DStream from multiple DStreams of the same type and same interval
+    * Create a unified DStream from multiple DStreams of the same type and same slide duration.
     */
 -  def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
 +  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
      new UnionDStream[T](streams.toArray)
    }
  
    /**
+    * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+    * the DStreams.
+    */
 -  def transform[T: ClassManifest](
++  def transform[T: ClassTag](
+       dstreams: Seq[DStream[_]],
+       transformFunc: (Seq[RDD[_]], Time) => RDD[T]
+     ): DStream[T] = {
+     new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
+   }
+ 
+   /**
     * Register an input stream that will be started (InputDStream.start() called) to get the
     * input data.
     */

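Two of the StreamingContext additions above in one sketch: the MQTT input stream and the multi-stream transform. The broker URL, topic and ports are placeholders:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

    val ssc = new StreamingContext("local[2]", "ctx-ops-demo", Seconds(1))

    // mqttStream subscribes to a single topic and yields the payloads as a DStream[String].
    val sensorReadings = ssc.mqttStream("tcp://localhost:1883", "sensors/temperature")
    sensorReadings.print()

    val a = ssc.socketTextStream("localhost", 9999)
    val b = ssc.socketTextStream("localhost", 9998)

    // ssc.transform hands the function one RDD per input stream, in the order given.
    val mergedDistinct = ssc.transform(Seq(a, b), (rdds: Seq[RDD[_]], time: Time) => {
      (rdds(0).asInstanceOf[RDD[String]] ++ rdds(1).asInstanceOf[RDD[String]]).distinct()
    })
    mergedDistinct.print()

    ssc.start()
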
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 0d54d78,1a2aeaa..d29033d
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@@ -23,11 -23,9 +23,11 @@@ import org.apache.spark.api.java.JavaRD
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.rdd.RDD
  
 +import scala.reflect.ClassTag
 +
  /**
   * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
-- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
++ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
   * for more details on RDDs). DStreams can either be created from live data (such as data from
   * HDFS, Kafka or Flume), or they can be generated by transforming existing DStreams using operations
   * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 4508e48,09189ea..64f38ce
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@@ -255,11 -257,11 +258,11 @@@ trait JavaDStreamLike[T, This <: JavaDS
  
    /**
     * Return a new DStream in which each RDD is generated by applying a function
-    * on each RDD of this DStream.
+    * on each RDD of 'this' DStream.
     */
    def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
 -    implicit val cm: ClassManifest[U] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
 +    implicit val cm: ClassTag[U] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
      def scalaTransform (in: RDD[T]): RDD[U] =
        transformFunc.call(wrapRDD(in)).rdd
      dstream.transform(scalaTransform(_))
@@@ -267,11 -269,11 +270,11 @@@
  
    /**
     * Return a new DStream in which each RDD is generated by applying a function
-    * on each RDD of this DStream.
+    * on each RDD of 'this' DStream.
     */
    def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
 -    implicit val cm: ClassManifest[U] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
 +    implicit val cm: ClassTag[U] =
 +      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
      def scalaTransform (in: RDD[T], time: Time): RDD[U] =
        transformFunc.call(wrapRDD(in), time).rdd
      dstream.transform(scalaTransform(_, _))
@@@ -308,6 -310,82 +311,82 @@@
    }
  
    /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
+   def transformWith[U, W](
+       other: JavaDStream[U],
+       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
+     ): JavaDStream[W] = {
 -    implicit val cmu: ClassManifest[U] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
 -    implicit val cmv: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cmu: ClassTag[U] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
++    implicit val cmv: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
+       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+     dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
+   }
+ 
+   /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
+   def transformWith[U, K2, V2](
+       other: JavaDStream[U],
+       transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
+     ): JavaPairDStream[K2, V2] = {
 -    implicit val cmu: ClassManifest[U] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
 -    implicit val cmk2: ClassManifest[K2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
 -    implicit val cmv2: ClassManifest[V2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
++    implicit val cmu: ClassTag[U] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
++    implicit val cmk2: ClassTag[K2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++    implicit val cmv2: ClassTag[V2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+     def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
+       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+     dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
+   }
+ 
+   /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
+   def transformWith[K2, V2, W](
+       other: JavaPairDStream[K2, V2],
+       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
+     ): JavaDStream[W] = {
 -    implicit val cmk2: ClassManifest[K2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
 -    implicit val cmv2: ClassManifest[V2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
 -    implicit val cmw: ClassManifest[W] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++    implicit val cmk2: ClassTag[K2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++    implicit val cmv2: ClassTag[V2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
++    implicit val cmw: ClassTag[W] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
+       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+     dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
+   }
+ 
+   /**
+    * Return a new DStream in which each RDD is generated by applying a function
+    * on each RDD of 'this' DStream and 'other' DStream.
+    */
+   def transformWith[K2, V2, K3, V3](
+       other: JavaPairDStream[K2, V2],
+       transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
+     ): JavaPairDStream[K3, V3] = {
 -    implicit val cmk2: ClassManifest[K2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
 -    implicit val cmv2: ClassManifest[V2] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
 -    implicit val cmk3: ClassManifest[K3] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
 -    implicit val cmv3: ClassManifest[V3] =
 -      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
++    implicit val cmk2: ClassTag[K2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++    implicit val cmv2: ClassTag[V2] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
++    implicit val cmk3: ClassTag[K3] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
++    implicit val cmv3: ClassTag[V3] =
++      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+     def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
+       transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+     dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
+   }
+ 
+   /**
     * Enable periodic checkpointing of RDDs of this DStream
     * @param interval Time interval after which generated RDD will be checkpointed
     */
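
All of the Java-facing transformWith overloads above rely on the same idiom: because Java callers cannot supply ClassTag evidence, a ClassTag[AnyRef] is manufactured and cast to whatever element type the underlying Scala API demands. A minimal sketch of that pattern in isolation; the method names are hypothetical, not part of the patch:

    import scala.reflect.ClassTag

    object FakeClassTagSketch extends App {
      // Manufacture evidence the way the Java wrappers above do.
      def fakeClassTag[T]: ClassTag[T] =
        implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

      // A ClassTag-bounded operation, standing in for the Scala DStream methods.
      def countDistinct[T: ClassTag](xs: Seq[T]): Int = xs.toArray.distinct.length

      // A "Java-style" caller with no real ClassTag for T passes the fake one explicitly.
      // This is safe here because the tag is only used for array creation, where storing
      // the elements as boxed java.lang.Object values is acceptable.
      def countDistinctUntagged[T](xs: Seq[T]): Int = countDistinct(xs)(fakeClassTag[T])

      println(countDistinctUntagged(Seq(1, 1, 2, 3)))   // 3
    }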