Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:49 UTC
[65/69] [abbrv] git commit: Move some classes to more appropriate packages:
Move some classes to more appropriate packages:
* RDD, *RDDFunctions -> org.apache.spark.rdd
* Utils, ClosureCleaner, SizeEstimator -> org.apache.spark.util
* JavaSerializer, KryoSerializer -> org.apache.spark.serializer
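For downstream Scala sources, only the import paths change. A minimal sketch of the imports as they look after this commit, mirroring the import lines added in the diffs below (the old locations in the comments follow the move list above; Utils, ClosureCleaner and SizeEstimator remain private[spark], so only Spark-internal code imports them):

    import org.apache.spark.SparkContext._                               // implicit conversions, unchanged
    import org.apache.spark.rdd.RDD                                      // was org.apache.spark.RDD
    import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}  // was org.apache.spark.{JavaSerializer, KryoSerializer}
    import org.apache.spark.util.Utils                                   // was org.apache.spark.Utils (private[spark])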
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0a8cc309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0a8cc309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0a8cc309
Branch: refs/heads/branch-0.8
Commit: 0a8cc309211c62f8824d76618705c817edcf2424
Parents: 5b4dea2
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sun Sep 1 00:32:28 2013 -0700
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sun Sep 1 14:13:16 2013 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/bagel/Bagel.scala | 2 +-
.../scala/org/apache/spark/Accumulators.scala | 1 +
.../scala/org/apache/spark/CacheManager.scala | 1 +
.../scala/org/apache/spark/ClosureCleaner.scala | 231 -----
.../scala/org/apache/spark/Dependency.scala | 2 +
.../org/apache/spark/DoubleRDDFunctions.scala | 78 --
.../scala/org/apache/spark/HttpFileServer.scala | 1 +
.../scala/org/apache/spark/HttpServer.scala | 1 +
.../scala/org/apache/spark/JavaSerializer.scala | 83 --
.../scala/org/apache/spark/KryoSerializer.scala | 156 ---
.../org/apache/spark/MapOutputTracker.scala | 2 +-
.../org/apache/spark/PairRDDFunctions.scala | 703 --------------
.../scala/org/apache/spark/Partitioner.scala | 3 +
core/src/main/scala/org/apache/spark/RDD.scala | 957 -------------------
.../org/apache/spark/RDDCheckpointData.scala | 130 ---
.../apache/spark/SequenceFileRDDFunctions.scala | 107 ---
.../scala/org/apache/spark/SizeEstimator.scala | 283 ------
.../scala/org/apache/spark/SparkContext.scala | 8 +-
.../main/scala/org/apache/spark/SparkEnv.scala | 6 +-
.../src/main/scala/org/apache/spark/Utils.scala | 780 ---------------
.../apache/spark/api/java/JavaDoubleRDD.scala | 2 +-
.../org/apache/spark/api/java/JavaPairRDD.scala | 2 +-
.../org/apache/spark/api/java/JavaRDD.scala | 1 +
.../org/apache/spark/api/java/JavaRDDLike.scala | 6 +-
.../spark/api/java/JavaSparkContext.scala | 6 +-
.../spark/api/python/PythonPartitioner.scala | 2 +-
.../org/apache/spark/api/python/PythonRDD.scala | 2 +
.../spark/broadcast/BitTorrentBroadcast.scala | 1 +
.../apache/spark/broadcast/HttpBroadcast.scala | 4 +-
.../apache/spark/broadcast/MultiTracker.scala | 1 +
.../apache/spark/broadcast/TreeBroadcast.scala | 1 +
.../org/apache/spark/deploy/DeployMessage.scala | 2 +-
.../apache/spark/deploy/LocalSparkCluster.scala | 4 +-
.../apache/spark/deploy/client/TestClient.scala | 4 +-
.../org/apache/spark/deploy/master/Master.scala | 4 +-
.../spark/deploy/master/MasterArguments.scala | 3 +-
.../apache/spark/deploy/master/WorkerInfo.scala | 2 +-
.../deploy/master/ui/ApplicationPage.scala | 2 +-
.../spark/deploy/master/ui/IndexPage.scala | 2 +-
.../spark/deploy/master/ui/MasterWebUI.scala | 3 +-
.../spark/deploy/worker/ExecutorRunner.scala | 3 +-
.../org/apache/spark/deploy/worker/Worker.scala | 4 +-
.../spark/deploy/worker/WorkerArguments.scala | 4 +-
.../spark/deploy/worker/ui/IndexPage.scala | 2 +-
.../spark/deploy/worker/ui/WorkerWebUI.scala | 3 +-
.../org/apache/spark/executor/Executor.scala | 1 +
.../spark/executor/MesosExecutorBackend.scala | 3 +-
.../executor/StandaloneExecutorBackend.scala | 4 +-
.../spark/network/ConnectionManager.scala | 1 +
.../spark/network/ConnectionManagerId.scala | 2 +-
.../main/scala/org/apache/spark/package.scala | 21 +-
.../partial/ApproximateActionListener.scala | 1 +
.../scala/org/apache/spark/rdd/BlockRDD.scala | 2 +-
.../org/apache/spark/rdd/CoGroupedRDD.scala | 2 +-
.../apache/spark/rdd/DoubleRDDFunctions.scala | 79 ++
.../scala/org/apache/spark/rdd/EmptyRDD.scala | 2 +-
.../org/apache/spark/rdd/FilteredRDD.scala | 2 +-
.../org/apache/spark/rdd/FlatMappedRDD.scala | 2 +-
.../apache/spark/rdd/FlatMappedValuesRDD.scala | 2 +-
.../scala/org/apache/spark/rdd/GlommedRDD.scala | 2 +-
.../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 +-
.../scala/org/apache/spark/rdd/JdbcRDD.scala | 2 +-
.../org/apache/spark/rdd/MapPartitionsRDD.scala | 2 +-
.../spark/rdd/MapPartitionsWithIndexRDD.scala | 2 +-
.../scala/org/apache/spark/rdd/MappedRDD.scala | 2 +-
.../org/apache/spark/rdd/MappedValuesRDD.scala | 2 +-
.../org/apache/spark/rdd/NewHadoopRDD.scala | 2 +-
.../apache/spark/rdd/OrderedRDDFunctions.scala | 7 +-
.../org/apache/spark/rdd/PairRDDFunctions.scala | 702 ++++++++++++++
.../spark/rdd/ParallelCollectionRDD.scala | 2 +
.../apache/spark/rdd/PartitionPruningRDD.scala | 2 +-
.../scala/org/apache/spark/rdd/PipedRDD.scala | 2 +-
.../main/scala/org/apache/spark/rdd/RDD.scala | 942 ++++++++++++++++++
.../apache/spark/rdd/RDDCheckpointData.scala | 131 +++
.../scala/org/apache/spark/rdd/SampledRDD.scala | 2 +-
.../spark/rdd/SequenceFileRDDFunctions.scala | 89 ++
.../org/apache/spark/rdd/ShuffledRDD.scala | 2 +-
.../org/apache/spark/rdd/SubtractedRDD.scala | 1 -
.../scala/org/apache/spark/rdd/UnionRDD.scala | 2 +-
.../apache/spark/rdd/ZippedPartitionsRDD.scala | 2 +-
.../scala/org/apache/spark/rdd/ZippedRDD.scala | 2 +-
.../apache/spark/scheduler/DAGScheduler.scala | 1 +
.../spark/scheduler/DAGSchedulerEvent.scala | 1 +
.../org/apache/spark/scheduler/JobLogger.scala | 1 +
.../org/apache/spark/scheduler/ResultTask.scala | 7 +-
.../apache/spark/scheduler/ShuffleMapTask.scala | 2 +
.../apache/spark/scheduler/SparkListener.scala | 4 +-
.../org/apache/spark/scheduler/Stage.scala | 3 +-
.../org/apache/spark/scheduler/TaskResult.scala | 3 +-
.../cluster/ClusterTaskSetManager.scala | 2 +-
.../scheduler/cluster/SchedulerBackend.scala | 2 +-
.../cluster/SparkDeploySchedulerBackend.scala | 3 +-
.../cluster/StandaloneClusterMessage.scala | 3 +-
.../cluster/StandaloneSchedulerBackend.scala | 3 +-
.../spark/scheduler/cluster/TaskInfo.scala | 2 +-
.../spark/scheduler/local/LocalScheduler.scala | 1 +
.../mesos/CoarseMesosSchedulerBackend.scala | 2 +-
.../scheduler/mesos/MesosSchedulerBackend.scala | 3 +-
.../spark/serializer/JavaSerializer.scala | 82 ++
.../spark/serializer/KryoSerializer.scala | 159 +++
.../spark/storage/BlockFetcherIterator.scala | 2 +-
.../org/apache/spark/storage/BlockManager.scala | 4 +-
.../apache/spark/storage/BlockManagerId.scala | 2 +-
.../spark/storage/BlockManagerMasterActor.scala | 3 +-
.../spark/storage/BlockManagerWorker.scala | 3 +-
.../org/apache/spark/storage/DiskStore.scala | 2 +-
.../org/apache/spark/storage/MemoryStore.scala | 2 +-
.../org/apache/spark/storage/StorageUtils.scala | 3 +-
.../apache/spark/storage/ThreadingTest.scala | 2 +-
.../scala/org/apache/spark/ui/SparkUI.scala | 3 +-
.../org/apache/spark/ui/exec/ExecutorsUI.scala | 3 +-
.../spark/ui/jobs/JobProgressListener.scala | 2 +-
.../apache/spark/ui/jobs/JobProgressUI.scala | 3 +-
.../org/apache/spark/ui/jobs/StagePage.scala | 4 +-
.../org/apache/spark/ui/jobs/StageTable.scala | 2 +-
.../org/apache/spark/ui/storage/IndexPage.scala | 2 +-
.../org/apache/spark/ui/storage/RDDPage.scala | 2 +-
.../org/apache/spark/util/ClosureCleaner.scala | 232 +++++
.../org/apache/spark/util/MemoryParam.scala | 2 -
.../org/apache/spark/util/SizeEstimator.scala | 284 ++++++
.../scala/org/apache/spark/util/Utils.scala | 781 +++++++++++++++
.../org/apache/spark/CheckpointSuite.scala | 1 +
.../org/apache/spark/ClosureCleanerSuite.scala | 146 ---
.../scala/org/apache/spark/DriverSuite.scala | 1 +
.../scala/org/apache/spark/FailureSuite.scala | 1 +
.../org/apache/spark/KryoSerializerSuite.scala | 208 ----
.../apache/spark/PairRDDFunctionsSuite.scala | 299 ------
.../apache/spark/PartitionPruningRDDSuite.scala | 2 +-
.../org/apache/spark/PartitioningSuite.scala | 9 +-
.../test/scala/org/apache/spark/RDDSuite.scala | 389 --------
.../scala/org/apache/spark/ShuffleSuite.scala | 3 +-
.../org/apache/spark/SizeEstimatorSuite.scala | 164 ----
.../scala/org/apache/spark/SortingSuite.scala | 123 ---
.../scala/org/apache/spark/UtilsSuite.scala | 139 ---
.../spark/rdd/PairRDDFunctionsSuite.scala | 300 ++++++
.../scala/org/apache/spark/rdd/RDDSuite.scala | 391 ++++++++
.../org/apache/spark/rdd/SortingSuite.scala | 125 +++
.../spark/scheduler/DAGSchedulerSuite.scala | 2 +-
.../apache/spark/scheduler/JobLoggerSuite.scala | 6 +-
.../spark/scheduler/TaskContextSuite.scala | 2 +-
.../cluster/ClusterTaskSetManagerSuite.scala | 2 +-
.../spark/serializer/KryoSerializerSuite.scala | 208 ++++
.../spark/storage/BlockManagerSuite.scala | 8 +-
.../apache/spark/util/ClosureCleanerSuite.scala | 147 +++
.../apache/spark/util/SizeEstimatorSuite.scala | 164 ++++
.../org/apache/spark/util/UtilsSuite.scala | 139 +++
docs/configuration.md | 10 +-
docs/quick-start.md | 2 +-
docs/scala-programming-guide.md | 2 +-
docs/tuning.md | 10 +-
.../spark/examples/bagel/PageRankUtils.scala | 1 +
.../examples/bagel/WikipediaPageRank.scala | 2 +-
.../bagel/WikipediaPageRankStandalone.scala | 17 +-
.../spark/streaming/examples/QueueStream.scala | 2 +-
.../streaming/examples/RawNetworkGrep.scala | 2 +-
.../classification/ClassificationModel.scala | 2 +-
.../classification/LogisticRegression.scala | 3 +-
.../apache/spark/mllib/classification/SVM.scala | 3 +-
.../apache/spark/mllib/clustering/KMeans.scala | 3 +-
.../spark/mllib/clustering/KMeansModel.scala | 2 +-
.../mllib/optimization/GradientDescent.scala | 8 +-
.../spark/mllib/optimization/Optimizer.scala | 2 +-
.../apache/spark/mllib/recommendation/ALS.scala | 7 +-
.../MatrixFactorizationModel.scala | 2 +-
.../regression/GeneralizedLinearAlgorithm.scala | 3 +-
.../apache/spark/mllib/regression/Lasso.scala | 3 +-
.../mllib/regression/LinearRegression.scala | 3 +-
.../mllib/regression/RegressionModel.scala | 2 +-
.../mllib/regression/RidgeRegression.scala | 3 +-
.../spark/mllib/util/DataValidators.scala | 3 +-
.../spark/mllib/util/KMeansDataGenerator.scala | 3 +-
.../spark/mllib/util/LinearDataGenerator.scala | 3 +-
.../util/LogisticRegressionDataGenerator.scala | 3 +-
.../spark/mllib/util/MFDataGenerator.scala | 3 +-
.../org/apache/spark/mllib/util/MLUtils.scala | 3 +-
.../spark/mllib/util/SVMDataGenerator.scala | 5 +-
python/pyspark/context.py | 4 +-
.../org/apache/spark/repl/SparkIMain.scala | 2 +-
.../org/apache/spark/streaming/DStream.scala | 3 +-
.../org/apache/spark/streaming/Duration.scala | 2 +-
.../spark/streaming/PairDStreamFunctions.scala | 9 +-
.../spark/streaming/StreamingContext.scala | 1 +
.../spark/streaming/api/java/JavaDStream.scala | 2 +-
.../streaming/api/java/JavaDStreamLike.scala | 2 +-
.../streaming/api/java/JavaPairDStream.scala | 7 +-
.../api/java/JavaStreamingContext.scala | 25 +-
.../streaming/dstream/CoGroupedDStream.scala | 3 +-
.../dstream/ConstantInputDStream.scala | 2 +-
.../streaming/dstream/FileInputDStream.scala | 2 +-
.../streaming/dstream/FilteredDStream.scala | 2 +-
.../dstream/FlatMapValuedDStream.scala | 2 +-
.../streaming/dstream/FlatMappedDStream.scala | 2 +-
.../streaming/dstream/FlumeInputDStream.scala | 15 +-
.../streaming/dstream/ForEachDStream.scala | 2 +-
.../streaming/dstream/GlommedDStream.scala | 2 +-
.../dstream/MapPartitionedDStream.scala | 2 +-
.../streaming/dstream/MapValuedDStream.scala | 2 +-
.../spark/streaming/dstream/MappedDStream.scala | 2 +-
.../streaming/dstream/NetworkInputDStream.scala | 15 +-
.../streaming/dstream/QueueInputDStream.scala | 2 +-
.../dstream/ReducedWindowedDStream.scala | 2 +-
.../streaming/dstream/ShuffledDStream.scala | 3 +-
.../spark/streaming/dstream/StateDStream.scala | 2 +-
.../streaming/dstream/TransformedDStream.scala | 2 +-
.../spark/streaming/dstream/UnionDStream.scala | 2 +-
.../streaming/dstream/WindowedDStream.scala | 2 +-
.../streaming/util/MasterFailureTest.scala | 3 +-
.../spark/streaming/util/RawTextSender.scala | 3 +-
.../apache/spark/streaming/TestSuiteBase.scala | 5 +-
.../tools/JavaAPICompletenessChecker.scala | 38 +-
210 files changed, 5290 insertions(+), 5246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index fec8737..44e26bb 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -19,7 +19,7 @@ package org.apache.spark.bagel
import org.apache.spark._
import org.apache.spark.SparkContext._
-
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
object Bagel extends Logging {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5177ee5..6e922a6 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -21,6 +21,7 @@ import java.io._
import scala.collection.mutable.Map
import scala.collection.generic.Growable
+import org.apache.spark.serializer.JavaSerializer
/**
* A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 42e465b..e299a10 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import scala.collection.mutable.{ArrayBuffer, HashSet}
import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.rdd.RDD
/** Spark class responsible for passing RDDs split contents to the BlockManager and making
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
deleted file mode 100644
index 71d9e62..0000000
--- a/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.lang.reflect.Field
-
-import scala.collection.mutable.Map
-import scala.collection.mutable.Set
-
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
-
-private[spark] object ClosureCleaner extends Logging {
- // Get an ASM class reader for a given class from the JAR that loaded it
- private def getClassReader(cls: Class[_]): ClassReader = {
- // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
- val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
- val resourceStream = cls.getResourceAsStream(className)
- // todo: Fixme - continuing with earlier behavior ...
- if (resourceStream == null) return new ClassReader(resourceStream)
-
- val baos = new ByteArrayOutputStream(128)
- Utils.copyStream(resourceStream, baos, true)
- new ClassReader(new ByteArrayInputStream(baos.toByteArray))
- }
-
- // Check whether a class represents a Scala closure
- private def isClosure(cls: Class[_]): Boolean = {
- cls.getName.contains("$anonfun$")
- }
-
- // Get a list of the classes of the outer objects of a given closure object, obj;
- // the outer objects are defined as any closures that obj is nested within, plus
- // possibly the class that the outermost closure is in, if any. We stop searching
- // for outer objects beyond that because cloning the user's object is probably
- // not a good idea (whereas we can clone closure objects just fine since we
- // understand how all their fields are used).
- private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
- for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
- f.setAccessible(true)
- if (isClosure(f.getType)) {
- return f.getType :: getOuterClasses(f.get(obj))
- } else {
- return f.getType :: Nil // Stop at the first $outer that is not a closure
- }
- }
- return Nil
- }
-
- // Get a list of the outer objects for a given closure object.
- private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
- for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
- f.setAccessible(true)
- if (isClosure(f.getType)) {
- return f.get(obj) :: getOuterObjects(f.get(obj))
- } else {
- return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
- }
- }
- return Nil
- }
-
- private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
- val seen = Set[Class[_]](obj.getClass)
- var stack = List[Class[_]](obj.getClass)
- while (!stack.isEmpty) {
- val cr = getClassReader(stack.head)
- stack = stack.tail
- val set = Set[Class[_]]()
- cr.accept(new InnerClosureFinder(set), 0)
- for (cls <- set -- seen) {
- seen += cls
- stack = cls :: stack
- }
- }
- return (seen - obj.getClass).toList
- }
-
- private def createNullValue(cls: Class[_]): AnyRef = {
- if (cls.isPrimitive) {
- new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
- } else {
- null
- }
- }
-
- def clean(func: AnyRef) {
- // TODO: cache outerClasses / innerClasses / accessedFields
- val outerClasses = getOuterClasses(func)
- val innerClasses = getInnerClasses(func)
- val outerObjects = getOuterObjects(func)
-
- val accessedFields = Map[Class[_], Set[String]]()
- for (cls <- outerClasses)
- accessedFields(cls) = Set[String]()
- for (cls <- func.getClass :: innerClasses)
- getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
- //logInfo("accessedFields: " + accessedFields)
-
- val inInterpreter = {
- try {
- val interpClass = Class.forName("spark.repl.Main")
- interpClass.getMethod("interp").invoke(null) != null
- } catch {
- case _: ClassNotFoundException => true
- }
- }
-
- var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
- var outer: AnyRef = null
- if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
- // The closure is ultimately nested inside a class; keep the object of that
- // class without cloning it since we don't want to clone the user's objects.
- outer = outerPairs.head._2
- outerPairs = outerPairs.tail
- }
- // Clone the closure objects themselves, nulling out any fields that are not
- // used in the closure we're working on or any of its inner closures.
- for ((cls, obj) <- outerPairs) {
- outer = instantiateClass(cls, outer, inInterpreter)
- for (fieldName <- accessedFields(cls)) {
- val field = cls.getDeclaredField(fieldName)
- field.setAccessible(true)
- val value = field.get(obj)
- //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
- field.set(outer, value)
- }
- }
-
- if (outer != null) {
- //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
- val field = func.getClass.getDeclaredField("$outer")
- field.setAccessible(true)
- field.set(func, outer)
- }
- }
-
- private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
- //logInfo("Creating a " + cls + " with outer = " + outer)
- if (!inInterpreter) {
- // This is a bona fide closure class, whose constructor has no effects
- // other than to set its fields, so use its constructor
- val cons = cls.getConstructors()(0)
- val params = cons.getParameterTypes.map(createNullValue).toArray
- if (outer != null)
- params(0) = outer // First param is always outer object
- return cons.newInstance(params: _*).asInstanceOf[AnyRef]
- } else {
- // Use reflection to instantiate object without calling constructor
- val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
- val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
- val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
- val obj = newCtor.newInstance().asInstanceOf[AnyRef]
- if (outer != null) {
- //logInfo("3: Setting $outer on " + cls + " to " + outer);
- val field = cls.getDeclaredField("$outer")
- field.setAccessible(true)
- field.set(obj, outer)
- }
- return obj
- }
- }
-}
-
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
- override def visitMethod(access: Int, name: String, desc: String,
- sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
- override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
- if (op == GETFIELD) {
- for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
- output(cl) += name
- }
- }
- }
-
- override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
- // Check for calls a getter method for a variable in an interpreter wrapper object.
- // This means that the corresponding field will be accessed, so we should save it.
- if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
- for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
- output(cl) += name
- }
- }
- }
- }
- }
-}
-
-private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
- var myName: String = null
-
- override def visit(version: Int, access: Int, name: String, sig: String,
- superName: String, interfaces: Array[String]) {
- myName = name
- }
-
- override def visitMethod(access: Int, name: String, desc: String,
- sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
- override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
- val argTypes = Type.getArgumentTypes(desc)
- if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
- && argTypes(0).toString.startsWith("L") // is it an object?
- && argTypes(0).getInternalName == myName)
- output += Class.forName(
- owner.replace('/', '.'),
- false,
- Thread.currentThread.getContextClassLoader)
- }
- }
- }
-}
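The file reappears unchanged as org/apache/spark/util/ClosureCleaner.scala (see the diffstat above). For context on what the reflection above walks, a small illustrative sketch that is not part of this commit: with the Scala version Spark used at the time, an anonymous function defined inside a class that reads an instance field is compiled to a class carrying a synthetic $outer reference to the enclosing instance, which is the field getOuterClasses/getOuterObjects follow and clean() selectively copies. The class and field names below are made up for the demo:

    class Multiplier(val factor: Int) {
      // Reading the field `factor` makes the compiled anonfun class keep a
      // synthetic $outer reference to this Multiplier instance.
      val times: Int => Int = (x: Int) => x * factor
    }

    object OuterFieldDemo {
      def main(args: Array[String]) {
        val fn = new Multiplier(3).times
        // Same reflection pattern as getOuterClasses/getOuterObjects above.
        for (f <- fn.getClass.getDeclaredFields if f.getName == "$outer") {
          f.setAccessible(true)
          println("captured outer object: " + f.get(fn))
        }
      }
    }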
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index cc3c247..cc30105 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@
package org.apache.spark
+import org.apache.spark.rdd.RDD
+
/**
* Base class for dependencies.
*/
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
deleted file mode 100644
index dd34449..0000000
--- a/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.MeanEvaluator
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.partial.SumEvaluator
-import org.apache.spark.util.StatCounter
-
-/**
- * Extra functions available on RDDs of Doubles through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
- /** Add up the elements in this RDD. */
- def sum(): Double = {
- self.reduce(_ + _)
- }
-
- /**
- * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
- * of the RDD's elements in one operation.
- */
- def stats(): StatCounter = {
- self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
- }
-
- /** Compute the mean of this RDD's elements. */
- def mean(): Double = stats().mean
-
- /** Compute the variance of this RDD's elements. */
- def variance(): Double = stats().variance
-
- /** Compute the standard deviation of this RDD's elements. */
- def stdev(): Double = stats().stdev
-
- /**
- * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
- * estimating the standard deviation by dividing by N-1 instead of N).
- */
- def sampleStdev(): Double = stats().sampleStdev
-
- /**
- * Compute the sample variance of this RDD's elements (which corrects for bias in
- * estimating the variance by dividing by N-1 instead of N).
- */
- def sampleVariance(): Double = stats().sampleVariance
-
- /** (Experimental) Approximate operation to return the mean within a timeout. */
- def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
- val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
- val evaluator = new MeanEvaluator(self.partitions.size, confidence)
- self.context.runApproximateJob(self, processPartition, evaluator, timeout)
- }
-
- /** (Experimental) Approximate operation to return the sum within a timeout. */
- def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
- val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
- val evaluator = new SumEvaluator(self.partitions.size, confidence)
- self.context.runApproximateJob(self, processPartition, evaluator, timeout)
- }
-}
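These functions move verbatim to org.apache.spark.rdd.DoubleRDDFunctions (diffstat above). A hedged usage sketch, assuming an existing SparkContext named sc; the implicit conversion to DoubleRDDFunctions comes from importing SparkContext._, as the scaladoc above describes:

    import org.apache.spark.SparkContext._   // brings RDD[Double] => DoubleRDDFunctions into scope

    val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
    val s = nums.stats()                     // one pass: count, mean, variance, stdev
    println(s.mean + " " + s.stdev + " " + nums.sum())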
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 9b3a896..ad1ee20 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
import java.io.{File}
import com.google.common.io.Files
+import org.apache.spark.util.Utils
private[spark] class HttpFileServer extends Logging {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index db36c7c..cdfc9dd 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -26,6 +26,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.apache.spark.util.Utils
/**
* Exception type thrown by HttpServer when it is in the wrong state for an operation.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/JavaSerializer.scala
deleted file mode 100644
index f43396c..0000000
--- a/core/src/main/scala/org/apache/spark/JavaSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.nio.ByteBuffer
-
-import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
-import org.apache.spark.util.ByteBufferInputStream
-
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
- val objOut = new ObjectOutputStream(out)
- def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
- def flush() { objOut.flush() }
- def close() { objOut.close() }
-}
-
-private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
-extends DeserializationStream {
- val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
-
- def readObject[T](): T = objIn.readObject().asInstanceOf[T]
- def close() { objIn.close() }
-}
-
-private[spark] class JavaSerializerInstance extends SerializerInstance {
- def serialize[T](t: T): ByteBuffer = {
- val bos = new ByteArrayOutputStream()
- val out = serializeStream(bos)
- out.writeObject(t)
- out.close()
- ByteBuffer.wrap(bos.toByteArray)
- }
-
- def deserialize[T](bytes: ByteBuffer): T = {
- val bis = new ByteBufferInputStream(bytes)
- val in = deserializeStream(bis)
- in.readObject().asInstanceOf[T]
- }
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
- val bis = new ByteBufferInputStream(bytes)
- val in = deserializeStream(bis, loader)
- in.readObject().asInstanceOf[T]
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new JavaSerializationStream(s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
- }
-
- def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
- new JavaDeserializationStream(s, loader)
- }
-}
-
-/**
- * A Spark serializer that uses Java's built-in serialization.
- */
-class JavaSerializer extends Serializer {
- def newInstance(): SerializerInstance = new JavaSerializerInstance
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/KryoSerializer.scala
deleted file mode 100644
index db86e6d..0000000
--- a/core/src/main/scala/org/apache/spark/KryoSerializer.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.nio.ByteBuffer
-import com.esotericsoftware.kryo.{Kryo, KryoException}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.twitter.chill.ScalaKryoInstantiator
-import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
-import org.apache.spark.broadcast._
-import org.apache.spark.storage._
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
- val output = new KryoOutput(outStream)
-
- def writeObject[T](t: T): SerializationStream = {
- kryo.writeClassAndObject(output, t)
- this
- }
-
- def flush() { output.flush() }
- def close() { output.close() }
-}
-
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
- val input = new KryoInput(inStream)
-
- def readObject[T](): T = {
- try {
- kryo.readClassAndObject(input).asInstanceOf[T]
- } catch {
- // DeserializationStream uses the EOF exception to indicate stopping condition.
- case _: KryoException => throw new EOFException
- }
- }
-
- def close() {
- // Kryo's Input automatically closes the input stream it is using.
- input.close()
- }
-}
-
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
- val kryo = ks.newKryo()
- val output = ks.newKryoOutput()
- val input = ks.newKryoInput()
-
- def serialize[T](t: T): ByteBuffer = {
- output.clear()
- kryo.writeClassAndObject(output, t)
- ByteBuffer.wrap(output.toBytes)
- }
-
- def deserialize[T](bytes: ByteBuffer): T = {
- input.setBuffer(bytes.array)
- kryo.readClassAndObject(input).asInstanceOf[T]
- }
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
- val oldClassLoader = kryo.getClassLoader
- kryo.setClassLoader(loader)
- input.setBuffer(bytes.array)
- val obj = kryo.readClassAndObject(input).asInstanceOf[T]
- kryo.setClassLoader(oldClassLoader)
- obj
- }
-
- def serializeStream(s: OutputStream): SerializationStream = {
- new KryoSerializationStream(kryo, s)
- }
-
- def deserializeStream(s: InputStream): DeserializationStream = {
- new KryoDeserializationStream(kryo, s)
- }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization.
- */
-trait KryoRegistrator {
- def registerClasses(kryo: Kryo)
-}
-
-/**
- * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
- */
-class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
- private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
-
- def newKryoOutput() = new KryoOutput(bufferSize)
-
- def newKryoInput() = new KryoInput(bufferSize)
-
- def newKryo(): Kryo = {
- val instantiator = new ScalaKryoInstantiator
- val kryo = instantiator.newKryo()
- val classLoader = Thread.currentThread.getContextClassLoader
-
- // Register some commonly used classes
- val toRegister: Seq[AnyRef] = Seq(
- ByteBuffer.allocate(1),
- StorageLevel.MEMORY_ONLY,
- PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
- GotBlock("1", ByteBuffer.allocate(1)),
- GetBlock("1")
- )
-
- for (obj <- toRegister) kryo.register(obj.getClass)
-
- // Allow sending SerializableWritable
- kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
- kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
- // Allow the user to register their own classes by setting spark.kryo.registrator
- try {
- Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
- logDebug("Running user registrator: " + regCls)
- val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
- reg.registerClasses(kryo)
- }
- } catch {
- case _: Exception => println("Failed to register spark.kryo.registrator")
- }
-
- kryo.setClassLoader(classLoader)
-
- // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
- kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
-
- kryo
- }
-
- def newInstance(): SerializerInstance = {
- new KryoSerializerInstance(this)
- }
-}
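KryoSerializer (and the KryoRegistrator trait defined in the same file) likewise moves to org.apache.spark.serializer. For reference, a hedged sketch of how the registrator hook shown above is wired up in this Spark 0.8-era codebase, where configuration went through Java system properties; MyRegistrator and mypackage are illustrative names, spark.kryo.registrator is the property read in the code above, and spark.serializer is the serializer setting read elsewhere (not shown in this excerpt):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      // Register application classes so Kryo can avoid writing full class names.
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Array[Double]])
      }
    }

    // Spark then instantiates the registrator named by this property in newKryo():
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")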
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 0f422d9..ae7cf2a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
private[spark] sealed trait MapOutputTrackerMessage
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
deleted file mode 100644
index d046e7c..0000000
--- a/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.nio.ByteBuffer
-import java.util.{Date, HashMap => JHashMap}
-import java.text.SimpleDateFormat
-
-import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.SparkHadoopWriter
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
- RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd._
-import org.apache.spark.SparkContext._
-import org.apache.spark.Partitioner._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
- extends Logging
- with SparkHadoopMapReduceUtil
- with Serializable {
-
- /**
- * Generic function to combine the elements for each key using a custom set of aggregation
- * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
- * Note that V and C can be different -- for example, one might group an RDD of type
- * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
- *
- * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
- * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
- * - `mergeCombiners`, to combine two C's into a single one.
- *
- * In addition, users can control the partitioning of the output RDD, and whether to perform
- * map-side aggregation (if a mapper can produce multiple items with the same key).
- */
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C,
- partitioner: Partitioner,
- mapSideCombine: Boolean = true,
- serializerClass: String = null): RDD[(K, C)] = {
- if (getKeyClass().isArray) {
- if (mapSideCombine) {
- throw new SparkException("Cannot use map-side combining with array keys.")
- }
- if (partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- }
- val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
- if (self.partitioner == Some(partitioner)) {
- self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
- } else if (mapSideCombine) {
- val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
- val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
- .setSerializer(serializerClass)
- partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
- } else {
- // Don't apply map-side combiner.
- // A sanity check to make sure mergeCombiners is not defined.
- assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
- values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
- }
- }
-
- /**
- * Simplified version of combineByKey that hash-partitions the output RDD.
- */
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C,
- numPartitions: Int): RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
- }
-
- /**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
- */
- def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
- // Serialize the zero value to a byte array so that we can get a new clone of it on each key
- val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
- val zeroArray = new Array[Byte](zeroBuffer.limit)
- zeroBuffer.get(zeroArray)
-
- // When deserializing, use a lazy val to create just one instance of the serializer per task
- lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
- def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
-
- combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
- }
-
- /**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
- */
- def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
- foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
- }
-
- /**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
- */
- def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
- foldByKey(zeroValue, defaultPartitioner(self))(func)
- }
-
- /**
- * Merge the values for each key using an associative reduce function. This will also perform
- * the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce.
- */
- def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
- combineByKey[V]((v: V) => v, func, func, partitioner)
- }
-
- /**
- * Merge the values for each key using an associative reduce function, but return the results
- * immediately to the master as a Map. This will also perform the merging locally on each mapper
- * before sending results to a reducer, similarly to a "combiner" in MapReduce.
- */
- def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
-
- if (getKeyClass().isArray) {
- throw new SparkException("reduceByKeyLocally() does not support array keys")
- }
-
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
- val map = new JHashMap[K, V]
- iter.foreach { case (k, v) =>
- val old = map.get(k)
- map.put(k, if (old == null) v else func(old, v))
- }
- Iterator(map)
- }
-
- def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
- m2.foreach { case (k, v) =>
- val old = m1.get(k)
- m1.put(k, if (old == null) v else func(old, v))
- }
- m1
- }
-
- self.mapPartitions(reducePartition).reduce(mergeMaps)
- }
-
- /** Alias for reduceByKeyLocally */
- def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
-
- /** Count the number of elements for each key, and return the result to the master as a Map. */
- def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
-
- /**
- * (Experimental) Approximate version of countByKey that can return a partial result if it does
- * not finish within a timeout.
- */
- def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
- : PartialResult[Map[K, BoundedDouble]] = {
- self.map(_._1).countByValueApprox(timeout, confidence)
- }
-
- /**
- * Merge the values for each key using an associative reduce function. This will also perform
- * the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
- */
- def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
- reduceByKey(new HashPartitioner(numPartitions), func)
- }
-
- /**
- * Group the values for each key in the RDD into a single sequence. Allows controlling the
- * partitioning of the resulting key-value pair RDD by passing a Partitioner.
- */
- def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
- // groupByKey shouldn't use map side combine because map side combine does not
- // reduce the amount of data shuffled and requires all map side data be inserted
- // into a hash table, leading to more objects in the old gen.
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
- }
-
- /**
- * Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with into `numPartitions` partitions.
- */
- def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
- groupByKey(new HashPartitioner(numPartitions))
- }
-
- /**
- * Return a copy of the RDD partitioned using the specified partitioner.
- */
- def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
- if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- new ShuffledRDD[K, V, (K, V)](self, partitioner)
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
- * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
- * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
- */
- def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
- }
- }
-
- /**
- * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
- * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
- * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
- * partition the output RDD.
- */
- def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- if (ws.isEmpty) {
- vs.iterator.map(v => (v, None))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
- }
- }
- }
-
- /**
- * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
- * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
- * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
- * partition the output RDD.
- */
- def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
- : RDD[(K, (Option[V], W))] = {
- this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- if (vs.isEmpty) {
- ws.iterator.map(w => (None, w))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
- }
- }
- }
-
- /**
- * Simplified version of combineByKey that hash-partitions the resulting RDD using the
- * existing partitioner/parallelism level.
- */
- def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
- : RDD[(K, C)] = {
- combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
- }
-
- /**
- * Merge the values for each key using an associative reduce function. This will also perform
- * the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
- * parallelism level.
- */
- def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
- reduceByKey(defaultPartitioner(self), func)
- }
-
- /**
- * Group the values for each key in the RDD into a single sequence. Hash-partitions the
- * resulting RDD with the existing partitioner/parallelism level.
- */
- def groupByKey(): RDD[(K, Seq[V])] = {
- groupByKey(defaultPartitioner(self))
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
- * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
- * (k, v2) is in `other`. Performs a hash join across the cluster.
- */
- def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
- join(other, defaultPartitioner(self, other))
- }
-
- /**
- * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
- * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
- * (k, v2) is in `other`. Performs a hash join across the cluster.
- */
- def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
- join(other, new HashPartitioner(numPartitions))
- }
-
- /**
- * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
- * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
- * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
- * using the existing partitioner/parallelism level.
- */
- def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, defaultPartitioner(self, other))
- }
-
- /**
- * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
- * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
- * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
- * into `numPartitions` partitions.
- */
- def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
- leftOuterJoin(other, new HashPartitioner(numPartitions))
- }
-
- /**
- * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
- * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
- * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
- * RDD using the existing partitioner/parallelism level.
- */
- def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, defaultPartitioner(self, other))
- }
-
- /**
- * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
- * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
- * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
- * RDD into the given number of partitions.
- */
- def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
- rightOuterJoin(other, new HashPartitioner(numPartitions))
- }
-
- /**
- * Return the key-value pairs in this RDD to the master as a Map.
- */
- def collectAsMap(): Map[K, V] = {
- val data = self.toArray()
- val map = new mutable.HashMap[K, V]
- map.sizeHint(data.length)
- data.foreach { case (k, v) => map.put(k, v) }
- map
- }
-
- /**
- * Pass each value in the key-value pair RDD through a map function without changing the keys;
- * this also retains the original RDD's partitioning.
- */
- def mapValues[U](f: V => U): RDD[(K, U)] = {
- val cleanF = self.context.clean(f)
- new MappedValuesRDD(self, cleanF)
- }
-
- /**
- * Pass each value in the key-value pair RDD through a flatMap function without changing the
- * keys; this also retains the original RDD's partitioning.
- */
- def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
- val cleanF = self.context.clean(f)
- new FlatMappedValuesRDD(self, cleanF)
- }
-
- /**
- * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
- * list of values for that key in `this` as well as `other`.
- */
- def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
- if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues { case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
- }
- }
-
- /**
- * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
- * tuple with the list of values for that key in `this`, `other1` and `other2`.
- */
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
- val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues { case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
- }
- }
-
- /**
- * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
- * list of values for that key in `this` as well as `other`.
- */
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
- cogroup(other, defaultPartitioner(self, other))
- }
-
- /**
- * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
- * tuple with the list of values for that key in `this`, `other1` and `other2`.
- */
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- cogroup(other1, other2, defaultPartitioner(self, other1, other2))
- }
-
- /**
- * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
- * list of values for that key in `this` as well as `other`.
- */
- def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
- cogroup(other, new HashPartitioner(numPartitions))
- }
-
- /**
- * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
- * tuple with the list of values for that key in `this`, `other1` and `other2`.
- */
- def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- cogroup(other1, other2, new HashPartitioner(numPartitions))
- }
-
- /** Alias for cogroup. */
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
- cogroup(other, defaultPartitioner(self, other))
- }
-
- /** Alias for cogroup. */
- def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
- cogroup(other1, other2, defaultPartitioner(self, other1, other2))
- }
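[Editorial sketch of cogroup with assumed data: every key from either side appears exactly once, paired with a possibly empty Seq of values from each input; ordering inside each Seq is not guaranteed.]

    val scores  = sc.parallelize(Seq(("alice", 90), ("bob", 80), ("alice", 70)))
    val absents = sc.parallelize(Seq(("bob", "mon")))
    scores.cogroup(absents).collect()
    // ("alice", (Seq(90, 70), Seq()))
    // ("bob",   (Seq(80),     Seq("mon")))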
-
- /**
- * Return an RDD with the pairs from `this` whose keys are not in `other`.
- *
- * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the
- * resulting RDD will be no larger than `this`.
- */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
- subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
-
- /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
- subtractByKey(other, new HashPartitioner(numPartitions))
-
- /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
- def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
- new SubtractedRDD[K, V, W](self, other, p)
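[Editorial sketch of subtractByKey with illustrative data: only the keys of `other` matter; its values are ignored.]

    val all    = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
    val toDrop = sc.parallelize(Seq(("b", 99)))
    all.subtractByKey(toDrop).collect()   // ("a", 1), ("c", 3)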
-
- /**
- * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
- * RDD has a known partitioner by only searching the partition that the key maps to.
- */
- def lookup(key: K): Seq[V] = {
- self.partitioner match {
- case Some(p) =>
- val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
- val buf = new ArrayBuffer[V]
- for ((k, v) <- it if k == key) {
- buf += v
- }
- buf
- }
- val res = self.context.runJob(self, process _, Array(index), false)
- res(0)
- case None =>
- self.filter(_._1 == key).map(_._2).collect()
- }
- }
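[Editorial sketch of lookup under the same assumed `sc`: pre-partitioning with a known Partitioner lets lookup run a job over just the one partition the key hashes to, instead of filtering the whole RDD.]

    import org.apache.spark.HashPartitioner

    val byId = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
                 .partitionBy(new HashPartitioner(4))
    byId.lookup(1)   // Seq("a", "c") (order within the Seq not guaranteed)
    byId.lookup(3)   // empty Seq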
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
- * supporting the key and value types K and V in this RDD.
- */
- def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
- }
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
- * supporting the key and value types K and V in this RDD. Compress the result with the
- * supplied codec.
- */
- def saveAsHadoopFile[F <: OutputFormat[K, V]](
- path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
- }
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
- * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
- */
- def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
- saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
- }
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
- * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
- */
- def saveAsNewAPIHadoopFile(
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = self.context.hadoopConfiguration) {
- val job = new NewAPIHadoopJob(conf)
- job.setOutputKeyClass(keyClass)
- job.setOutputValueClass(valueClass)
- val wrappedConf = new SerializableWritable(job.getConfiguration)
- NewFileOutputFormat.setOutputPath(job, new Path(path))
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
- val stageId = self.id
- def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
- // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
- // around by taking a mod. We expect that no task will be attempted 2 billion times.
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt
- /* "reduce task" <split #> <attempt # = spark task #> */
- val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
- val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
- val format = outputFormatClass.newInstance
- val committer = format.getOutputCommitter(hadoopContext)
- committer.setupTask(hadoopContext)
- val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
- while (iter.hasNext) {
- val (k, v) = iter.next()
- writer.write(k, v)
- }
- writer.close(hadoopContext)
- committer.commitTask(hadoopContext)
- return 1
- }
- val jobFormat = outputFormatClass.newInstance
- /* apparently we need a TaskAttemptID to construct an OutputCommitter;
- * however we're only going to use this local OutputCommitter for
- * setupJob/commitJob, so we just use a dummy "map" task.
- */
- val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
- val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
- val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
- jobCommitter.setupJob(jobTaskContext)
- val count = self.context.runJob(self, writeShard _).sum
- jobCommitter.commitJob(jobTaskContext)
- jobCommitter.cleanupJob(jobTaskContext)
- }
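[Editorial sketch of calling the new-API save path above, assuming a hypothetical RDD `counts: RDD[(String, Int)]` and a writable output directory; the OutputFormat's key and value types must line up with the RDD's.]

    import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
    counts.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, Int]]("/tmp/counts-newapi")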
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
- * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
- */
- def saveAsHadoopFile(
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]],
- codec: Class[_ <: CompressionCodec]) {
- saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
- new JobConf(self.context.hadoopConfiguration), Some(codec))
- }
-
- /**
- * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
- * supporting the key and value types K and V in this RDD.
- */
- def saveAsHadoopFile(
- path: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]],
- conf: JobConf = new JobConf(self.context.hadoopConfiguration),
- codec: Option[Class[_ <: CompressionCodec]] = None) {
- conf.setOutputKeyClass(keyClass)
- conf.setOutputValueClass(valueClass)
- // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
- conf.set("mapred.output.format.class", outputFormatClass.getName)
- for (c <- codec) {
- conf.setCompressMapOutput(true)
- conf.set("mapred.output.compress", "true")
- conf.setMapOutputCompressorClass(c)
- conf.set("mapred.output.compression.codec", c.getCanonicalName)
- conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
- }
- conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
- saveAsHadoopDataset(conf)
- }
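[Editorial sketch of the old-API variant with compression, reusing the assumed `counts` RDD; GzipCodec and the output path are illustrative choices only.]

    import org.apache.hadoop.io.compress.GzipCodec
    import org.apache.hadoop.mapred.TextOutputFormat
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("/tmp/counts-gz", classOf[GzipCodec])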
-
- /**
- * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
- * that storage system. The JobConf should set an OutputFormat and any output paths required
- * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
- * MapReduce job.
- */
- def saveAsHadoopDataset(conf: JobConf) {
- val outputFormatClass = conf.getOutputFormat
- val keyClass = conf.getOutputKeyClass
- val valueClass = conf.getOutputValueClass
- if (outputFormatClass == null) {
- throw new SparkException("Output format class not set")
- }
- if (keyClass == null) {
- throw new SparkException("Output key class not set")
- }
- if (valueClass == null) {
- throw new SparkException("Output value class not set")
- }
-
- logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + valueClass.getSimpleName + ")")
-
- val writer = new SparkHadoopWriter(conf)
- writer.preSetup()
-
- def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
- // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
- // around by taking a mod. We expect that no task will be attempted 2 billion times.
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
- writer.setup(context.stageId, context.splitId, attemptNumber)
- writer.open()
-
- var count = 0
- while(iter.hasNext) {
- val record = iter.next()
- count += 1
- writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
- }
-
- writer.close()
- writer.commit()
- }
-
- self.context.runJob(self, writeToFile _)
- writer.commitJob()
- writer.cleanup()
- }
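[Editorial sketch of driving saveAsHadoopDataset with a hand-built JobConf (the path and the `counts` RDD are illustrative): the JobConf must carry the output format, key/value classes and output path itself, exactly as the checks above require.]

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}

    val conf = new JobConf(sc.hadoopConfiguration)
    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(conf, new Path("/tmp/counts-dataset"))

    counts.map { case (k, v) => (new Text(k), new IntWritable(v)) }
          .saveAsHadoopDataset(conf)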
-
- /**
- * Return an RDD with the keys of each tuple.
- */
- def keys: RDD[K] = self.map(_._1)
-
- /**
- * Return an RDD with the values of each tuple.
- */
- def values: RDD[V] = self.map(_._2)
-
- private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
-
- private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
-}
-
-
-private[spark] object Manifests {
- val seqSeqManifest = classManifest[Seq[Seq[_]]]
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 4dce260..0e2c987 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,6 +17,9 @@
package org.apache.spark
+import org.apache.spark.util.Utils
+import org.apache.spark.rdd.RDD
+
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.