Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:09 UTC

[25/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
deleted file mode 100644
index 29d5700..0000000
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import java.util.{Map => JMap}
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-
-import spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
-import spark.SparkContext.IntAccumulatorParam
-import spark.SparkContext.DoubleAccumulatorParam
-import spark.broadcast.Broadcast
-
-import com.google.common.base.Optional
-
-/**
- * A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
- * works with Java collections instead of Scala ones.
- */
-class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   */
-  def this(master: String, appName: String) = this(new SparkContext(master, appName))
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
-   *                or an HDFS, HTTP, HTTPS, or FTP URL.
-   */
-  def this(master: String, appName: String, sparkHome: String, jarFile: String) =
-    this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
-   */
-  def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq))
-
-  /**
-   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param appName A name for your application, to display on the cluster web UI
-   * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
-   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
-   * @param environment Environment variables to set on worker nodes
-   */
-  def this(master: String, appName: String, sparkHome: String, jars: Array[String],
-      environment: JMap[String, String]) =
-    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
-
-  private[spark] val env = sc.env
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
-  }
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
-    parallelize(list, sc.defaultParallelism)
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
-  : JavaPairRDD[K, V] = {
-    implicit val kcm: ClassManifest[K] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
-    implicit val vcm: ClassManifest[V] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
-    JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
-  }
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
-    parallelizePairs(list, sc.defaultParallelism)
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
-    JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
-      numSlices))
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
-    parallelizeDoubles(list, sc.defaultParallelism)
-
-  /**
-   * Read a text file from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI, and return it as an RDD of Strings.
-   */
-  def textFile(path: String): JavaRDD[String] = sc.textFile(path)
-
-  /**
-   * Read a text file from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI, and return it as an RDD of Strings.
-   */
-  def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
-
-  /** Get an RDD for a Hadoop SequenceFile with the given key and value types. */
-  def sequenceFile[K, V](path: String,
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
-  JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
-  }
-
-  /**
-   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
-   * BytesWritable values that contain a serialized partition. This is still an experimental storage
-   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
-   * slow if you use the default serializer (Java serialization), though the nice thing about it is
-   * that there's very little effort required to save arbitrary objects.
-   */
-  def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    sc.objectFile(path, minSplits)(cm)
-  }
-
-  /**
-   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
-   * BytesWritable values that contain a serialized partition. This is still an experimental storage
-   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
-   * slow if you use the default serializer (Java serialization), though the nice thing about it is
-   * that there's very little effort required to save arbitrary objects.
-   */
-  def objectFile[T](path: String): JavaRDD[T] = {
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    sc.objectFile(path)(cm)
-  }
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
-   */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](
-    conf: JobConf,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
-  }
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
-   */
-  def hadoopRDD[K, V, F <: InputFormat[K, V]](
-    conf: JobConf,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V]
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](
-    path: String,
-    inputFormatClass: Class[F],
-    keyClass: Class[K],
-    valueClass: Class[V]
-    ): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(keyClass)
-    implicit val vcm = ClassManifest.fromClass(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass))
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
-    path: String,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V],
-    conf: Configuration): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(kClass)
-    implicit val vcm = ClassManifest.fromClass(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
-    conf: Configuration,
-    fClass: Class[F],
-    kClass: Class[K],
-    vClass: Class[V]): JavaPairRDD[K, V] = {
-    implicit val kcm = ClassManifest.fromClass(kClass)
-    implicit val vcm = ClassManifest.fromClass(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
-  }
-
-  /** Build the union of two or more RDDs. */
-  override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
-    val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
-    implicit val cm: ClassManifest[T] = first.classManifest
-    sc.union(rdds)(cm)
-  }
-
-  /** Build the union of two or more RDDs. */
-  override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
-      : JavaPairRDD[K, V] = {
-    val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
-    implicit val cm: ClassManifest[(K, V)] = first.classManifest
-    implicit val kcm: ClassManifest[K] = first.kManifest
-    implicit val vcm: ClassManifest[V] = first.vManifest
-    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
-  }
-
-  /** Build the union of two or more RDDs. */
-  override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
-    val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
-    new JavaDoubleRDD(sc.union(rdds))
-  }
-
-  /**
-   * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
-   */
-  def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
-    sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
-
-  /**
-   * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
-   */
-  def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
-    sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
-
-  /**
-   * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
-   */
-  def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
-
-  /**
-   * Create an [[spark.Accumulator]] double variable, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
-   */
-  def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
-    doubleAccumulator(initialValue)
-
-  /**
-   * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `add` method. Only the master can access the accumulator's `value`.
-   */
-  def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
-    sc.accumulator(initialValue)(accumulatorParam)
-
-  /**
-   * Create an [[spark.Accumulable]] shared variable of the given type, to which tasks can
-   * "add" values with `add`. Only the master can access the accumuable's `value`.
-   */
-  def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
-    sc.accumulable(initialValue)(param)
-
-  /**
-   * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each node only once.
-   */
-  def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
-
-  /** Shut down the SparkContext. */
-  def stop() {
-    sc.stop()
-  }
-
-  /**
-   * Get Spark's home location from either a value set through the constructor,
-   * or the spark.home Java property, or the SPARK_HOME environment variable
-   * (in that order of preference). If none of these is set, return None.
-   */
-  def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
-
-  /**
-   * Add a file to be downloaded with this Spark job on every node.
-   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,
-   * use `SparkFiles.get(path)` to find its download location.
-   */
-  def addFile(path: String) {
-    sc.addFile(path)
-  }
-
-  /**
-   * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
-   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.
-   */
-  def addJar(path: String) {
-    sc.addJar(path)
-  }
-
-  /**
-   * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
-   * any new nodes.
-   */
-  def clearJars() {
-    sc.clearJars()
-  }
-
-  /**
-   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
-   * any new nodes.
-   */
-  def clearFiles() {
-    sc.clearFiles()
-  }
-
-  /**
-   * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
-   */
-  def hadoopConfiguration(): Configuration = {
-    sc.hadoopConfiguration
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be an HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * existing directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
-   */
-  def setCheckpointDir(dir: String, useExisting: Boolean) {
-    sc.setCheckpointDir(dir, useExisting)
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be an HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists, an exception will be thrown to prevent accidental
-   * overriding of checkpoint files.
-   */
-  def setCheckpointDir(dir: String) {
-    sc.setCheckpointDir(dir)
-  }
-
-  protected def checkpointFile[T](path: String): JavaRDD[T] = {
-    implicit val cm: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    new JavaRDD(sc.checkpointFile(path))
-  }
-}
-
-object JavaSparkContext {
-  implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
-
-  implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
-}
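
For context, a minimal sketch of how the pre-rename Java API above was typically driven. It
assumes a local[2] master, a local README.md, and JavaRDD methods such as count() that are
defined in files not shown in this excerpt:

    import java.util.Arrays;

    import spark.Accumulator;
    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;

    public class JavaSparkContextSketch {
      public static void main(String[] args) {
        // Local master with 2 threads; the app name shows up in the cluster web UI.
        JavaSparkContext sc = new JavaSparkContext("local[2]", "sketch");

        // Distribute a local collection and read a text file.
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
        JavaRDD<String> lines = sc.textFile("README.md");

        // Only the driver ("master") may read the accumulator's value.
        Accumulator<Integer> counter = sc.intAccumulator(0);

        System.out.println(nums.count() + " numbers, " + lines.count() + " lines");
        System.out.println("counter = " + counter.value());
        sc.stop();
      }
    }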

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
deleted file mode 100644
index 42b1de0..0000000
--- a/core/src/main/scala/spark/api/java/JavaSparkContextVarargsWorkaround.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-
-// See
-// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
-abstract class JavaSparkContextVarargsWorkaround {
-  public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
-    if (rdds.length == 0) {
-      throw new IllegalArgumentException("Union called on empty list");
-    }
-    ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
-    for (int i = 1; i < rdds.length; i++) {
-      rest.add(rdds[i]);
-    }
-    return union(rdds[0], rest);
-  }
-
-  // These methods take separate "first" and "rest" elements to avoid having the same type erasure
-  abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
-  abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
-  abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
-}
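
JavaSparkContextVarargsWorkaround exists so Java callers can write union with varargs while
JavaSparkContext implements the (first, rest) overloads above. A minimal sketch of the call
site, using parallelize() and collect() as defined elsewhere in the Java API:

    import java.util.Arrays;

    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;

    public class UnionSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "union-sketch");
        JavaRDD<Integer> a = sc.parallelize(Arrays.asList(1, 2));
        JavaRDD<Integer> b = sc.parallelize(Arrays.asList(3, 4));
        JavaRDD<Integer> c = sc.parallelize(Arrays.asList(5, 6));

        // The varargs overload splits its arguments into (first, rest) and
        // delegates to the abstract union(first, List<rest>) method.
        JavaRDD<Integer> all = sc.union(a, b, c);
        System.out.println(all.collect());
        sc.stop();
      }
    }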

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/JavaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/JavaUtils.scala b/core/src/main/scala/spark/api/java/JavaUtils.scala
deleted file mode 100644
index ffc131a..0000000
--- a/core/src/main/scala/spark/api/java/JavaUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java
-
-import com.google.common.base.Optional
-
-object JavaUtils {
-  def optionToOptional[T](option: Option[T]): Optional[T] =
-    option match {
-      case Some(value) => Optional.of(value)
-      case None => Optional.absent()
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/StorageLevels.java b/core/src/main/scala/spark/api/java/StorageLevels.java
deleted file mode 100644
index f385636..0000000
--- a/core/src/main/scala/spark/api/java/StorageLevels.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java;
-
-import spark.storage.StorageLevel;
-
-/**
- * Expose some commonly useful storage level constants.
- */
-public class StorageLevels {
-  public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
-  public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
-  public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
-  public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
-  public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
-  public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
-  public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
-  public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
-  public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
-  public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
-  public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
-
-  /**
-   * Create a new StorageLevel object.
-   * @param useDisk saved to disk, if true
-   * @param useMemory saved to memory, if true
-   * @param deserialized saved as deserialized objects, if true
-   * @param replication replication factor
-   */
-  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
-    return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
-  }
-}
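
A short sketch of using these constants from Java; persist(StorageLevel) on JavaRDD is assumed
from the era's Java API and is not part of this excerpt:

    import java.util.Arrays;

    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;
    import spark.api.java.StorageLevels;
    import spark.storage.StorageLevel;

    public class StorageLevelsSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "storage-sketch");
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3));

        // Use one of the predefined constants...
        nums.persist(StorageLevels.MEMORY_AND_DISK);

        // ...or build a custom level: disk + memory, serialized, replicated twice.
        StorageLevel custom = StorageLevels.create(true, true, false, 2);
        System.out.println(custom);
        sc.stop();
      }
    }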

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
deleted file mode 100644
index 8bc88d7..0000000
--- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more records of type Double from each input record.
- */
-// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
-public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
-  implements Serializable {
-
-  public abstract Iterable<Double> call(T t);
-
-  @Override
-  public final Iterable<Double> apply(T t) { return call(t); }
-}
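
A sketch of a DoubleFlatMapFunction in use. The flatMap overload that accepts it (and returns a
JavaDoubleRDD with numeric helpers such as sum()) lives in JavaRDD, which is not part of this
excerpt:

    import java.util.ArrayList;
    import java.util.List;

    import spark.api.java.JavaDoubleRDD;
    import spark.api.java.JavaRDD;
    import spark.api.java.function.DoubleFlatMapFunction;

    public class DoubleFlatMapSketch {
      /** Parse zero or more doubles out of each line of text. */
      static JavaDoubleRDD parseAll(JavaRDD<String> lines) {
        return lines.flatMap(new DoubleFlatMapFunction<String>() {
          public Iterable<Double> call(String line) {
            List<Double> out = new ArrayList<Double>();
            for (String tok : line.split(" ")) {
              out.add(Double.parseDouble(tok));
            }
            return out;
          }
        });
      }
    }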

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
deleted file mode 100644
index 1aa1e5d..0000000
--- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns Doubles, and can be used to construct DoubleRDDs.
- */
-// DoubleFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
-  implements Serializable {
-
-  public abstract Double call(T t) throws Exception;
-}
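
A sketch of a DoubleFunction in use; the map overload that turns it into a JavaDoubleRDD is
assumed from JavaRDD, which is not shown here:

    import spark.api.java.JavaDoubleRDD;
    import spark.api.java.JavaRDD;
    import spark.api.java.function.DoubleFunction;

    public class DoubleFunctionSketch {
      /** Map each string to its length; the DoubleFunction overload of map
       *  yields a JavaDoubleRDD with helpers such as mean() and sum(). */
      static JavaDoubleRDD lengths(JavaRDD<String> lines) {
        return lines.map(new DoubleFunction<String>() {
          public Double call(String s) {
            return (double) s.length();
          }
        });
      }
    }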

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
deleted file mode 100644
index 9eb0cfe..0000000
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function that returns zero or more output records from each input record.
- */
-abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  @throws(classOf[Exception])
-  def call(x: T) : java.lang.Iterable[R]
-
-  def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
-}
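
The usual shape of a FlatMapFunction at a Java call site (JavaRDD.flatMap itself is defined
outside this excerpt):

    import java.util.Arrays;

    import spark.api.java.JavaRDD;
    import spark.api.java.function.FlatMapFunction;

    public class FlatMapSketch {
      /** Split each line into words: zero or more output records per input record. */
      static JavaRDD<String> words(JavaRDD<String> lines) {
        return lines.flatMap(new FlatMapFunction<String, String>() {
          public Iterable<String> call(String line) {
            return Arrays.asList(line.split(" "));
          }
        });
      }
    }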

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
deleted file mode 100644
index dda9871..0000000
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction2.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function that takes two inputs and returns zero or more output records.
- */
-abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
-  @throws(classOf[Exception])
-  def call(a: A, b:B) : java.lang.Iterable[C]
-
-  def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
-}
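
A minimal FlatMapFunction2 subclass; two-input operators in the Java API (for example
zipPartitions in this era, an assumption since those call sites are not in this excerpt)
consume functions of this shape:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import spark.api.java.function.FlatMapFunction2;

    /** Pair up two partitions' iterators into "left:right" strings,
     *  stopping at the shorter of the two inputs. */
    public class ZipStrings
        extends FlatMapFunction2<Iterator<String>, Iterator<String>, String> {
      public Iterable<String> call(Iterator<String> left, Iterator<String> right) {
        List<String> out = new ArrayList<String>();
        while (left.hasNext() && right.hasNext()) {
          out.add(left.next() + ":" + right.next());
        }
        return out;
      }
    }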

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
deleted file mode 100644
index 2a2ea0a..0000000
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-
-/**
- * Base class for functions whose return types do not create special RDDs. PairFunction and
- * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
- * when mapping RDDs of other types.
- */
-public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
-  public abstract R call(T t) throws Exception;
-
-  public ClassManifest<R> returnType() {
-    return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-}
-
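
The plain Function is the workhorse for one-to-one transformations from Java; a sketch, with
JavaRDD.map assumed from the rest of the Java API:

    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function;

    public class MapSketch {
      /** One-to-one map: a Function's return type builds an ordinary JavaRDD. */
      static JavaRDD<Integer> lineLengths(JavaRDD<String> lines) {
        return lines.map(new Function<String, Integer>() {
          public Integer call(String s) {
            return s.length();
          }
        });
      }
    }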

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
deleted file mode 100644
index 952d31e..0000000
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction2;
-
-import java.io.Serializable;
-
-/**
- * A two-argument function that takes arguments of type T1 and T2 and returns an R.
- */
-public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
-  implements Serializable {
-
-  public abstract R call(T1 t1, T2 t2) throws Exception;
-
-  public ClassManifest<R> returnType() {
-    return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-}
-
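
Function2 is what actions such as reduce expect; a sketch (JavaRDD.reduce is assumed from the
Java API, not shown in this excerpt):

    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function2;

    public class ReduceSketch {
      /** Sum an RDD of integers with an associative two-argument function. */
      static int sum(JavaRDD<Integer> nums) {
        return nums.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });
      }
    }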

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
deleted file mode 100644
index 4aad602..0000000
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns zero or more key-value pair records from each input record. The
- * key-value pairs are represented as scala.Tuple2 objects.
- */
-// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
-// overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V>
-  extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
-  implements Serializable {
-
-  public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
-
-  public ClassManifest<K> keyType() {
-    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-
-  public ClassManifest<V> valueType() {
-    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-}
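
A sketch of a PairFlatMapFunction producing a JavaPairRDD; the flatMap overload that accepts it
is assumed from JavaRDD, which is not part of this excerpt:

    import java.util.ArrayList;
    import java.util.List;

    import scala.Tuple2;

    import spark.api.java.JavaPairRDD;
    import spark.api.java.JavaRDD;
    import spark.api.java.function.PairFlatMapFunction;

    public class PairFlatMapSketch {
      /** Emit one (word, 1) pair per word of each line. */
      static JavaPairRDD<String, Integer> wordPairs(JavaRDD<String> lines) {
        return lines.flatMap(new PairFlatMapFunction<String, String, Integer>() {
          public Iterable<Tuple2<String, Integer>> call(String line) {
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String word : line.split(" ")) {
              out.add(new Tuple2<String, Integer>(word, 1));
            }
            return out;
          }
        });
      }
    }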

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
deleted file mode 100644
index ccfe64e..0000000
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function;
-
-import scala.Tuple2;
-import scala.reflect.ClassManifest;
-import scala.reflect.ClassManifest$;
-import scala.runtime.AbstractFunction1;
-
-import java.io.Serializable;
-
-/**
- * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
- */
-// PairFunction does not extend Function because some UDF functions, like map,
-// are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V>
-  extends WrappedFunction1<T, Tuple2<K, V>>
-  implements Serializable {
-
-  public abstract Tuple2<K, V> call(T t) throws Exception;
-
-  public ClassManifest<K> keyType() {
-    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-
-  public ClassManifest<V> valueType() {
-    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
-  }
-}
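
PairFunction is how Java code keys an RDD; the classic word count, assuming JavaRDD.map and
JavaPairRDD.reduceByKey from the era's Java API (neither is in this excerpt):

    import scala.Tuple2;

    import spark.api.java.JavaPairRDD;
    import spark.api.java.JavaRDD;
    import spark.api.java.function.Function2;
    import spark.api.java.function.PairFunction;

    public class WordCountSketch {
      /** Map each word to (word, 1), then reduce by key to get counts. */
      static JavaPairRDD<String, Integer> counts(JavaRDD<String> words) {
        JavaPairRDD<String, Integer> ones = words.map(
            new PairFunction<String, String, Integer>() {
              public Tuple2<String, Integer> call(String w) {
                return new Tuple2<String, Integer>(w, 1);
              }
            });
        return ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });
      }
    }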

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/VoidFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
deleted file mode 100644
index f6fc0b0..0000000
--- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-/**
- * A function with no return value.
- */
-// This allows Java users to write void methods without having to return Unit.
-abstract class VoidFunction[T] extends Serializable {
-  @throws(classOf[Exception])
-  def call(t: T) : Unit
-}
-
-// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
-// return Unit), so it is implicitly converted to a Function1[T, Unit]:
-object VoidFunction {
-  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
-}
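
VoidFunction backs side-effecting actions such as foreach; a sketch (JavaRDD.foreach is assumed
from the Java API, not shown here):

    import spark.api.java.JavaRDD;
    import spark.api.java.function.VoidFunction;

    public class ForeachSketch {
      /** Print every record; VoidFunction lets Java code return nothing
       *  instead of having to hand back scala.Unit explicitly. */
      static void printAll(JavaRDD<String> lines) {
        lines.foreach(new VoidFunction<String>() {
          public void call(String line) {
            System.out.println(line);
          }
        });
      }
    }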

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
deleted file mode 100644
index 1758a38..0000000
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-import scala.runtime.AbstractFunction1
-
-/**
- * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
-  @throws(classOf[Exception])
-  def call(t: T): R
-
-  final def apply(t: T): R = call(t)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
deleted file mode 100644
index b093567..0000000
--- a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.java.function
-
-import scala.runtime.AbstractFunction2
-
-/**
- * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
- * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
- * isn't marked to allow that).
- */
-private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
-  @throws(classOf[Exception])
-  def call(t1: T1, t2: T2): R
-
-  final def apply(t1: T1, t2: T2): R = call(t1, t2)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
deleted file mode 100644
index ac112b8..0000000
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import spark.Partitioner
-import spark.Utils
-import java.util.Arrays
-
-/**
- * A [[spark.Partitioner]] that handles byte-array keys, for use by the Python API.
- *
- * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
- * equality comparisons.  Correctness requires that the id is a unique identifier for the
- * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
- * function).  This can be ensured by using the Python id() function and maintaining a reference
- * to the Python partitioning function so that its id() is not reused.
- */
-private[spark] class PythonPartitioner(
-  override val numPartitions: Int,
-  val pyPartitionFunctionId: Long)
-  extends Partitioner {
-
-  override def getPartition(key: Any): Int = key match {
-    case null => 0
-    case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
-    case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case h: PythonPartitioner =>
-      h.numPartitions == numPartitions && h.pyPartitionFunctionId == pyPartitionFunctionId
-    case _ =>
-      false
-  }
-}
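
Utils.nonNegativeMod is not part of this excerpt; the behaviour the partitioner relies on is
the usual "always non-negative" modulo, roughly as sketched below (an assumption, not the
actual Utils source):

    public class NonNegativeModSketch {
      /** Map any (possibly negative) hash code into [0, mod). */
      static int nonNegativeMod(int x, int mod) {
        int raw = x % mod;
        return raw < 0 ? raw + mod : raw;
      }

      public static void main(String[] args) {
        // A negative hashCode still lands in a valid partition index.
        System.out.println(nonNegativeMod(-7, 4));  // prints 1
      }
    }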

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
deleted file mode 100644
index 4967143..0000000
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import java.io._
-import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
-
-import scala.collection.JavaConversions._
-
-import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
-import spark.broadcast.Broadcast
-import spark._
-import spark.rdd.PipedRDD
-
-
-private[spark] class PythonRDD[T: ClassManifest](
-    parent: RDD[T],
-    command: Seq[String],
-    envVars: JMap[String, String],
-    pythonIncludes: JList[String],
-    preservePartitoning: Boolean,
-    pythonExec: String,
-    broadcastVars: JList[Broadcast[Array[Byte]]],
-    accumulator: Accumulator[JList[Array[Byte]]])
-  extends RDD[Array[Byte]](parent) {
-
-  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
-  // Similar to Runtime.exec(), if we are given a single string, split it into words
-  // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: JMap[String, String],
-      pythonIncludes: JList[String],
-      preservePartitoning: Boolean, pythonExec: String,
-      broadcastVars: JList[Broadcast[Array[Byte]]],
-      accumulator: Accumulator[JList[Array[Byte]]]) =
-    this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec,
-      broadcastVars, accumulator)
-
-  override def getPartitions = parent.partitions
-
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
-
-
-  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val startTime = System.currentTimeMillis
-    val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
-
-    // Start a thread to feed the process input from our parent's iterator
-    new Thread("stdin writer for " + pythonExec) {
-      override def run() {
-        try {
-          SparkEnv.set(env)
-          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
-          val dataOut = new DataOutputStream(stream)
-          val printOut = new PrintWriter(stream)
-          // Partition index
-          dataOut.writeInt(split.index)
-          // sparkFilesDir
-          PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
-          // Broadcast variables
-          dataOut.writeInt(broadcastVars.length)
-          for (broadcast <- broadcastVars) {
-            dataOut.writeLong(broadcast.id)
-            dataOut.writeInt(broadcast.value.length)
-            dataOut.write(broadcast.value)
-          }
-          // Python includes (*.zip and *.egg files)
-          dataOut.writeInt(pythonIncludes.length)
-          for (f <- pythonIncludes) {
-            PythonRDD.writeAsPickle(f, dataOut)
-          }
-          dataOut.flush()
-          // Serialized user code
-          for (elem <- command) {
-            printOut.println(elem)
-          }
-          printOut.flush()
-          // Data values
-          for (elem <- parent.iterator(split, context)) {
-            PythonRDD.writeAsPickle(elem, dataOut)
-          }
-          dataOut.flush()
-          printOut.flush()
-          worker.shutdownOutput()
-        } catch {
-          case e: IOException =>
-            // This can happen for legitimate reasons if the Python code stops returning data before we are done
-            // passing elements through, e.g., for take(). Just log a message to say it happened.
-            logInfo("stdin writer to Python finished early")
-            logDebug("stdin writer to Python finished early", e)
-        }
-      }
-    }.start()
-
-    // Return an iterator that reads records from the process's stdout
-    val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-    return new Iterator[Array[Byte]] {
-      def next(): Array[Byte] = {
-        val obj = _nextObj
-        if (hasNext) {
-          // FIXME: can deadlock if worker is waiting for us to
-          // respond to current message (currently irrelevant because
-          // output is shutdown before we read any input)
-          _nextObj = read()
-        }
-        obj
-      }
-
-      private def read(): Array[Byte] = {
-        try {
-          stream.readInt() match {
-            case length if length > 0 =>
-              val obj = new Array[Byte](length)
-              stream.readFully(obj)
-              obj
-            case -3 =>
-              // Timing data from worker
-              val bootTime = stream.readLong()
-              val initTime = stream.readLong()
-              val finishTime = stream.readLong()
-              val boot = bootTime - startTime
-              val init = initTime - bootTime
-              val finish = finishTime - initTime
-              val total = finishTime - startTime
-              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
-              read
-            case -2 =>
-              // Signals that an exception has been thrown in python
-              val exLength = stream.readInt()
-              val obj = new Array[Byte](exLength)
-              stream.readFully(obj)
-              throw new PythonException(new String(obj))
-            case -1 =>
-              // We've finished the data section of the output, but we can still
-              // read some accumulator updates; let's do that, breaking when we
-              // get a negative length record.
-              var len2 = stream.readInt()
-              while (len2 >= 0) {
-                val update = new Array[Byte](len2)
-                stream.readFully(update)
-                accumulator += Collections.singletonList(update)
-                len2 = stream.readInt()
-              }
-              new Array[Byte](0)
-          }
-        } catch {
-          case eof: EOFException => {
-            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-          }
-          case e => throw e
-        }
-      }
-
-      var _nextObj = read()
-
-      def hasNext = _nextObj.length != 0
-    }
-  }
-
-  val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
-}
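The result stream read above is a simple length-prefixed framing in which negative lengths are reserved for control records. As a minimal standalone sketch of the same dispatch, assuming only a DataInputStream positioned at a frame boundary (none of this is part of the original file):

    import java.io.DataInputStream

    // Sketch of the framing used by the result iterator above: a 4-byte
    // big-endian length, where a positive value is followed by that many
    // bytes of pickled data and negative values are control records.
    def readFrame(stream: DataInputStream): Option[Array[Byte]] = {
      stream.readInt() match {
        case length if length > 0 =>
          val obj = new Array[Byte](length)
          stream.readFully(obj)
          Some(obj)
        case -3 =>
          // Timing record: three longs (boot, init, finish times); skip and keep reading.
          stream.readLong(); stream.readLong(); stream.readLong()
          readFrame(stream)
        case -2 =>
          // The Python side raised an exception; its message follows as one more frame.
          val msg = new Array[Byte](stream.readInt())
          stream.readFully(msg)
          throw new RuntimeException(new String(msg, "UTF-8"))
        case -1 =>
          None  // end of the data section; accumulator updates follow
      }
    }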
-
-/** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
-
-/**
- * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
- * This is used by PySpark's shuffle operations.
- */
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
-  RDD[(Array[Byte], Array[Byte])](prev) {
-  override def getPartitions = prev.partitions
-  override def compute(split: Partition, context: TaskContext) =
-    prev.iterator(split, context).grouped(2).map {
-      case Seq(a, b) => (a, b)
-      case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
-    }
-  val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
-}
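The pairing in PairwiseRDD relies on Python writing keys and values as alternating records; a toy illustration of the same grouped(2) idiom on an ordinary iterator:

    // Toy illustration of the grouped(2) pairing used by PairwiseRDD: an even-length
    // stream of records k1, v1, k2, v2, ... becomes the pairs (k1, v1), (k2, v2), ...
    val flat = Iterator("k1", "v1", "k2", "v2")
    val pairs = flat.grouped(2).map {
      case Seq(a, b) => (a, b)
      case other     => sys.error("odd number of records: " + other)
    }
    println(pairs.toList)  // List((k1,v1), (k2,v2))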
-
-private[spark] object PythonRDD {
-
-  /** Strips the pickle PROTO and STOP opcodes from the start and end of a pickle */
-  def stripPickle(arr: Array[Byte]) : Array[Byte] = {
-    arr.slice(2, arr.length - 1)
-  }
-
-  /**
-   * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
-   * The data format is a 32-bit integer representing the pickled object's length (in bytes),
-   * followed by the pickled data.
-   *
-   * Pickle module:
-   *
-   *    http://docs.python.org/2/library/pickle.html
-   *
-   * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
-   *
-   *    http://hg.python.org/cpython/file/2.6/Lib/pickle.py
-   *    http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
-   *
-   * @param elem the object to write
-   * @param dOut a data output stream
-   */
-  def writeAsPickle(elem: Any, dOut: DataOutputStream) {
-    if (elem.isInstanceOf[Array[Byte]]) {
-      val arr = elem.asInstanceOf[Array[Byte]]
-      dOut.writeInt(arr.length)
-      dOut.write(arr)
-    } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
-      val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
-      val length = t._1.length + t._2.length - 3 - 3 + 4  // stripPickle() removes 3 bytes per element; PROTO, TWO, TUPLE2, STOP add 4 back
-      dOut.writeInt(length)
-      dOut.writeByte(Pickle.PROTO)
-      dOut.writeByte(Pickle.TWO)
-      dOut.write(PythonRDD.stripPickle(t._1))
-      dOut.write(PythonRDD.stripPickle(t._2))
-      dOut.writeByte(Pickle.TUPLE2)
-      dOut.writeByte(Pickle.STOP)
-    } else if (elem.isInstanceOf[String]) {
-      // For uniformity, strings are wrapped into Pickles.
-      val s = elem.asInstanceOf[String].getBytes("UTF-8")
-      val length = 2 + 1 + 4 + s.length + 1
-      dOut.writeInt(length)
-      dOut.writeByte(Pickle.PROTO)
-      dOut.writeByte(Pickle.TWO)
-      dOut.write(Pickle.BINUNICODE)
-      dOut.writeInt(Integer.reverseBytes(s.length))
-      dOut.write(s)
-      dOut.writeByte(Pickle.STOP)
-    } else {
-      throw new SparkException("Unexpected RDD type")
-    }
-  }
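A quick round trip through the byte-array branch of writeAsPickle, just to make the framing concrete (in-memory streams stand in for the worker's stdin; this is illustration, not part of the original API). Note that the outer frame length is big-endian, as DataOutputStream always writes, while the BINUNICODE length in the string branch is little-endian, which is why Integer.reverseBytes appears above.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    // Write one already-pickled record: 4-byte big-endian length, then the bytes.
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    val payload = Array[Byte](1, 2, 3)
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()

    // Read the same frame back.
    val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
    val read = new Array[Byte](in.readInt())
    in.readFully(read)
    assert(read.sameElements(payload))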
-
-  def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) :
-  JavaRDD[Array[Byte]] = {
-    val file = new DataInputStream(new FileInputStream(filename))
-    val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
-    try {
-      while (true) {
-        val length = file.readInt()
-        val obj = new Array[Byte](length)
-        file.readFully(obj)
-        objs.append(obj)
-      }
-    } catch {
-      case eof: EOFException => {}
-      case e => throw e
-    }
-    JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
-  }
-
-  def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
-    import scala.collection.JavaConverters._
-    writeIteratorToPickleFile(items.asScala, filename)
-  }
-
-  def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
-    val file = new DataOutputStream(new FileOutputStream(filename))
-    for (item <- items) {
-      writeAsPickle(item, file)
-    }
-    file.close()
-  }
-
-  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
-    implicit val cm : ClassManifest[T] = rdd.elementClassManifest
-    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
-  }
-}
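A hypothetical round trip through the pickle-file helpers above; the local context, temp file, and record contents are placeholders for illustration only (PythonRDD is package-private, so this would only compile inside the spark package):

    // Hypothetical usage of writeIteratorToPickleFile / readRDDFromPickleFile.
    val jsc = new JavaSparkContext("local", "pickle-file-demo")
    val tmp = java.io.File.createTempFile("pickled", ".bin").getAbsolutePath

    val records: Iterator[Array[Byte]] = Iterator(Array[Byte](1, 2), Array[Byte](3, 4))
    PythonRDD.writeIteratorToPickleFile(records, tmp)

    val rdd = PythonRDD.readRDDFromPickleFile(jsc, tmp, 2)
    println(rdd.count())  // 2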
-
-private object Pickle {
-  val PROTO: Byte = 0x80.toByte
-  val TWO: Byte = 0x02.toByte
-  val BINUNICODE: Byte = 'X'
-  val STOP: Byte = '.'
-  val TUPLE2: Byte = 0x86.toByte
-  val EMPTY_LIST: Byte = ']'
-  val MARK: Byte = '('
-  val APPENDS: Byte = 'e'
-}
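For a concrete picture of the string branch of writeAsPickle, this sketch rebuilds the frame produced for the string "abc", with the opcode constants above inlined as comments (illustration only):

    import java.io.{ByteArrayOutputStream, DataOutputStream}

    // Build the wrapped-string frame for "abc" and print its bytes.
    val s = "abc".getBytes("UTF-8")
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    out.writeInt(2 + 1 + 4 + s.length + 1)        // outer frame length (big-endian)
    out.writeByte(0x80)                           // Pickle.PROTO
    out.writeByte(0x02)                           // Pickle.TWO
    out.write('X')                                // Pickle.BINUNICODE
    out.writeInt(Integer.reverseBytes(s.length))  // BINUNICODE length, little-endian on the wire
    out.write(s)
    out.writeByte('.')                            // Pickle.STOP
    println(buf.toByteArray.map("%02x".format(_)).mkString(" "))
    // 00 00 00 0b 80 02 58 03 00 00 00 61 62 63 2e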
-
-private class BytesToString extends spark.api.java.function.Function[Array[Byte], String] {
-  override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
-}
-
-/**
- * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
- * collects a list of byte arrays holding pickled data that we pass to Python through a socket.
- */
-class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
-  extends AccumulatorParam[JList[Array[Byte]]] {
-
-  Utils.checkHost(serverHost, "Expected hostname")
-
-  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
-  override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
-
-  override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
-      : JList[Array[Byte]] = {
-    if (serverHost == null) {
-      // This happens on the worker node, where we just want to remember all the updates
-      val1.addAll(val2)
-      val1
-    } else {
-      // This happens on the master, where we pass the updates to Python through a socket
-      val socket = new Socket(serverHost, serverPort)
-      val in = socket.getInputStream
-      val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
-      out.writeInt(val2.size)
-      for (array <- val2) {
-        out.writeInt(array.length)
-        out.write(array)
-      }
-      out.flush()
-      // Wait for a byte from the Python side as an acknowledgement
-      val byteRead = in.read()
-      if (byteRead == -1) {
-        throw new SparkException("EOF reached before Python server acknowledged")
-      }
-      socket.close()
-      null
-    }
-  }
-}
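On the wire, addInPlace's master branch sends an update batch as a count followed by one (length, bytes) pair per update, then waits for a single acknowledgement byte. The real receiver is PySpark's Python-side accumulator server; purely for illustration, a Scala stand-in obeying the same handshake might look like this (the server setup and threading are assumptions, not from the original code):

    import java.io.{BufferedInputStream, DataInputStream}
    import java.net.ServerSocket

    // Stand-in for the Python accumulator server: read a count, then one
    // (length, bytes) record per update, then reply with one ack byte.
    val server = new ServerSocket(0)  // pass server.getLocalPort as serverPort
    new Thread("accumulator-sink") {
      override def run() {
        val sock = server.accept()
        val in = new DataInputStream(new BufferedInputStream(sock.getInputStream))
        val numUpdates = in.readInt()
        for (_ <- 1 to numUpdates) {
          val update = new Array[Byte](in.readInt())
          in.readFully(update)
          // ... merge the pickled update here ...
        }
        sock.getOutputStream.write(1)  // acknowledgement byte expected by addInPlace
        sock.getOutputStream.flush()
        sock.close()
      }
    }.start()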

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
deleted file mode 100644
index 14f8320..0000000
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.api.python
-
-import java.io.{File, DataInputStream, IOException}
-import java.net.{Socket, SocketException, InetAddress}
-
-import scala.collection.JavaConversions._
-
-import spark._
-
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
-    extends Logging {
-  var daemon: Process = null
-  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
-  var daemonPort: Int = 0
-
-  def create(): Socket = {
-    synchronized {
-      // Start the daemon if it hasn't been started
-      startDaemon()
-
-      // Attempt to connect; restart the daemon and retry once if the connection fails
-      try {
-        new Socket(daemonHost, daemonPort)
-      } catch {
-        case exc: SocketException => {
-          logWarning("Python daemon unexpectedly quit, attempting to restart")
-          stopDaemon()
-          startDaemon()
-          new Socket(daemonHost, daemonPort)
-        }
-        case e => throw e
-      }
-    }
-  }
-
-  def stop() {
-    stopDaemon()
-  }
-
-  private def startDaemon() {
-    synchronized {
-      // Is it already running?
-      if (daemon != null) {
-        return
-      }
-
-      try {
-        // Create and start the daemon
-        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
-        val workerEnv = pb.environment()
-        workerEnv.putAll(envVars)
-        val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-        workerEnv.put("PYTHONPATH", pythonPath)
-        daemon = pb.start()
-
-        // Redirect the daemon's stderr to ours
-        new Thread("stderr reader for " + pythonExec) {
-          override def run() {
-            scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME HACK: We copy the stream on the level of bytes to
-              // attempt to dodge encoding problems.
-              val in = daemon.getErrorStream
-              var buf = new Array[Byte](1024)
-              var len = in.read(buf)
-              while (len != -1) {
-                System.err.write(buf, 0, len)
-                len = in.read(buf)
-              }
-            }
-          }
-        }.start()
-
-        val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
-
-        // Redirect further stdout output to our stderr
-        new Thread("stdout reader for " + pythonExec) {
-          override def run() {
-            scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME HACK: We copy the stream on the level of bytes to
-              // attempt to dodge encoding problems.
-              var buf = new Array[Byte](1024)
-              var len = in.read(buf)
-              while (len != -1) {
-                System.err.write(buf, 0, len)
-                len = in.read(buf)
-              }
-            }
-          }
-        }.start()
-      } catch {
-        case e => {
-          stopDaemon()
-          throw e
-        }
-      }
-
-      // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
-      // detect our disappearance.
-    }
-  }
-
-  private def stopDaemon() {
-    synchronized {
-      // Request shutdown of existing daemon by sending SIGTERM
-      if (daemon != null) {
-        daemon.destroy()
-      }
-
-      daemon = null
-      daemonPort = 0
-    }
-  }
-}
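The handshake in this factory reduces to: launch pyspark/daemon.py, read the port it reports as a 4-byte integer on its stdout, then open one TCP connection per worker, restarting the daemon once if a connect fails. A condensed sketch of just that logic, with helper names that are illustrative rather than taken from the original file:

    import java.io.DataInputStream
    import java.net.{InetAddress, Socket, SocketException}

    // Read the port that daemon.py writes to its stdout as a 4-byte int.
    def readDaemonPort(daemon: Process): Int =
      new DataInputStream(daemon.getInputStream).readInt()

    // Connect to the daemon, retrying once if the first attempt fails;
    // restart() is assumed to relaunch the daemon and return its new port.
    def connectWithOneRetry(host: InetAddress, port: Int, restart: () => Int): Socket =
      try {
        new Socket(host, port)
      } catch {
        case e: SocketException => new Socket(host, restart())
      }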