Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:50 UTC
[06/50] [abbrv] Merge branch 'master' into scala-2.10
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0000000,a45bee5..3ccc38d
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@@ -1,0 -1,238 +1,238 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.scheduler.cluster
+
+ import java.util.concurrent.atomic.AtomicInteger
+
+ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
++import scala.concurrent.Await
++import scala.concurrent.duration._
+
+ import akka.actor._
-import akka.dispatch.Await
+ import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
-import akka.util.Duration
-import akka.util.duration._
++import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
+
+ import org.apache.spark.{SparkException, Logging, TaskState}
+ import org.apache.spark.scheduler.TaskDescription
+ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+ import org.apache.spark.util.Utils
+
+ /**
+ * A scheduler backend that waits for coarse-grained executors to connect to it through Akka.
+ * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
+ * executors whenever a task is done and asking the scheduler to launch a new executor for
+ * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
+ * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
+ * (spark.deploy.*).
+ */
+ private[spark]
+ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+ extends SchedulerBackend with Logging
+ {
+ // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
+ var totalCoreCount = new AtomicInteger(0)
+
+ class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
+ private val executorActor = new HashMap[String, ActorRef]
+ private val executorAddress = new HashMap[String, Address]
+ private val executorHost = new HashMap[String, String]
+ private val freeCores = new HashMap[String, Int]
+ private val actorToExecutorId = new HashMap[ActorRef, String]
+ private val addressToExecutorId = new HashMap[Address, String]
+
+ override def preStart() {
+ // Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
++ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+
+ // Periodically revive offers to allow delay scheduling to work
+ val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
++ import context.dispatcher
+ context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
+ }
+
+ def receive = {
+ case RegisterExecutor(executorId, hostPort, cores) =>
+ Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
+ if (executorActor.contains(executorId)) {
+ sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
+ } else {
+ logInfo("Registered executor: " + sender + " with ID " + executorId)
+ sender ! RegisteredExecutor(sparkProperties)
+ context.watch(sender)
+ executorActor(executorId) = sender
+ executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+ freeCores(executorId) = cores
+ executorAddress(executorId) = sender.path.address
+ actorToExecutorId(sender) = executorId
+ addressToExecutorId(sender.path.address) = executorId
+ totalCoreCount.addAndGet(cores)
+ makeOffers()
+ }
+
+ case StatusUpdate(executorId, taskId, state, data) =>
+ scheduler.statusUpdate(taskId, state, data.value)
+ if (TaskState.isFinished(state)) {
+ if (executorActor.contains(executorId)) {
+ freeCores(executorId) += 1
+ makeOffers(executorId)
+ } else {
+ // Ignoring the update since we don't know about the executor.
+ val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
+ logWarning(msg.format(taskId, state, sender, executorId))
+ }
+ }
+
+ case ReviveOffers =>
+ makeOffers()
+
+ case KillTask(taskId, executorId) =>
+ executorActor(executorId) ! KillTask(taskId, executorId)
+
+ case StopDriver =>
+ sender ! true
+ context.stop(self)
+
+ case StopExecutors =>
+ logInfo("Asking each executor to shut down")
+ for (executor <- executorActor.values) {
+ executor ! StopExecutor
+ }
+ sender ! true
+
+ case RemoveExecutor(executorId, reason) =>
+ removeExecutor(executorId, reason)
+ sender ! true
+
+ case Terminated(actor) =>
+ actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
+
- case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
++ case DisassociatedEvent(_, remoteAddress, _) =>
++ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
+
- case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
++ case AssociationErrorEvent(_, _, remoteAddress, _) =>
++ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
+ }
+
+ // Make fake resource offers on all executors
+ def makeOffers() {
+ launchTasks(scheduler.resourceOffers(
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+ }
+
+ // Make fake resource offers on just one executor
+ def makeOffers(executorId: String) {
+ launchTasks(scheduler.resourceOffers(
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+ }
+
+ // Launch tasks returned by a set of resource offers
+ def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+ for (task <- tasks.flatten) {
+ freeCores(task.executorId) -= 1
+ executorActor(task.executorId) ! LaunchTask(task)
+ }
+ }
+
+ // Remove a disconnected slave from the cluster
+ def removeExecutor(executorId: String, reason: String) {
+ if (executorActor.contains(executorId)) {
+ logInfo("Executor " + executorId + " disconnected, so removing it")
+ val numCores = freeCores(executorId)
+ actorToExecutorId -= executorActor(executorId)
+ addressToExecutorId -= executorAddress(executorId)
+ executorActor -= executorId
+ executorHost -= executorId
+ freeCores -= executorId
+ totalCoreCount.addAndGet(-numCores)
+ scheduler.executorLost(executorId, SlaveLost(reason))
+ }
+ }
+ }
+
+ var driverActor: ActorRef = null
+ val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+ override def start() {
+ val properties = new ArrayBuffer[(String, String)]
+ val iterator = System.getProperties.entrySet.iterator
+ while (iterator.hasNext) {
+ val entry = iterator.next
+ val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+ if (key.startsWith("spark.") && !key.equals("spark.hostPort")) {
+ properties += ((key, value))
+ }
+ }
+ driverActor = actorSystem.actorOf(
+ Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
+ }
+
+ private val timeout = {
+ Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ }
+
+ def stopExecutors() {
+ try {
+ if (driverActor != null) {
+ logInfo("Shutting down all executors")
+ val future = driverActor.ask(StopExecutors)(timeout)
+ Await.ready(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+ }
+ }
+
+ override def stop() {
+ try {
+ if (driverActor != null) {
+ val future = driverActor.ask(StopDriver)(timeout)
+ Await.ready(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+ }
+ }
+
+ override def reviveOffers() {
+ driverActor ! ReviveOffers
+ }
+
+ override def killTask(taskId: Long, executorId: String) {
+ driverActor ! KillTask(taskId, executorId)
+ }
+
+ override def defaultParallelism() = Option(System.getProperty("spark.default.parallelism"))
+ .map(_.toInt).getOrElse(math.max(totalCoreCount.get(), 2))
+
+ // Called by subclasses when notified of a lost worker
+ def removeExecutor(executorId: String, reason: String) {
+ try {
+ val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+ Await.ready(future, timeout)
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+ }
+ }
+ }
+
+ private[spark] object CoarseGrainedSchedulerBackend {
+ val ACTOR_NAME = "CoarseGrainedScheduler"
+ }
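The merge above replaces Akka 2.0's akka.dispatch.Await and akka.util.Duration with
the scala.concurrent equivalents that ship with Scala 2.10. A minimal standalone
sketch of the resulting ask/Await idiom, assuming Akka 2.2.x on the classpath (the
Echo actor and "ping" message are illustrative, not part of Spark):

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSystem, Props}
    import akka.pattern.ask
    import akka.util.Timeout

    object AskSketch extends App {
      class Echo extends Actor {
        def receive = { case msg => sender ! msg }   // reply with the same message
      }
      val system = ActorSystem("sketch")
      val echo = system.actorOf(Props(new Echo), name = "echo")
      val timeout = Timeout(10.seconds)              // replaces akka.util.Duration
      val future = echo.ask("ping")(timeout)         // same pattern as StopDriver above
      println(Await.result(future, timeout.duration))
      system.shutdown()
    }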
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index fa83ae1,cefa970..7127a72
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@@ -42,12 -42,12 +42,12 @@@ private[spark] class SparkDeploySchedul
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
- "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
+ "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(null)
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index bf4040f,300fe69..8de9b72
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@@ -119,10 -120,10 +120,10 @@@ private[spark] class CoarseMesosSchedul
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
val uri = System.getProperty("spark.executor.uri")
if (uri == null) {
val runScript = new File(sparkHome, "spark-class").getCanonicalPath
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 7852849,a34c95b..252329c
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@@ -17,17 -17,18 +17,18 @@@
package org.apache.spark.storage
- import java.io.{InputStream, OutputStream}
+ import java.io.{File, InputStream, OutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}
- import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
+ import scala.collection.mutable.{HashMap, ArrayBuffer}
+ import scala.util.Random
import akka.actor.{ActorSystem, Cancellable, Props}
-import akka.dispatch.{Await, Future}
-import akka.util.Duration
-import akka.util.duration._
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
- import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+ import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/Utils.scala
index 94ce50e,fe932d8..7557dda
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@@ -18,22 -18,14 +18,18 @@@
package org.apache.spark.util
import java.io._
- import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
+ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
- import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
- import java.util.regex.Pattern
-import scala.collection.Map
+
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-
- import scala.collection.mutable.{ArrayBuffer, HashMap}
+ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.collection.Map
import scala.io.Source
+import scala.reflect.ClassTag
+import scala.Some
+
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 0000000,80545c9..45849b3
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@@ -1,0 -1,152 +1,154 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.util.collection
+
++import scala.reflect.ClassTag
++
+
+ /**
+ * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
+ * but not deletions. This map is about 5X faster than java.util.HashMap, while incurring much less
+ * space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+ private[spark]
-class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
++class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
+ initialCapacity: Int)
+ extends Iterable[(K, V)]
+ with Serializable {
+
+ def this() = this(64)
+
+ protected var _keySet = new OpenHashSet[K](initialCapacity)
+
+ // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+ // bug that would generate two arrays (one for Object and one for specialized T).
+ private var _values: Array[V] = _
+ _values = new Array[V](_keySet.capacity)
+
+ @transient private var _oldValues: Array[V] = null
+
+ // Treat the null key differently so we can use nulls in "data" to represent empty items.
+ private var haveNullValue = false
+ private var nullValue: V = null.asInstanceOf[V]
+
+ override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
+
+ /** Get the value for a given key */
+ def apply(k: K): V = {
+ if (k == null) {
+ nullValue
+ } else {
+ val pos = _keySet.getPos(k)
+ if (pos < 0) {
+ null.asInstanceOf[V]
+ } else {
+ _values(pos)
+ }
+ }
+ }
+
+ /** Set the value for a key */
+ def update(k: K, v: V) {
+ if (k == null) {
+ haveNullValue = true
+ nullValue = v
+ } else {
+ val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+ _values(pos) = v
+ _keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+ }
+
+ /**
+ * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+ * set its value to mergeValue(oldValue).
+ *
+ * @return the newly updated value.
+ */
+ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+ if (k == null) {
+ if (haveNullValue) {
+ nullValue = mergeValue(nullValue)
+ } else {
+ haveNullValue = true
+ nullValue = defaultValue
+ }
+ nullValue
+ } else {
+ val pos = _keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+ val newValue = defaultValue
+ _values(pos & OpenHashSet.POSITION_MASK) = newValue
+ _keySet.rehashIfNeeded(k, grow, move)
+ newValue
+ } else {
+ _values(pos) = mergeValue(_values(pos))
+ _values(pos)
+ }
+ }
+ }
+
+ override def iterator = new Iterator[(K, V)] {
+ var pos = -1
+ var nextPair: (K, V) = computeNextPair()
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def computeNextPair(): (K, V) = {
+ if (pos == -1) { // Treat position -1 as looking at the null value
+ if (haveNullValue) {
+ pos += 1
+ return (null.asInstanceOf[K], nullValue)
+ }
+ pos += 1
+ }
+ pos = _keySet.nextPos(pos)
+ if (pos >= 0) {
+ val ret = (_keySet.getValue(pos), _values(pos))
+ pos += 1
+ ret
+ } else {
+ null
+ }
+ }
+
+ def hasNext = nextPair != null
+
+ def next() = {
+ val pair = nextPair
+ nextPair = computeNextPair()
+ pair
+ }
+ }
+
+ // The following member variables are declared as protected instead of private for the
+ // specialization to work (specialized class extends the non-specialized one and needs access
+ // to the "private" variables).
+ // They also should have been val's. We use var's because there is a Scala compiler bug that
+ // would throw illegal access error at runtime if they are declared as val's.
+ protected var grow = (newCapacity: Int) => {
+ _oldValues = _values
+ _values = new Array[V](newCapacity)
+ }
+
+ protected var move = (oldPos: Int, newPos: Int) => {
+ _values(newPos) = _oldValues(oldPos)
+ }
+ }
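A minimal usage sketch of the OpenHashMap above (keys and values are illustrative;
the class is private[spark], so this would live inside the same package):

    val m = new OpenHashMap[String, Int](64)
    m("a") = 1                                 // insert via update()
    m.changeValue("a", 1, _ + 1)               // existing key: merged to 2
    m.changeValue(null, 42, _ + 1)             // the null key is tracked separately
    assert(m("a") == 2 && m(null) == 42 && m.size == 2)
    for ((k, v) <- m) println(k + " -> " + v)  // iterator yields the null key first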
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 0000000,4592e4f..49d95af
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@@ -1,0 -1,271 +1,272 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.util.collection
+
++import scala.reflect._
+
+ /**
+ * A simple, fast hash set optimized for the non-null, insertion-only use case, where keys are never
+ * removed.
+ *
+ * The underlying implementation uses the Scala compiler's specialization to generate optimized
+ * storage for two primitive types (Long and Int). It is much faster than Java's standard HashSet
+ * while incurring much less memory overhead.
+ *
+ * This OpenHashSet is designed to serve as a building block for higher level data structures
+ * such as an optimized hash map. Compared with standard hash set implementations, this class
+ * provides various callback interfaces (e.g. allocateFunc, moveFunc) and interfaces to
+ * retrieve the position of a key in the underlying array.
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed
+ * to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+ private[spark]
-class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
++class OpenHashSet[@specialized(Long, Int) T: ClassTag](
+ initialCapacity: Int,
+ loadFactor: Double)
+ extends Serializable {
+
+ require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+ require(initialCapacity >= 1, "Invalid initial capacity")
+ require(loadFactor < 1.0, "Load factor must be less than 1.0")
+ require(loadFactor > 0.0, "Load factor must be greater than 0.0")
+
+ import OpenHashSet._
+
+ def this(initialCapacity: Int) = this(initialCapacity, 0.7)
+
+ def this() = this(64)
+
+ // The following member variables are declared as protected instead of private for the
+ // specialization to work (specialized class extends the non-specialized one and needs access
+ // to the "private" variables).
+
+ protected val hasher: Hasher[T] = {
+ // It would've been more natural to write the following using pattern matching. But Scala 2.9.x
+ // compiler has a bug when specialization is used together with this pattern matching, and
+ // throws:
+ // scala.tools.nsc.symtab.Types$TypeError: type mismatch;
+ // found : scala.reflect.AnyValManifest[Long]
- // required: scala.reflect.ClassManifest[Int]
++ // required: scala.reflect.ClassTag[Int]
+ // at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
+ // at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
+ // ...
- val mt = classManifest[T]
- if (mt == ClassManifest.Long) {
++ val mt = classTag[T]
++ if (mt == ClassTag.Long) {
+ (new LongHasher).asInstanceOf[Hasher[T]]
- } else if (mt == ClassManifest.Int) {
++ } else if (mt == ClassTag.Int) {
+ (new IntHasher).asInstanceOf[Hasher[T]]
+ } else {
+ new Hasher[T]
+ }
+ }
+
+ protected var _capacity = nextPowerOf2(initialCapacity)
+ protected var _mask = _capacity - 1
+ protected var _size = 0
+
+ protected var _bitset = new BitSet(_capacity)
+
+ // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+ // specialization bug that would generate two arrays (one for Object and one for specialized T).
+ protected var _data: Array[T] = _
+ _data = new Array[T](_capacity)
+
+ /** Number of elements in the set. */
+ def size: Int = _size
+
+ /** The capacity of the set (i.e. size of the underlying array). */
+ def capacity: Int = _capacity
+
+ /** Return true if this set contains the specified element. */
+ def contains(k: T): Boolean = getPos(k) != INVALID_POS
+
+ /**
+ * Add an element to the set. If the set is over capacity after the insertion, grow the set
+ * and rehash all elements.
+ */
+ def add(k: T) {
+ addWithoutResize(k)
+ rehashIfNeeded(k, grow, move)
+ }
+
+ /**
+ * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+ * The caller is responsible for calling rehashIfNeeded.
+ *
+ * Use (retval & POSITION_MASK) to get the actual position, and
+ * (retval & EXISTENCE_MASK) != 0 for prior existence.
+ *
+ * @return The position where the key is placed, plus the highest order bit is set if the key
+ * exists previously.
+ */
+ def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
+
+ /**
+ * Rehash the set if it is overloaded.
+ * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+ * this method.
+ * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+ * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
+ * to a new position (in the new data array).
+ */
+ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+ if (_size > loadFactor * _capacity) {
+ rehash(k, allocateFunc, moveFunc)
+ }
+ }
+
+ /**
+ * Return the position of the element in the underlying array, or INVALID_POS if it is not found.
+ */
+ def getPos(k: T): Int = {
+ var pos = hashcode(hasher.hash(k)) & _mask
+ var i = 1
+ while (true) {
+ if (!_bitset.get(pos)) {
+ return INVALID_POS
+ } else if (k == _data(pos)) {
+ return pos
+ } else {
+ val delta = i
+ pos = (pos + delta) & _mask
+ i += 1
+ }
+ }
+ // Never reached here
+ INVALID_POS
+ }
+
+ /** Return the value at the specified position. */
+ def getValue(pos: Int): T = _data(pos)
+
+ /**
+ * Return the next position with an element stored, starting from the given position inclusively.
+ */
+ def nextPos(fromPos: Int): Int = _bitset.nextSetBit(fromPos)
+
+ /**
+ * Put an entry into the set. Return the position where the key is placed. In addition, the
+ * highest bit in the returned position is set if the key did not exist prior to this put.
+ *
+ * This function assumes the data array has at least one empty slot.
+ */
+ private def putInto(bitset: BitSet, data: Array[T], k: T): Int = {
+ val mask = data.length - 1
+ var pos = hashcode(hasher.hash(k)) & mask
+ var i = 1
+ while (true) {
+ if (!bitset.get(pos)) {
+ // This is a new key.
+ data(pos) = k
+ bitset.set(pos)
+ _size += 1
+ return pos | NONEXISTENCE_MASK
+ } else if (data(pos) == k) {
+ // Found an existing key.
+ return pos
+ } else {
+ val delta = i
+ pos = (pos + delta) & mask
+ i += 1
+ }
+ }
+ // Never reached here
+ assert(INVALID_POS != INVALID_POS)
+ INVALID_POS
+ }
+
+ /**
+ * Double the table's size and re-hash everything. We are not really using k, but it is declared
+ * so Scala compiler can specialize this method (which leads to calling the specialized version
+ * of putInto).
+ *
+ * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+ * this method.
+ * @param allocateFunc Callback invoked when we are allocating a new, larger array.
+ * @param moveFunc Callback invoked when we move the key from one position (in the old data array)
+ * to a new position (in the new data array).
+ */
+ private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+ val newCapacity = _capacity * 2
+ require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+
+ allocateFunc(newCapacity)
+ val newData = new Array[T](newCapacity)
+ val newBitset = new BitSet(newCapacity)
+ var pos = 0
+ _size = 0
+ while (pos < _capacity) {
+ if (_bitset.get(pos)) {
+ val newPos = putInto(newBitset, newData, _data(pos))
+ moveFunc(pos, newPos & POSITION_MASK)
+ }
+ pos += 1
+ }
+ _bitset = newBitset
+ _data = newData
+ _capacity = newCapacity
+ _mask = newCapacity - 1
+ }
+
+ /**
+ * Re-hash a value to deal better with hash functions that don't differ
+ * in the lower bits, similar to java.util.HashMap
+ */
+ private def hashcode(h: Int): Int = {
+ val r = h ^ (h >>> 20) ^ (h >>> 12)
+ r ^ (r >>> 7) ^ (r >>> 4)
+ }
+
+ private def nextPowerOf2(n: Int): Int = {
+ val highBit = Integer.highestOneBit(n)
+ if (highBit == n) n else highBit << 1
+ }
+ }
+
+
+ private[spark]
+ object OpenHashSet {
+
+ val INVALID_POS = -1
+ val NONEXISTENCE_MASK = 0x80000000
+ val POSITION_MASK = 0x7FFFFFFF // complement of NONEXISTENCE_MASK
+
+ /**
+ * A set of specialized hash function implementation to avoid boxing hash code computation
+ * in the specialized implementation of OpenHashSet.
+ */
+ sealed class Hasher[@specialized(Long, Int) T] {
+ def hash(o: T): Int = o.hashCode()
+ }
+
+ class LongHasher extends Hasher[Long] {
+ override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
+ }
+
+ class IntHasher extends Hasher[Int] {
+ override def hash(o: Int): Int = o
+ }
+
+ private def grow1(newSize: Int) {}
+ private def move1(oldPos: Int, newPos: Int) { }
+
+ private val grow = grow1 _
+ private val move = move1 _
+ }
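A sketch of the addWithoutResize/rehashIfNeeded contract documented above (the
callbacks here are no-ops, as in OpenHashSet's own grow1/move1):

    val set = new OpenHashSet[Long](64)
    val pos1 = set.addWithoutResize(42L)
    assert((pos1 & OpenHashSet.NONEXISTENCE_MASK) != 0)  // 42 was newly inserted
    val pos2 = set.addWithoutResize(42L)
    assert(pos2 == (pos1 & OpenHashSet.POSITION_MASK))   // existing key: same slot
    set.rehashIfNeeded(42L, _ => (), (_, _) => ())       // caller triggers rehashing
    assert(set.contains(42L) && set.size == 1)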
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
index 0000000,d76143e..2e1ef06
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@@ -1,0 -1,127 +1,128 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.util.collection
+
++import scala.reflect._
+
+ /**
+ * A fast hash map implementation for primitive, non-null keys. This hash map supports
+ * insertions and updates, but not deletions. This map is about an order of magnitude
+ * faster than java.util.HashMap, while incurring much less space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+ private[spark]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
- @specialized(Long, Int, Double) V: ClassManifest](
++class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
++ @specialized(Long, Int, Double) V: ClassTag](
+ initialCapacity: Int)
+ extends Iterable[(K, V)]
+ with Serializable {
+
+ def this() = this(64)
+
- require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
++ require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
+
+ // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+ // bug that would generate two arrays (one for Object and one for specialized T).
+ protected var _keySet: OpenHashSet[K] = _
+ private var _values: Array[V] = _
+ _keySet = new OpenHashSet[K](initialCapacity)
+ _values = new Array[V](_keySet.capacity)
+
+ private var _oldValues: Array[V] = null
+
+ override def size = _keySet.size
+
+ /** Get the value for a given key */
+ def apply(k: K): V = {
+ val pos = _keySet.getPos(k)
+ _values(pos)
+ }
+
+ /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+ def getOrElse(k: K, elseValue: V): V = {
+ val pos = _keySet.getPos(k)
+ if (pos >= 0) _values(pos) else elseValue
+ }
+
+ /** Set the value for a key */
+ def update(k: K, v: V) {
+ val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+ _values(pos) = v
+ _keySet.rehashIfNeeded(k, grow, move)
+ _oldValues = null
+ }
+
+ /**
+ * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+ * set its value to mergeValue(oldValue).
+ *
+ * @return the newly updated value.
+ */
+ def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+ val pos = _keySet.addWithoutResize(k)
+ if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+ val newValue = defaultValue
+ _values(pos & OpenHashSet.POSITION_MASK) = newValue
+ _keySet.rehashIfNeeded(k, grow, move)
+ newValue
+ } else {
+ _values(pos) = mergeValue(_values(pos))
+ _values(pos)
+ }
+ }
+
+ override def iterator = new Iterator[(K, V)] {
+ var pos = 0
+ var nextPair: (K, V) = computeNextPair()
+
+ /** Get the next value we should return from next(), or null if we're finished iterating */
+ def computeNextPair(): (K, V) = {
+ pos = _keySet.nextPos(pos)
+ if (pos >= 0) {
+ val ret = (_keySet.getValue(pos), _values(pos))
+ pos += 1
+ ret
+ } else {
+ null
+ }
+ }
+
+ def hasNext = nextPair != null
+
+ def next() = {
+ val pair = nextPair
+ nextPair = computeNextPair()
+ pair
+ }
+ }
+
+ // The following member variables are declared as protected instead of private for the
+ // specialization to work (specialized class extends the unspecialized one and needs access
+ // to the "private" variables).
+ // They also should have been val's. We use var's because there is a Scala compiler bug that
+ // would throw illegal access error at runtime if they are declared as val's.
+ protected var grow = (newCapacity: Int) => {
+ _oldValues = _values
+ _values = new Array[V](newCapacity)
+ }
+
+ protected var move = (oldPos: Int, newPos: Int) => {
+ _values(newPos) = _oldValues(oldPos)
+ }
+ }
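A usage sketch of the primitive-key variant above; unlike OpenHashMap it rejects
non-Long/Int keys at construction time via the require() check:

    val counts = new PrimitiveKeyOpenHashMap[Long, Int](64)
    counts(1L) = 10
    counts.changeValue(1L, 1, _ + 1)          // existing key: merged to 11
    counts.changeValue(2L, 1, _ + 1)          // new key: set to the default, 1
    assert(counts.getOrElse(3L, -1) == -1)    // missing keys go through getOrElse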
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index 0000000,369519c..465c221
mode 000000,100644..100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@@ -1,0 -1,51 +1,53 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.spark.util.collection
+
++import scala.reflect.ClassTag
++
+ /** Provides a simple, non-threadsafe, array-backed vector that can store primitives. */
+ private[spark]
-class PrimitiveVector[@specialized(Long, Int, Double) V: ClassManifest](initialSize: Int = 64) {
++class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
+ private var numElements = 0
+ private var array: Array[V] = _
+
+ // NB: This must be separate from the declaration, otherwise the specialized parent class
+ // will get its own array with the same initial size. TODO: Figure out why...
+ array = new Array[V](initialSize)
+
+ def apply(index: Int): V = {
+ require(index < numElements)
+ array(index)
+ }
+
+ def +=(value: V) {
+ if (numElements == array.length) { resize(array.length * 2) }
+ array(numElements) = value
+ numElements += 1
+ }
+
+ def length = numElements
+
+ def getUnderlyingArray = array
+
+ /** Resizes the array, dropping elements if the total length decreases. */
+ def resize(newLength: Int) {
+ val newArray = new Array[V](newLength)
+ array.copyToArray(newArray)
+ array = newArray
+ }
+ }
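A usage sketch of PrimitiveVector's append-and-double growth (the initial size is
chosen small here to force two resizes):

    val vec = new PrimitiveVector[Double](2)
    for (i <- 0 until 5) vec += i.toDouble   // grows the array to 4, then 8 slots
    assert(vec.length == 5 && vec(4) == 4.0)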
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --cc core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 0d8742c,1fd7642..2e41438
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@@ -89,13 -134,49 +134,49 @@@ class SparkListenerSuite extends FunSui
}
}
- def checkNonZeroAvg(m: Traversable[Long], msg: String) {
- assert(m.sum / m.size.toDouble > 0.0, msg)
+ test("onTaskGettingResult() called when result fetched remotely") {
+ // Need to use local cluster mode here, because results are never returned through the
+ // block manager when using the LocalScheduler.
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+
+ val listener = new SaveTaskEvents
+ sc.addSparkListener(listener)
+
+ // Make a task whose result is larger than the akka frame size
+ System.setProperty("spark.akka.frameSize", "1")
+ val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
++ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
+ val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x,y) => x)
+ assert(result === 1.to(akkaFrameSize).toArray)
+
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ val TASK_INDEX = 0
+ assert(listener.startedTasks.contains(TASK_INDEX))
+ assert(listener.startedGettingResultTasks.contains(TASK_INDEX))
+ assert(listener.endedTasks.contains(TASK_INDEX))
}
- def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
- val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
- !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
+ test("onTaskGettingResult() not called when result sent directly") {
+ // Need to use local cluster mode here, because results are never returned through the
+ // block manager when using the LocalScheduler.
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+
+ val listener = new SaveTaskEvents
+ sc.addSparkListener(listener)
+
+ // Make a task whose result is smaller than the akka frame size
+ val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+ assert(result === 2)
+
+ assert(sc.dagScheduler.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+ val TASK_INDEX = 0
+ assert(listener.startedTasks.contains(TASK_INDEX))
+ assert(listener.startedGettingResultTasks.isEmpty)
+ assert(listener.endedTasks.contains(TASK_INDEX))
+ }
+
+ def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+ assert(m.sum / m.size.toDouble > 0.0, msg)
}
class SaveStageInfo extends SparkListener {
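A back-of-envelope sketch of why the first test's result is fetched remotely: with
spark.akka.frameSize set to 1 (interpreted in MB), an Array[Int] of frame-size-many
elements serializes to roughly four bytes per element, well past the one-frame
limit, so the result has to travel through the block manager (numbers below are
illustrative):

    val frameSizeBytes = 1 * 1024 * 1024             // 1 MB Akka frame limit
    val resultBytes = frameSizeBytes.toLong * 4L     // ~4 bytes per Int element
    assert(resultBytes > frameSizeBytes)             // exceeds a single frame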
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/examples/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 9278856,53ac82e..72b9549
--- a/pom.xml
+++ b/pom.xml
@@@ -227,13 -259,36 +261,36 @@@
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor</artifactId>
++ <artifactId>akka-actor_${scala-short.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-remote</artifactId>
+ <artifactId>akka-remote_${scala-short.version}</artifactId>
<version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-slf4j</artifactId>
+ <artifactId>akka-slf4j_${scala-short.version}</artifactId>
<version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/project/SparkBuild.scala
----------------------------------------------------------------------
diff --cc project/SparkBuild.scala
index 43db97f,45fd30a..b71e1b3
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@@ -74,13 -76,16 +76,16 @@@ object SparkBuild extends Build
// Conditionally include the yarn sub-project
lazy val maybeYarn = if(isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
- lazy val allProjects = Seq[ProjectReference](
- core, repl, examples, bagel, streaming, mllib, tools, assemblyProj) ++ maybeYarnRef
+
+ // Everything except assembly, tools and examples belongs to packageProjects
+ lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib) ++ maybeYarnRef
+
+ lazy val allProjects = packageProjects ++ Seq[ProjectReference](examples, tools, assemblyProj)
def sharedSettings = Defaults.defaultSettings ++ Seq(
- organization := "org.apache.spark",
- version := "0.9.0-incubating-SNAPSHOT",
- scalaVersion := "2.9.3",
+ organization := "org.apache.spark",
+ version := "0.9.0-incubating-SNAPSHOT",
+ scalaVersion := "2.10.3",
scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation",
"-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
@@@ -194,34 -201,37 +205,34 @@@
),
libraryDependencies ++= Seq(
- "com.google.guava" % "guava" % "14.0.1",
- "com.google.code.findbugs" % "jsr305" % "1.3.9",
- "log4j" % "log4j" % "1.2.17",
- "org.slf4j" % "slf4j-api" % slf4jVersion,
- "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
- "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
- "com.ning" % "compress-lzf" % "0.8.4",
- "org.xerial.snappy" % "snappy-java" % "1.0.5",
- "org.ow2.asm" % "asm" % "4.0",
- "com.google.protobuf" % "protobuf-java" % "2.4.1",
- "com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
- "com.typesafe.akka" % "akka-remote" % "2.0.5" excludeAll(excludeNetty),
- "com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
- "it.unimi.dsi" % "fastutil" % "6.4.4",
- "colt" % "colt" % "1.2.0",
- "net.liftweb" % "lift-json_2.9.2" % "2.5",
- "org.apache.mesos" % "mesos" % "0.13.0",
- "io.netty" % "netty-all" % "4.0.0.Beta2",
- "org.apache.derby" % "derby" % "10.4.2.0" % "test",
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
- "net.java.dev.jets3t" % "jets3t" % "0.7.1",
- "org.apache.avro" % "avro" % "1.7.4",
- "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
- "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
- "com.codahale.metrics" % "metrics-core" % "3.0.0",
- "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
- "com.codahale.metrics" % "metrics-json" % "3.0.0",
- "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
- "com.twitter" % "chill_2.9.3" % "0.3.1",
- "com.twitter" % "chill-java" % "0.3.1"
- )
+ "com.google.guava" % "guava" % "14.0.1",
+ "com.google.code.findbugs" % "jsr305" % "1.3.9",
+ "log4j" % "log4j" % "1.2.17",
+ "org.slf4j" % "slf4j-api" % slf4jVersion,
+ "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
+ "com.ning" % "compress-lzf" % "0.8.4",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5",
+ "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
+ "org.ow2.asm" % "asm" % "4.0",
+ "com.google.protobuf" % "protobuf-java" % "2.4.1",
+ "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
+ "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+ "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
+ "it.unimi.dsi" % "fastutil" % "6.4.4",
+ "colt" % "colt" % "1.2.0",
+ "org.apache.mesos" % "mesos" % "0.13.0",
+ "net.java.dev.jets3t" % "jets3t" % "0.7.1",
+ "org.apache.derby" % "derby" % "10.4.2.0" % "test",
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
++ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.avro" % "avro" % "1.7.4",
+ "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
+ "com.codahale.metrics" % "metrics-core" % "3.0.0",
+ "com.codahale.metrics" % "metrics-jvm" % "3.0.0",
+ "com.codahale.metrics" % "metrics-json" % "3.0.0",
+ "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
+ "com.twitter" %% "chill" % "0.3.1",
+ "com.twitter" % "chill-java" % "0.3.1"
+ )
)
def rootSettings = sharedSettings ++ Seq(
@@@ -272,11 -280,19 +283,22 @@@
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
+ resolvers ++= Seq(
+ "Akka Repository" at "http://repo.akka.io/releases/",
+ "Apache repo" at "https://repository.apache.org/content/repositories/releases"
+ ),
++
libraryDependencies ++= Seq(
- "org.eclipse.paho" % "mqtt-client" % "0.4.0",
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),
- "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
++ "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ exclude("com.sun.jdmk", "jmxtools")
+ exclude("com.sun.jmx", "jmxri")
+ exclude("net.sf.jopt-simple", "jopt-simple")
++ excludeAll(excludeNetty),
++ "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+ "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
)
)
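The dependency changes lean on sbt's cross-version operator: "%%" appends the Scala
binary version to the artifact name, so with scalaVersion 2.10.x the two lines in
this sketch resolve to the same artifact:

    // Equivalent ways to depend on akka-remote for Scala 2.10 in an sbt build.
    libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.2.3"
    libraryDependencies += "com.typesafe.akka" % "akka-remote_2.10" % "2.2.3"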
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --cc repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 988b624,0ced284..43e504c
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@@ -848,77 -827,28 +862,77 @@@ class SparkILoop(in0: Option[BufferedRe
}
}
- def initializeSpark() {
- intp.beQuietDuring {
- command("""
- org.apache.spark.repl.Main.interp.out.println("Creating SparkContext...");
- org.apache.spark.repl.Main.interp.out.flush();
- @transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
- org.apache.spark.repl.Main.interp.out.println("Spark context available as sc.");
- org.apache.spark.repl.Main.interp.out.flush();
- """)
- command("import org.apache.spark.SparkContext._")
+ val u: scala.reflect.runtime.universe.type = scala.reflect.runtime.universe
+ val m = u.runtimeMirror(getClass.getClassLoader)
+ private def tagOfStaticClass[T: ClassTag]: u.TypeTag[T] =
+ u.TypeTag[T](
+ m,
+ new TypeCreator {
+ def apply[U <: ApiUniverse with Singleton](m: Mirror[U]): U # Type =
+ m.staticClass(classTag[T].runtimeClass.getName).toTypeConstructor.asInstanceOf[U # Type]
+ })
+
+ def process(settings: Settings): Boolean = savingContextLoader {
+ this.settings = settings
+ createInterpreter()
+
+ // sets in to some kind of reader depending on environmental cues
+ in = in0 match {
+ case Some(reader) => SimpleReader(reader, out, true)
+ case None =>
+ // some post-initialization
+ chooseReader(settings) match {
+ case x: SparkJLineReader => addThunk(x.consoleReader.postInit) ; x
+ case x => x
+ }
}
- echo("Type in expressions to have them evaluated.")
- echo("Type :help for more information.")
- }
+ lazy val tagOfSparkIMain = tagOfStaticClass[org.apache.spark.repl.SparkIMain]
+ // Bind intp somewhere out of the regular namespace where
+ // we can get at it in generated code.
+ addThunk(intp.quietBind(NamedParam[SparkIMain]("$intp", intp)(tagOfSparkIMain, classTag[SparkIMain])))
+ addThunk({
+ import scala.tools.nsc.io._
+ import Properties.userHome
+ import scala.compat.Platform.EOL
+ val autorun = replProps.replAutorunCode.option flatMap (f => io.File(f).safeSlurp())
+ if (autorun.isDefined) intp.quietRun(autorun.get)
+ })
+
+ addThunk(printWelcome())
+ addThunk(initializeSpark())
+
+ loadFiles(settings)
+ // it is broken on startup; go ahead and exit
+ if (intp.reporter.hasErrors)
+ return false
- var sparkContext: SparkContext = null
+ // This is about the illusion of snappiness. We call initialize()
+ // which spins off a separate thread, then print the prompt and try
+ // our best to look ready. The interlocking lazy vals tend to
+ // inter-deadlock, so we break the cycle with a single asynchronous
+ // message to an actor.
+ if (isAsync) {
+ intp initialize initializedCallback()
+ createAsyncListener() // listens for signal to run postInitialization
+ }
+ else {
+ intp.initializeSynchronous()
+ postInitialization()
+ }
+ // printWelcome()
+
+ try loop()
+ catch AbstractOrMissingHandler()
+ finally closeInterpreter()
+
+ true
+ }
def createSparkContext(): SparkContext = {
- val uri = System.getenv("SPARK_EXECUTOR_URI")
- if (uri != null) {
- System.setProperty("spark.executor.uri", uri)
- }
+ val uri = System.getenv("SPARK_EXECUTOR_URI")
+ if (uri != null) {
+ System.setProperty("spark.executor.uri", uri)
+ }
val master = this.master match {
case Some(m) => m
case None => {
@@@ -926,9 -856,17 +940,17 @@@
if (prop != null) prop else "local"
}
}
- val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
- sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
- echo("Created spark context..")
+ val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
- .getOrElse(new Array[String](0))
- .map(new java.io.File(_).getAbsolutePath)
++ .getOrElse(new Array[String](0))
++ .map(new java.io.File(_).getAbsolutePath)
+ try {
+ sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+ } catch {
+ case e: Exception =>
+ e.printStackTrace()
+ echo("Failed to create SparkContext, exiting...")
+ sys.exit(1)
+ }
sparkContext
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/spark-class
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/pom.xml
----------------------------------------------------------------------
diff --cc streaming/pom.xml
index 3f2033f,7a9ae6a..fb15681
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@@ -87,8 -111,14 +111,14 @@@
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
- <artifactId>akka-zeromq</artifactId>
- <version>2.0.3</version>
+ <artifactId>akka-zeromq_${scala-short.version}</artifactId>
+ <version>${akka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
@@@ -111,13 -141,14 +141,18 @@@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
++ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>mqtt-client</artifactId>
+ <version>0.4.0</version>
+ </dependency>
</dependencies>
<build>
- <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
- <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <outputDirectory>target/scala-${scala-short.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala-short.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index cd404fd,9ceff75..329d2b5
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@@ -38,7 -37,7 +38,7 @@@ import org.apache.hadoop.conf.Configura
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
-- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
++ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
 * for more details on RDDs). DStreams can either be created from live data (such as data from
 * HDFS, Kafka or Flume) or be generated by transforming existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
@@@ -488,26 -494,62 +495,60 @@@ abstract class DStream[T: ClassTag]
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
- * this DStream will be registered as an output stream and therefore materialized.
+ * 'this' DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
- val newStream = new ForEachDStream(this, context.sparkContext.clean(foreachFunc))
- ssc.registerOutputStream(newStream)
- newStream
+ ssc.registerOutputStream(new ForEachDStream(this, context.sparkContext.clean(foreachFunc)))
}
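    // A usage sketch for the output operator above, assuming a hypothetical
    // words: DStream[String]. Registering an output operator is what causes
    // the DStream (and its ancestors) to be materialized every batch:
    words.foreach((rdd, time) =>
      println("Batch " + time + ": " + rdd.count() + " records"))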
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
- def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
- transform((r: RDD[T], t: Time) => transformFunc(r))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ transform((r: RDD[T], t: Time) => cleanedF(r))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
- def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
+ def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- new TransformedDStream(this, context.sparkContext.clean(transformFunc))
+ val cleanedF = context.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 1)
+ cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
+ }
+ new TransformedDStream[U](Seq(this), realTransformFunc)
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
- def transformWith[U: ClassManifest, V: ClassManifest](
++ def transformWith[U: ClassTag, V: ClassTag](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
- def transformWith[U: ClassManifest, V: ClassManifest](
++ def transformWith[U: ClassTag, V: ClassTag](
+ other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
+ ): DStream[V] = {
+ val cleanedF = ssc.sparkContext.clean(transformFunc)
+ val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+ assert(rdds.length == 2)
+ val rdd1 = rdds(0).asInstanceOf[RDD[T]]
+ val rdd2 = rdds(1).asInstanceOf[RDD[U]]
+ cleanedF(rdd1, rdd2, time)
+ }
+ new TransformedDStream[V](Seq(this, other), realTransformFunc)
}
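    // A usage sketch of the transformWith operator defined above, assuming
    // hypothetical streams events: DStream[(String, Int)] and
    // blacklist: DStream[(String, Boolean)]. transformWith lines up the two
    // streams' RDDs for the same batch time, so any pairwise RDD operation
    // can be applied:
    val cleaned: DStream[(String, Int)] =
      events.transformWith(blacklist,
        (eventsRdd: RDD[(String, Int)], blackRdd: RDD[(String, Boolean)]) =>
          eventsRdd.subtractByKey(blackRdd))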
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index b761646,b97fb7e..66fe6e7
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@@ -28,7 -28,9 +28,9 @@@ import scala.collection.mutable.Queu
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
+import scala.concurrent.duration._
+ import akka.dispatch._
+ import org.apache.spark.storage.BlockId
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index f021e29,8c12fd1..ea5c165
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@@ -18,9 -18,9 +18,7 @@@
package org.apache.spark.streaming
import org.apache.spark.streaming.StreamingContext._
--import org.apache.spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
- import org.apache.spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
- import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
-import org.apache.spark.streaming.dstream.{ShuffledDStream}
-import org.apache.spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
++import org.apache.spark.streaming.dstream._
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
@@@ -35,8 -34,8 +33,9 @@@ import org.apache.hadoop.mapreduce.{Out
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
-class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
+class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
private[streaming] def ssc = self.ssc
@@@ -399,33 -398,46 +398,46 @@@
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner)
}
-
+ /**
+ * Return a new DStream by applying a map function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
- def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
+ def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
+ /**
+ * Return a new DStream by applying a flatMap function to the value of each key-value pair in
+ * 'this' DStream without changing the key.
+ */
- def flatMapValues[U: ClassManifest](
+ def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
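    // Usage sketch, assuming a hypothetical wordCounts: DStream[(String, Int)].
    // Both operators transform values per key without touching the keys:
    val doubled: DStream[(String, Int)] = wordCounts.mapValues(_ * 2)
    val spread: DStream[(String, Int)] =
      wordCounts.flatMapValues(count => 1 to count)  // one pair per unit counted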
/**
- * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
/**
- * Cogroup `this` DStream with `other` DStream using a partitioner. For each key k in corresponding RDDs of `this`
- * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
- * key in both RDDs. Partitioner is used to partition each generated RDD.
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
- def cogroup[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
++ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to partition the generated RDDs.
*/
- def cogroup[W: ClassManifest](
+ def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
@@@ -441,31 -448,105 +448,105 @@@
}
/**
- * Join `this` DStream with `other` DStream. HashPartitioner is used
- * to partition each generated RDD into default number of partitions.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
- def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
+ def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
/**
- * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
- * be generated by joining RDDs from `this` and other DStream. Uses the given
- * Partitioner to partition each generated RDD.
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
- def join[W: ClassManifest](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
++ def join[W: ClassTag](other: DStream[(K, W)], numPartitions: Int): DStream[(K, (V, W))] = {
+ join[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
+ * The supplied [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD.
*/
- def join[W: ClassManifest](
+ def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
- this.cogroup(other, partitioner)
- .flatMapValues{
- case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
- }
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
- def leftOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
++ def leftOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
- def leftOuterJoin[W: ClassManifest](
++ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (V, Option[W]))] = {
+ leftOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
- def leftOuterJoin[W: ClassManifest](
++ def leftOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, Option[W]))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.leftOuterJoin(rdd2, partitioner)
+ )
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ */
- def rightOuterJoin[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
++ def rightOuterJoin[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner())
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions`
+ * partitions.
+ */
- def rightOuterJoin[W: ClassManifest](
++ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ numPartitions: Int
+ ): DStream[(K, (Option[V], W))] = {
+ rightOuterJoin[W](other, defaultPartitioner(numPartitions))
+ }
+
+ /**
+ * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and
+ * `other` DStream. The supplied [[org.apache.spark.Partitioner]] is used to control
+ * the partitioning of each RDD.
+ */
- def rightOuterJoin[W: ClassManifest](
++ def rightOuterJoin[W: ClassTag](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (Option[V], W))] = {
+ self.transformWith(
+ other,
+ (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.rightOuterJoin(rdd2, partitioner)
+ )
}
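    // Usage sketch of the join family above, assuming hypothetical streams
    // left: DStream[(String, Int)] and right: DStream[(String, Double)].
    // Each operator pairs the two streams' RDDs batch-by-batch via
    // transformWith and delegates to the matching RDD join:
    val inner:  DStream[(String, (Int, Double))]           = left.join(right)
    val lefts:  DStream[(String, (Int, Option[Double]))]   = left.leftOuterJoin(right)
    val rights: DStream[(String, (Option[Int], Double))]   = left.rightOuterJoin(right)
    val both:   DStream[(String, (Seq[Int], Seq[Double]))] = left.cogroup(right)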
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c722aa1,70bf902..d2c4fde
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@@ -268,7 -274,11 +276,11 @@@ class StreamingContext private
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def kafkaStream[T: ClassTag, D <: kafka.serializer.Decoder[_]: Manifest](
+ def kafkaStream[
- K: ClassManifest,
- V: ClassManifest,
++ K: ClassTag,
++ V: ClassTag,
+ U <: kafka.serializer.Decoder[_]: Manifest,
+ T <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
@@@ -452,14 -462,40 +464,40 @@@
inputStream
}
+ /**
+ * Create an input stream that receives messages pushed by an MQTT publisher.
+ * @param brokerUrl URL of the remote MQTT publisher
+ * @param topic Topic name to subscribe to
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_ONLY_SER_2.
+ */
+ def mqttStream(
+ brokerUrl: String,
+ topic: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[String] = {
+ val inputStream = new MQTTInputDStream[String](this, brokerUrl, topic, storageLevel)
+ registerInputStream(inputStream)
+ inputStream
+ }
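    // Usage sketch of the new MQTT input stream; the broker URL and topic
    // are placeholders. Each received message arrives as one String element:
    val lines: DStream[String] =
      ssc.mqttStream("tcp://localhost:1883", "sensor/readings")
    lines.foreach((rdd, time) =>
      println("Received " + rdd.count() + " messages at " + time))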
/**
- * Create a unified DStream from multiple DStreams of the same type and same interval
+ * Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
- def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
+ def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
/**
+ * Create a new DStream in which each RDD is generated by applying a function on RDDs of
+ * the DStreams.
+ */
- def transform[T: ClassManifest](
++ def transform[T: ClassTag](
+ dstreams: Seq[DStream[_]],
+ transformFunc: (Seq[RDD[_]], Time) => RDD[T]
+ ): DStream[T] = {
+ new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
+ }
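    // Usage sketch of union and the new multi-stream transform, assuming
    // hypothetical streams s1, s2: DStream[String]. transform generalizes
    // union by letting an arbitrary function combine the per-batch RDDs:
    val merged: DStream[String] = ssc.union(Seq(s1, s2))
    val combined: DStream[String] = ssc.transform(
      Seq(s1, s2),
      (rdds: Seq[RDD[_]], time: Time) =>
        rdds(0).asInstanceOf[RDD[String]].union(rdds(1).asInstanceOf[RDD[String]]))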
+
+ /**
* Register an input stream that will be started (InputDStream.start() called) to get the
* input data.
*/
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index 0d54d78,1a2aeaa..d29033d
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@@ -23,11 -23,9 +23,11 @@@ import org.apache.spark.api.java.JavaRD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
+import scala.reflect.ClassTag
+
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
-- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.RDD]]
++ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
 * for more details on RDDs). DStreams can either be created from live data (such as data from
 * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
 * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0f2e3c6e/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 4508e48,09189ea..64f38ce
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@@ -255,11 -257,11 +258,11 @@@ trait JavaDStreamLike[T, This <: JavaDS
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@@@ -267,11 -269,11 +270,11 @@@
/**
* Return a new DStream in which each RDD is generated by applying a function
- * on each RDD of this DStream.
+ * on each RDD of 'this' DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
- implicit val cm: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ implicit val cm: ClassTag[U] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@@@ -308,6 -310,82 +311,82 @@@
}
/**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U, W](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
- implicit val cmu: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- implicit val cmv: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++ implicit val cmu: ClassTag[U] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
++ implicit val cmv: ClassTag[W] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[U, K2, V2](
+ other: JavaDStream[U],
+ transformFunc: JFunction3[R, JavaRDD[U], Time, JavaPairRDD[K2, V2]]
+ ): JavaPairDStream[K2, V2] = {
- implicit val cmu: ClassManifest[U] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
++ implicit val cmu: ClassTag[U] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
++ implicit val cmk2: ClassTag[K2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++ implicit val cmv2: ClassTag[V2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[U], time: Time): RDD[(K2, V2)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[U, (K2, V2)](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[K2, V2, W](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaRDD[W]]
+ ): JavaDStream[W] = {
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
- implicit val cmw: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
++ implicit val cmk2: ClassTag[K2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++ implicit val cmv2: ClassTag[V2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
++ implicit val cmw: ClassTag[W] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[W] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K2, V2), W](other.dstream, scalaTransform(_, _, _))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of 'this' DStream and 'other' DStream.
+ */
+ def transformWith[K2, V2, K3, V3](
+ other: JavaPairDStream[K2, V2],
+ transformFunc: JFunction3[R, JavaPairRDD[K2, V2], Time, JavaPairRDD[K3, V3]]
+ ): JavaPairDStream[K3, V3] = {
- implicit val cmk2: ClassManifest[K2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
- implicit val cmv2: ClassManifest[V2] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
- implicit val cmk3: ClassManifest[K3] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K3]]
- implicit val cmv3: ClassManifest[V3] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V3]]
++ implicit val cmk2: ClassTag[K2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
++ implicit val cmv2: ClassTag[V2] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
++ implicit val cmk3: ClassTag[K3] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K3]]
++ implicit val cmv3: ClassTag[V3] =
++ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V3]]
+ def scalaTransform (inThis: RDD[T], inThat: RDD[(K2, V2)], time: Time): RDD[(K3, V3)] =
+ transformFunc.call(wrapRDD(inThis), other.wrapRDD(inThat), time).rdd
+ dstream.transformWith[(K2, V2), (K3, V3)](other.dstream, scalaTransform(_, _, _))
+ }
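    // The wrappers above all share one trick: Java callers cannot supply a
    // ClassTag for their erased type parameters, so one is conjured from
    // ClassTag[AnyRef] and cast. A sketch of the pattern in isolation (the
    // helper name is illustrative, not part of the patch):
    import scala.reflect.ClassTag
    def fakeClassTag[T]: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
    // This works in practice because elements crossing the Java API are
    // boxed objects, so an AnyRef-backed tag suffices at runtime.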
+
+ /**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/