Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:09 UTC
[25/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
deleted file mode 100644
index 29d5700..0000000
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
-import spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
-import spark.SparkContext.IntAccumulatorParam
-import spark.SparkContext.DoubleAccumulatorParam
-import spark.broadcast.Broadcast
-
-import com.google.common.base.Optional
-
-/**
- * A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
- * works with Java collections instead of Scala ones.
- */
-class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
-
- /**
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI
- */
- def this(master: String, appName: String) = this(new SparkContext(master, appName))
-
- /**
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
- * or an HDFS, HTTP, HTTPS, or FTP URL.
- */
- def this(master: String, appName: String, sparkHome: String, jarFile: String) =
- this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
-
- /**
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
- */
- def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
- this(new SparkContext(master, appName, sparkHome, jars.toSeq))
-
- /**
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI
- * @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
- * @param environment Environment variables to set on worker nodes
- */
- def this(master: String, appName: String, sparkHome: String, jars: Array[String],
- environment: JMap[String, String]) =
- this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
-
- private[spark] val env = sc.env
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
- }
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
- parallelize(list, sc.defaultParallelism)
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
- : JavaPairRDD[K, V] = {
- implicit val kcm: ClassManifest[K] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
- implicit val vcm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
- JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
- }
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
- parallelizePairs(list, sc.defaultParallelism)
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
- JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
- numSlices))
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
- parallelizeDoubles(list, sc.defaultParallelism)
-
- /**
- * Read a text file from HDFS, a local file system (available on all nodes), or any
- * Hadoop-supported file system URI, and return it as an RDD of Strings.
- */
- def textFile(path: String): JavaRDD[String] = sc.textFile(path)
-
- /**
- * Read a text file from HDFS, a local file system (available on all nodes), or any
- * Hadoop-supported file system URI, and return it as an RDD of Strings.
- */
- def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
-
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
- }
-
- /** Get an RDD for a Hadoop SequenceFile. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
- JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
- }
-
- /**
- * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
- * BytesWritable values that contain a serialized partition. This is still an experimental storage
- * format and may not be supported exactly as is in future Spark releases. It will also be pretty
- * slow if you use the default serializer (Java serialization), though the nice thing about it is
- * that there's very little effort required to save arbitrary objects.
- */
- def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- sc.objectFile(path, minSplits)(cm)
- }
-
- /**
- * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
- * BytesWritable values that contain a serialized partition. This is still an experimental storage
- * format and may not be supported exactly as is in future Spark releases. It will also be pretty
- * slow if you use the default serializer (Java serialization), though the nice thing about it is
- * that there's very little effort required to save arbitrary objects.
- */
- def objectFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- sc.objectFile(path)(cm)
- }
-
- /**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc.).
- */
- def hadoopRDD[K, V, F <: InputFormat[K, V]](
- conf: JobConf,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
- }
-
- /**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc.).
- */
- def hadoopRDD[K, V, F <: InputFormat[K, V]](
- conf: JobConf,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V]
- ): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
- }
-
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V, F <: InputFormat[K, V]](
- path: String,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
- }
-
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V, F <: InputFormat[K, V]](
- path: String,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V]
- ): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(keyClass)
- implicit val vcm = ClassManifest.fromClass(valueClass)
- new JavaPairRDD(sc.hadoopFile(path,
- inputFormatClass, keyClass, valueClass))
- }
-
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
- */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
- path: String,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V],
- conf: Configuration): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
- new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
- }
-
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
- */
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
- conf: Configuration,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V]): JavaPairRDD[K, V] = {
- implicit val kcm = ClassManifest.fromClass(kClass)
- implicit val vcm = ClassManifest.fromClass(vClass)
- new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
- }
-
- /** Build the union of two or more RDDs. */
- override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
- val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[T] = first.classManifest
- sc.union(rdds)(cm)
- }
-
- /** Build the union of two or more RDDs. */
- override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
- : JavaPairRDD[K, V] = {
- val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
- implicit val cm: ClassManifest[(K, V)] = first.classManifest
- implicit val kcm: ClassManifest[K] = first.kManifest
- implicit val vcm: ClassManifest[V] = first.vManifest
- new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
- }
-
- /** Build the union of two or more RDDs. */
- override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
- val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
- new JavaDoubleRDD(sc.union(rdds))
- }
-
- /**
- * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
- */
- def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
- sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
-
- /**
- * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
- */
- def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
- sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
-
- /**
- * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
- */
- def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
-
- /**
- * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
- */
- def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
- doubleAccumulator(initialValue)
-
- /**
- * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `add` method. Only the master can access the accumulator's `value`.
- */
- def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
- sc.accumulator(initialValue)(accumulatorParam)
-
- /**
- * Create an [[spark.Accumulable]] shared variable of the given type, to which tasks can
- * "add" values with `add`. Only the master can access the accumuable's `value`.
- */
- def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
- sc.accumulable(initialValue)(param)
-
- /**
- * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
- * reading it in distributed functions. The variable will be sent to each cluster only once.
- */
- def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
-
- /** Shut down the SparkContext. */
- def stop() {
- sc.stop()
- }
-
- /**
- * Get Spark's home location from either a value set through the constructor,
- * or the spark.home Java property, or the SPARK_HOME environment variable
- * (in that order of preference). If none of these is set, return None.
- */
- def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
-
- /**
- * Add a file to be downloaded with this Spark job on every node.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
- * use `SparkFiles.get(path)` to find its download location.
- */
- def addFile(path: String) {
- sc.addFile(path)
- }
-
- /**
- * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI.
- */
- def addJar(path: String) {
- sc.addJar(path)
- }
-
- /**
- * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
- * any new nodes.
- */
- def clearJars() {
- sc.clearJars()
- }
-
- /**
- * Clear the job's list of files added by `addFile` so that they do not get downloaded to
- * any new nodes.
- */
- def clearFiles() {
- sc.clearFiles()
- }
-
- /**
- * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
- */
- def hadoopConfiguration(): Configuration = {
- sc.hadoopConfiguration
- }
-
- /**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * existing directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
- */
- def setCheckpointDir(dir: String, useExisting: Boolean) {
- sc.setCheckpointDir(dir, useExisting)
- }
-
- /**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists, an exception will be thrown to prevent accidental
- * overriding of checkpoint files.
- */
- def setCheckpointDir(dir: String) {
- sc.setCheckpointDir(dir)
- }
-
- protected def checkpointFile[T](path: String): JavaRDD[T] = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- new JavaRDD(sc.checkpointFile(path))
- }
-}
-
-object JavaSparkContext {
- implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
-
- implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
-}
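
For readers tracking the rename, the following is a minimal sketch of how the Java API deleted above was typically driven before the package change (everything still under spark.api.java; the class name and data are illustrative). After this rename the same code is expected to differ only in the package prefix (org.apache.spark.api.java).

    import java.util.Arrays;

    import spark.Accumulator;
    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;

    public class OldPackageExample {
      public static void main(String[] args) {
        // Constructor documented above: cluster URL and application name.
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "old-package-example");

        // Distribute a local collection into 2 slices and count it.
        JavaRDD<Integer> nums = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
        System.out.println(nums.count());

        // Integer accumulator: tasks "add" to it, only the master reads its value.
        Accumulator<Integer> counter = jsc.intAccumulator(0);

        jsc.stop();
      }
    }
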
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
deleted file mode 100644
index 42b1de0..0000000
--- a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-
-// See
-// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
-abstract class JavaSparkContextVarargsWorkaround {
- public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
- if (rdds.length == 0) {
- throw new IllegalArgumentException("Union called on empty list");
- }
- ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
- for (int i = 1; i < rdds.length; i++) {
- rest.add(rdds[i]);
- }
- return union(rdds[0], rest);
- }
-
- // These methods take separate "first" and "rest" elements to avoid having the same type erasure
- abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
- abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
- abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
-}
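
The varargs overloads above exist only to split the arguments into (first, rest) for the abstract methods that JavaSparkContext implements; a short sketch of calling one (assuming an existing JavaSparkContext jsc):

    import java.util.Arrays;
    import spark.api.java.JavaRDD;

    JavaRDD<String> a = jsc.parallelize(Arrays.asList("a", "b"));
    JavaRDD<String> b = jsc.parallelize(Arrays.asList("c"));
    JavaRDD<String> c = jsc.parallelize(Arrays.asList("d", "e"));

    // Delegates to union(first, List<rest>) implemented in JavaSparkContext above.
    JavaRDD<String> all = jsc.union(a, b, c);
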
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaUtils.scala b/core/src/main/scala/spark/api/java/JavaUtils.scala
deleted file mode 100644
index ffc131a..0000000
--- a/core/src/main/scala/spark/api/java/JavaUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import com.google.common.base.Optional
-
-object JavaUtils {
- def optionToOptional[T](option: Option[T]): Optional[T] =
- option match {
- case Some(value) => Optional.of(value)
- case None => Optional.absent()
- }
-}
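
JavaUtils.optionToOptional is the bridge that lets Java callers consume a Scala Option as a Guava Optional, for example via JavaSparkContext.getSparkHome() shown earlier (sketch, assuming an existing jsc):

    import com.google.common.base.Optional;

    Optional<String> sparkHome = jsc.getSparkHome();  // Scala Option mapped through JavaUtils
    if (sparkHome.isPresent()) {
      System.out.println("Spark home: " + sparkHome.get());
    } else {
      System.out.println("Spark home is not set");
    }
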
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/StorageLevels.java b/core/src/main/scala/spark/api/java/StorageLevels.java
deleted file mode 100644
index f385636..0000000
--- a/core/src/main/scala/spark/api/java/StorageLevels.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java;
-
-import spark.storage.StorageLevel;
-
-/**
- * Expose some commonly useful storage level constants.
- */
-public class StorageLevels {
- public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
- public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
- public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
- public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
- public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
- public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
- public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
- public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
- public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
- public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
- public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
-
- /**
- * Create a new StorageLevel object.
- * @param useDisk saved to disk, if true
- * @param useMemory saved to memory, if true
- * @param deserialized saved as deserialized objects, if true
- * @param replication replication factor
- */
- public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
- return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
- }
-}
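
A sketch of choosing a storage level from this class: the constants cover the common cases and create() builds a custom one. Persisting with it is assumed API here (JavaRDD.persist(StorageLevel) is not part of this diff); lines stands in for an existing JavaRDD.

    import spark.api.java.StorageLevels;
    import spark.storage.StorageLevel;

    // Memory-and-disk, serialized, replicated twice (same flags as MEMORY_AND_DISK_SER_2).
    StorageLevel custom = StorageLevels.create(true, true, false, 2);

    // Or persist with a predefined constant (assumed persist() signature).
    lines.persist(StorageLevels.MEMORY_ONLY);
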
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
deleted file mode 100644
index 8bc88d7..0000000
--- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
- implements Serializable {
-
- public abstract Iterable<Double> call(T t);
-
- @Override
- public final Iterable<Double> apply(T t) { return call(t); }
-}
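
As the comment above says, flatMap is overloaded so that passing a DoubleFlatMapFunction yields a JavaDoubleRDD; a sketch under that assumption (lines is an existing JavaRDD<String>):

    import java.util.ArrayList;
    import java.util.List;
    import spark.api.java.JavaDoubleRDD;
    import spark.api.java.function.DoubleFlatMapFunction;

    JavaDoubleRDD values = lines.flatMap(new DoubleFlatMapFunction<String>() {
      public Iterable<Double> call(String line) {
        List<Double> out = new ArrayList<Double>();
        for (String token : line.split("\\s+")) {
          out.add(Double.parseDouble(token));  // zero or more Doubles per input record
        }
        return out;
      }
    });
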
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
deleted file mode 100644
index 1aa1e5d..0000000
--- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
- implements Serializable {
-
- public abstract Double call(T t) throws Exception;
-}
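
Likewise, map is overloaded for DoubleFunction, so the result below is a JavaDoubleRDD rather than a JavaRDD<Double> (sketch; lines is an existing JavaRDD<String>):

    import spark.api.java.JavaDoubleRDD;
    import spark.api.java.function.DoubleFunction;

    JavaDoubleRDD lengths = lines.map(new DoubleFunction<String>() {
      public Double call(String line) {
        return (double) line.length();  // exactly one Double per input record
      }
    });
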
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
deleted file mode 100644
index 9eb0cfe..0000000
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function that returns zero or more output records from each input record.
- */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- @throws(classOf[Exception])
- def call(x: T) : java.lang.Iterable[R]
-
- def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
-}
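
A sketch of the most common FlatMapFunction use, splitting lines into words (lines is an existing JavaRDD<String>):

    import java.util.Arrays;
    import spark.api.java.JavaRDD;
    import spark.api.java.function.FlatMapFunction;

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));  // zero or more output records per input
      }
    });
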
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
deleted file mode 100644
index dda9871..0000000
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function that takes two inputs and returns zero or more output records.
- */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
- @throws(classOf[Exception])
- def call(a: A, b:B) : java.lang.Iterable[C]
-
- def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
deleted file mode 100644
index 2a2ea0a..0000000
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
- public abstract R call(T t) throws Exception;
-
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-}
-
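
A sketch of the plain Function base class with map and filter (lines is an existing JavaRDD<String>):

    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function;

    JavaRDD<Integer> sizes = lines.map(new Function<String, Integer>() {
      public Integer call(String line) {
        return line.length();
      }
    });

    JavaRDD<String> nonEmpty = lines.filter(new Function<String, Boolean>() {
      public Boolean call(String line) {
        return !line.isEmpty();
      }
    });
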
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
deleted file mode 100644
index 952d31e..0000000
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction2;
-
-import java.io.Serializable;
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
- implements Serializable {
-
- public abstract R call(T1 t1, T2 t2) throws Exception;
-
- public ClassManifest<R> returnType() {
- return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-}
-
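
Function2 is the shape that reduce takes on the Java side; a sketch (nums is an existing JavaRDD<Integer>):

    import spark.api.java.function.Function2;

    Integer sum = nums.reduce(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) {
        return a + b;  // must be associative and commutative for reduce
      }
    });
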
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
deleted file mode 100644
index 4aad602..0000000
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V>
- extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
- implements Serializable {
-
- public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
-
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-}
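
As with the double variant, flatMap is overloaded so a PairFlatMapFunction yields a JavaPairRDD; a sketch (lines is an existing JavaRDD<String>):

    import java.util.ArrayList;
    import java.util.List;
    import scala.Tuple2;
    import spark.api.java.JavaPairRDD;
    import spark.api.java.function.PairFlatMapFunction;

    JavaPairRDD<String, Integer> pairs = lines.flatMap(
        new PairFlatMapFunction<String, String, Integer>() {
          public Iterable<Tuple2<String, Integer>> call(String line) {
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String word : line.split(" ")) {
              out.add(new Tuple2<String, Integer>(word, 1));  // one (word, 1) pair per token
            }
            return out;
          }
        });
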
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
deleted file mode 100644
index ccfe64e..0000000
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V>
- extends WrappedFunction1<T, Tuple2<K, V>>
- implements Serializable {
-
- public abstract Tuple2<K, V> call(T t) throws Exception;
-
- public ClassManifest<K> keyType() {
- return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-
- public ClassManifest<V> valueType() {
- return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
- }
-}
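
The map overload for PairFunction is the usual way to build a JavaPairRDD ahead of a shuffle, here combined with reduceByKey and the Function2 shown earlier (sketch; words is an existing JavaRDD<String>):

    import scala.Tuple2;
    import spark.api.java.JavaPairRDD;
    import spark.api.java.function.Function2;
    import spark.api.java.function.PairFunction;

    JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });
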
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/VoidFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index f6fc0b0..0000000
--- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
- @throws(classOf[Exception])
- def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
- implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}
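
VoidFunction is what foreach takes on the Java side, so callers do not have to return Unit; a sketch (lines is an existing JavaRDD<String>):

    import spark.api.java.function.VoidFunction;

    lines.foreach(new VoidFunction<String>() {
      public void call(String line) {
        System.out.println(line);  // runs in the tasks on the workers, not on the driver
      }
    });
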
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index 1758a38..0000000
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
- @throws(classOf[Exception])
- def call(t: T): R
-
- final def apply(t: T): R = call(t)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index b093567..0000000
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
- @throws(classOf[Exception])
- def call(t1: T1, t2: T2): R
-
- final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
deleted file mode 100644
index ac112b8..0000000
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import spark.Partitioner
-import spark.Utils
-import java.util.Arrays
-
-/**
- * A [[spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
- *
- * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
- * equality comparisons. Correctness requires that the id is a unique identifier for the
- * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
- * function). This can be ensured by using the Python id() function and maintaining a reference
- * to the Python partitioning function so that its id() is not reused.
- */
-private[spark] class PythonPartitioner(
- override val numPartitions: Int,
- val pyPartitionFunctionId: Long)
- extends Partitioner {
-
- override def getPartition(key: Any): Int = key match {
- case null => 0
- case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
- case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
- }
-
- override def equals(other: Any): Boolean = other match {
- case h: PythonPartitioner =>
- h.numPartitions == numPartitions && h.pyPartitionFunctionId == pyPartitionFunctionId
- case _ =>
- false
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
deleted file mode 100644
index 4967143..0000000
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import java.io._
-import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
-
-import scala.collection.JavaConversions._
-
-import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import spark.broadcast.Broadcast
-import spark._
-import spark.rdd.PipedRDD
-
-
-private[spark] class PythonRDD[T: ClassManifest](
- parent: RDD[T],
- command: Seq[String],
- envVars: JMap[String, String],
- pythonIncludes: JList[String],
- preservePartitoning: Boolean,
- pythonExec: String,
- broadcastVars: JList[Broadcast[Array[Byte]]],
- accumulator: Accumulator[JList[Array[Byte]]])
- extends RDD[Array[Byte]](parent) {
-
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
- // Similar to Runtime.exec(), if we are given a single string, split it into words
- // using a standard StringTokenizer (i.e. by spaces)
- def this(parent: RDD[T], command: String, envVars: JMap[String, String],
- pythonIncludes: JList[String],
- preservePartitoning: Boolean, pythonExec: String,
- broadcastVars: JList[Broadcast[Array[Byte]]],
- accumulator: Accumulator[JList[Array[Byte]]]) =
- this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec,
- broadcastVars, accumulator)
-
- override def getPartitions = parent.partitions
-
- override val partitioner = if (preservePartitoning) parent.partitioner else None
-
-
- override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
- val startTime = System.currentTimeMillis
- val env = SparkEnv.get
- val worker = env.createPythonWorker(pythonExec, envVars.toMap)
-
- // Start a thread to feed the process input from our parent's iterator
- new Thread("stdin writer for " + pythonExec) {
- override def run() {
- try {
- SparkEnv.set(env)
- val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
- val dataOut = new DataOutputStream(stream)
- val printOut = new PrintWriter(stream)
- // Partition index
- dataOut.writeInt(split.index)
- // sparkFilesDir
- PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
- // Broadcast variables
- dataOut.writeInt(broadcastVars.length)
- for (broadcast <- broadcastVars) {
- dataOut.writeLong(broadcast.id)
- dataOut.writeInt(broadcast.value.length)
- dataOut.write(broadcast.value)
- }
- // Python includes (*.zip and *.egg files)
- dataOut.writeInt(pythonIncludes.length)
- for (f <- pythonIncludes) {
- PythonRDD.writeAsPickle(f, dataOut)
- }
- dataOut.flush()
- // Serialized user code
- for (elem <- command) {
- printOut.println(elem)
- }
- printOut.flush()
- // Data values
- for (elem <- parent.iterator(split, context)) {
- PythonRDD.writeAsPickle(elem, dataOut)
- }
- dataOut.flush()
- printOut.flush()
- worker.shutdownOutput()
- } catch {
- case e: IOException =>
- // This can happen for legitimate reasons if the Python code stops returning data before we are done
- // passing elements through, e.g., for take(). Just log a message to say it happened.
- logInfo("stdin writer to Python finished early")
- logDebug("stdin writer to Python finished early", e)
- }
- }
- }.start()
-
- // Return an iterator that reads lines from the process's stdout
- val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
- return new Iterator[Array[Byte]] {
- def next(): Array[Byte] = {
- val obj = _nextObj
- if (hasNext) {
- // FIXME: can deadlock if worker is waiting for us to
- // respond to current message (currently irrelevant because
- // output is shutdown before we read any input)
- _nextObj = read()
- }
- obj
- }
-
- private def read(): Array[Byte] = {
- try {
- stream.readInt() match {
- case length if length > 0 =>
- val obj = new Array[Byte](length)
- stream.readFully(obj)
- obj
- case -3 =>
- // Timing data from worker
- val bootTime = stream.readLong()
- val initTime = stream.readLong()
- val finishTime = stream.readLong()
- val boot = bootTime - startTime
- val init = initTime - bootTime
- val finish = finishTime - initTime
- val total = finishTime - startTime
- logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
- read
- case -2 =>
- // Signals that an exception has been thrown in python
- val exLength = stream.readInt()
- val obj = new Array[Byte](exLength)
- stream.readFully(obj)
- throw new PythonException(new String(obj))
- case -1 =>
- // We've finished the data section of the output, but we can still
- // read some accumulator updates; let's do that, breaking when we
- // get a negative length record.
- var len2 = stream.readInt()
- while (len2 >= 0) {
- val update = new Array[Byte](len2)
- stream.readFully(update)
- accumulator += Collections.singletonList(update)
- len2 = stream.readInt()
- }
- new Array[Byte](0)
- }
- } catch {
- case eof: EOFException => {
- throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
- }
- case e => throw e
- }
- }
-
- var _nextObj = read()
-
- def hasNext = _nextObj.length != 0
- }
- }
-
- val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
-}
-
-/** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
-
-/**
- * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
- * This is used by PySpark's shuffle operations.
- */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
- RDD[(Array[Byte], Array[Byte])](prev) {
- override def getPartitions = prev.partitions
- override def compute(split: Partition, context: TaskContext) =
- prev.iterator(split, context).grouped(2).map {
- case Seq(a, b) => (a, b)
- case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
- }
- val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
-}
-
-private[spark] object PythonRDD {
-
- /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
- def stripPickle(arr: Array[Byte]) : Array[Byte] = {
- arr.slice(2, arr.length - 1)
- }
-
- /**
- * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
- * The data format is a 32-bit integer representing the pickled object's length (in bytes),
- * followed by the pickled data.
- *
- * Pickle module:
- *
- * http://docs.python.org/2/library/pickle.html
- *
- * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
- *
- * http://hg.python.org/cpython/file/2.6/Lib/pickle.py
- * http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
- *
- * @param elem the object to write
- * @param dOut a data output stream
- */
- def writeAsPickle(elem: Any, dOut: DataOutputStream) {
- if (elem.isInstanceOf[Array[Byte]]) {
- val arr = elem.asInstanceOf[Array[Byte]]
- dOut.writeInt(arr.length)
- dOut.write(arr)
- } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
- val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
- val length = t._1.length + t._2.length - 3 - 3 + 4 // stripPickle() removes 3 bytes
- dOut.writeInt(length)
- dOut.writeByte(Pickle.PROTO)
- dOut.writeByte(Pickle.TWO)
- dOut.write(PythonRDD.stripPickle(t._1))
- dOut.write(PythonRDD.stripPickle(t._2))
- dOut.writeByte(Pickle.TUPLE2)
- dOut.writeByte(Pickle.STOP)
- } else if (elem.isInstanceOf[String]) {
- // For uniformity, strings are wrapped into Pickles.
- val s = elem.asInstanceOf[String].getBytes("UTF-8")
- val length = 2 + 1 + 4 + s.length + 1
- dOut.writeInt(length)
- dOut.writeByte(Pickle.PROTO)
- dOut.writeByte(Pickle.TWO)
- dOut.write(Pickle.BINUNICODE)
- dOut.writeInt(Integer.reverseBytes(s.length))
- dOut.write(s)
- dOut.writeByte(Pickle.STOP)
- } else {
- throw new SparkException("Unexpected RDD type")
- }
- }
-
- def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) :
- JavaRDD[Array[Byte]] = {
- val file = new DataInputStream(new FileInputStream(filename))
- val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
- try {
- while (true) {
- val length = file.readInt()
- val obj = new Array[Byte](length)
- file.readFully(obj)
- objs.append(obj)
- }
- } catch {
- case eof: EOFException => {}
- case e => throw e
- }
- JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
- }
-
- def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
- import scala.collection.JavaConverters._
- writeIteratorToPickleFile(items.asScala, filename)
- }
-
- def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
- val file = new DataOutputStream(new FileOutputStream(filename))
- for (item <- items) {
- writeAsPickle(item, file)
- }
- file.close()
- }
-
- def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
- implicit val cm : ClassManifest[T] = rdd.elementClassManifest
- rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
- }
-}
-
-private object Pickle {
- val PROTO: Byte = 0x80.toByte
- val TWO: Byte = 0x02.toByte
- val BINUNICODE: Byte = 'X'
- val STOP: Byte = '.'
- val TUPLE2: Byte = 0x86.toByte
- val EMPTY_LIST: Byte = ']'
- val MARK: Byte = '('
- val APPENDS: Byte = 'e'
-}
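
For a String, writeAsPickle above hand-builds a protocol-2 pickle rather than
calling into Python: PROTO and TWO, the BINUNICODE opcode, a little-endian
byte count (hence Integer.reverseBytes around the big-endian writeInt), the
UTF-8 bytes, and STOP, all preceded by the record length. The same layout as
a standalone sketch (pickledStringFrame is a hypothetical helper, not in the
original file):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    object PickleFrameSketch {
      def pickledStringFrame(s: String): Array[Byte] = {
        val utf8 = s.getBytes("UTF-8")
        val bytes = new ByteArrayOutputStream()
        val out = new DataOutputStream(bytes)
        out.writeInt(2 + 1 + 4 + utf8.length + 1)        // length of the pickle payload
        out.writeByte(0x80)                              // Pickle.PROTO
        out.writeByte(0x02)                              // Pickle.TWO
        out.writeByte('X')                               // Pickle.BINUNICODE
        out.writeInt(Integer.reverseBytes(utf8.length))  // BINUNICODE length is little-endian
        out.write(utf8)
        out.writeByte('.')                               // Pickle.STOP
        out.flush()
        bytes.toByteArray
      }
    }
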
-
-private class BytesToString extends spark.api.java.function.Function[Array[Byte], String] {
- override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
-}
-
-/**
- * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
- * collects a list of pickled strings that we pass to Python through a socket.
- */
-class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
- extends AccumulatorParam[JList[Array[Byte]]] {
-
- Utils.checkHost(serverHost, "Expected hostname")
-
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
- override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
-
- override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
- : JList[Array[Byte]] = {
- if (serverHost == null) {
- // This happens on the worker node, where we just want to remember all the updates
- val1.addAll(val2)
- val1
- } else {
- // This happens on the master, where we pass the updates to Python through a socket
- val socket = new Socket(serverHost, serverPort)
- val in = socket.getInputStream
- val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
- out.writeInt(val2.size)
- for (array <- val2) {
- out.writeInt(array.length)
- out.write(array)
- }
- out.flush()
- // Wait for a byte from the Python side as an acknowledgement
- val byteRead = in.read()
- if (byteRead == -1) {
- throw new SparkException("EOF reached before Python server acknowledged")
- }
- socket.close()
- null
- }
- }
-}
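
addInPlace above drives the master's half of a small socket exchange: connect
to (serverHost, serverPort), write the number of updates, write each update as
a 32-bit length followed by its bytes, flush, and block until the peer answers
with a single acknowledgement byte. In PySpark the peer is Python code; the
Scala below is only an illustrative sketch of that role (AccumulatorServerSketch
is a made-up name):

    import java.io.{BufferedInputStream, DataInputStream}
    import java.net.ServerSocket

    object AccumulatorServerSketch {
      // Accept one connection, read a count of updates, then each update as
      // a 32-bit length plus payload, and reply with one acknowledgement byte.
      def serveOnce(server: ServerSocket): Seq[Array[Byte]] = {
        val socket = server.accept()
        val in = new DataInputStream(new BufferedInputStream(socket.getInputStream))
        val out = socket.getOutputStream
        val numUpdates = in.readInt()
        val updates = (1 to numUpdates).map { _ =>
          val update = new Array[Byte](in.readInt())
          in.readFully(update)
          update
        }
        out.write(1)   // the acknowledgement byte addInPlace waits for
        out.flush()
        socket.close()
        updates
      }
    }
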
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
deleted file mode 100644
index 14f8320..0000000
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import java.io.{File, DataInputStream, IOException}
-import java.net.{Socket, SocketException, InetAddress}
-
-import scala.collection.JavaConversions._
-
-import spark._
-
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
- extends Logging {
- var daemon: Process = null
- val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
- var daemonPort: Int = 0
-
- def create(): Socket = {
- synchronized {
- // Start the daemon if it hasn't been started
- startDaemon()
-
- // Attempt to connect, restart and retry once if it fails
- try {
- new Socket(daemonHost, daemonPort)
- } catch {
- case exc: SocketException => {
- logWarning("Python daemon unexpectedly quit, attempting to restart")
- stopDaemon()
- startDaemon()
- new Socket(daemonHost, daemonPort)
- }
- case e => throw e
- }
- }
- }
-
- def stop() {
- stopDaemon()
- }
-
- private def startDaemon() {
- synchronized {
- // Is it already running?
- if (daemon != null) {
- return
- }
-
- try {
- // Create and start the daemon
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
- val workerEnv = pb.environment()
- workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
- daemon = pb.start()
-
- // Redirect the stderr to ours
- new Thread("stderr reader for " + pythonExec) {
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME HACK: We copy the stream on the level of bytes to
- // attempt to dodge encoding problems.
- val in = daemon.getErrorStream
- var buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
-
- val in = new DataInputStream(daemon.getInputStream)
- daemonPort = in.readInt()
-
- // Redirect further stdout output to our stderr
- new Thread("stdout reader for " + pythonExec) {
- override def run() {
- scala.util.control.Exception.ignoring(classOf[IOException]) {
- // FIXME HACK: We copy the stream on the level of bytes to
- // attempt to dodge encoding problems.
- var buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- System.err.write(buf, 0, len)
- len = in.read(buf)
- }
- }
- }
- }.start()
- } catch {
- case e => {
- stopDaemon()
- throw e
- }
- }
-
- // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
- // detect our disappearance.
- }
- }
-
- private def stopDaemon() {
- synchronized {
- // Request shutdown of existing daemon by sending SIGTERM
- if (daemon != null) {
- daemon.destroy()
- }
-
- daemon = null
- daemonPort = 0
- }
- }
-}
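
The factory above hands out worker sockets by launching pyspark/daemon.py
once, reading the port the daemon prints to its stdout as a 4-byte big-endian
integer, and then opening one loopback socket per requested worker, restarting
the daemon if a connection fails. A condensed sketch of that handshake with
the error handling and stderr/stdout redirection omitted (DaemonHandshakeSketch
is a hypothetical name; the paths follow the code above):

    import java.io.DataInputStream
    import java.net.{InetAddress, Socket}

    object DaemonHandshakeSketch {
      def connect(pythonExec: String, sparkHome: String): Socket = {
        // Start the daemon with the PySpark sources on its PYTHONPATH.
        val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/daemon.py")
        pb.environment().put("PYTHONPATH", sparkHome + "/python/")
        val daemon = pb.start()
        // The first thing the daemon writes to stdout is its listening port.
        val port = new DataInputStream(daemon.getInputStream).readInt()
        new Socket(InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)), port)
      }
    }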