Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:26 UTC

[42/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
new file mode 100644
index 0000000..618a7b3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConversions
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.InputFormat
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
+import org.apache.spark.SparkContext.IntAccumulatorParam
+import org.apache.spark.SparkContext.DoubleAccumulatorParam
+import org.apache.spark.broadcast.Broadcast
+
+import com.google.common.base.Optional
+
+/**
+ * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
+ * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
+ */
+class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   */
+  def this(master: String, appName: String) = this(new SparkContext(master, appName))
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jarFile JAR file to send to the cluster. This can be a path on the local file system
+   *                or an HDFS, HTTP, HTTPS, or FTP URL.
+   */
+  def this(master: String, appName: String, sparkHome: String, jarFile: String) =
+    this(new SparkContext(master, appName, sparkHome, Seq(jarFile)))
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   */
+  def this(master: String, appName: String, sparkHome: String, jars: Array[String]) =
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq))
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param appName A name for your application, to display on the cluster web UI
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *             system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes
+   */
+  def this(master: String, appName: String, sparkHome: String, jars: Array[String],
+      environment: JMap[String, String]) =
+    this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment))
+
+  private[spark] val env = sc.env
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
+  }
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelize[T](list: java.util.List[T]): JavaRDD[T] =
+    parallelize(list, sc.defaultParallelism)
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
+  : JavaPairRDD[K, V] = {
+    implicit val kcm: ClassManifest[K] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+    implicit val vcm: ClassManifest[V] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+    JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
+  }
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] =
+    parallelizePairs(list, sc.defaultParallelism)
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
+    JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()),
+      numSlices))
+
+  /** Distribute a local Java collection to form an RDD. */
+  def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
+    parallelizeDoubles(list, sc.defaultParallelism)
+
+  /**
+   * Read a text file from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI, and return it as an RDD of Strings.
+   */
+  def textFile(path: String): JavaRDD[String] = sc.textFile(path)
+
+  /**
+   * Read a text file from HDFS, a local file system (available on all nodes), or any
+   * Hadoop-supported file system URI, and return it as an RDD of Strings.
+   */
+  def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
+
+  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+  def sequenceFile[K, V](path: String,
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
+  }
+
+  /** Get an RDD for a Hadoop SequenceFile. */
+  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
+  JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
+  }
+
+  /**
+   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
+   * BytesWritable values that contain a serialized partition. This is still an experimental storage
+   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
+   * slow if you use the default serializer (Java serialization), though the nice thing about it is
+   * that there's very little effort required to save arbitrary objects.
+   */
+  def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    sc.objectFile(path, minSplits)(cm)
+  }
+
+  /**
+   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
+   * BytesWritable values that contain a serialized partition. This is still an experimental storage
+   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
+   * slow if you use the default serializer (Java serialization), though the nice thing about it is
+   * that there's very little effort required to save arbitrary objects.
+   */
+  def objectFile[T](path: String): JavaRDD[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    sc.objectFile(path)(cm)
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
+   * etc).
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+    conf: JobConf,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
+   * etc).
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+    conf: JobConf,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V]
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+  }
+
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
+  }
+
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+    path: String,
+    inputFormatClass: Class[F],
+    keyClass: Class[K],
+    valueClass: Class[V]
+    ): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(keyClass)
+    implicit val vcm = ClassManifest.fromClass(valueClass)
+    new JavaPairRDD(sc.hadoopFile(path,
+      inputFormatClass, keyClass, valueClass))
+  }
+
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+    path: String,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V],
+    conf: Configuration): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(kClass)
+    implicit val vcm = ClassManifest.fromClass(vClass)
+    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+  }
+
+  /**
+   * Get an RDD for a Hadoop-readable dataset with an arbitrary new API InputFormat; the given
+   * Configuration supplies the dataset location and any extra options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+    conf: Configuration,
+    fClass: Class[F],
+    kClass: Class[K],
+    vClass: Class[V]): JavaPairRDD[K, V] = {
+    implicit val kcm = ClassManifest.fromClass(kClass)
+    implicit val vcm = ClassManifest.fromClass(vClass)
+    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+  }
+
+  /** Build the union of two or more RDDs. */
+  override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
+    val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+    implicit val cm: ClassManifest[T] = first.classManifest
+    sc.union(rdds)(cm)
+  }
+
+  /** Build the union of two or more RDDs. */
+  override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
+      : JavaPairRDD[K, V] = {
+    val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
+    implicit val cm: ClassManifest[(K, V)] = first.classManifest
+    implicit val kcm: ClassManifest[K] = first.kManifest
+    implicit val vcm: ClassManifest[V] = first.vManifest
+    new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
+  }
+
+  /** Build the union of two or more RDDs. */
+  override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = {
+    val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd)
+    new JavaDoubleRDD(sc.union(rdds))
+  }
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   */
+  def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
+    sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   */
+  def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
+    sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   */
+  def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   */
+  def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
+    doubleAccumulator(initialValue)
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   */
+  def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
+    sc.accumulator(initialValue)(accumulatorParam)
+
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks can
+   * "add" values with `add`. Only the master can access the accumuable's `value`.
+   */
+  def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
+    sc.accumulable(initialValue)(param)
+
+  /**
+   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]]
+   * object for reading it in distributed functions. The variable will be sent to each node only once.
+   */
+  def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
+
+  /** Shut down the SparkContext. */
+  def stop() {
+    sc.stop()
+  }
+
+  /**
+   * Get Spark's home location from either a value set through the constructor,
+   * or the spark.home Java property, or the SPARK_HOME environment variable
+   * (in that order of preference). If none of these is set, an absent Optional is returned.
+   */
+  def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
+
+  /**
+   * Add a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,
+   * use `SparkFiles.get(path)` to find its download location.
+   */
+  def addFile(path: String) {
+    sc.addFile(path)
+  }
+
+  /**
+   * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI.
+   */
+  def addJar(path: String) {
+    sc.addJar(path)
+  }
+
+  /**
+   * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
+   * any new nodes.
+   */
+  def clearJars() {
+    sc.clearJars()
+  }
+
+  /**
+   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
+   * any new nodes.
+   */
+  def clearFiles() {
+    sc.clearFiles()
+  }
+
+  /**
+   * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) that we reuse.
+   */
+  def hadoopConfiguration(): Configuration = {
+    sc.hadoopConfiguration
+  }
+
+  /**
+   * Set the directory under which RDDs are going to be checkpointed. The directory must
+   * be an HDFS path if running on a cluster. If the directory does not exist, it will
+   * be created. If the directory exists and useExisting is set to true, then the
+   * existing directory will be used. Otherwise an exception will be thrown to
+   * prevent accidental overwriting of checkpoint files in the existing directory.
+   */
+  def setCheckpointDir(dir: String, useExisting: Boolean) {
+    sc.setCheckpointDir(dir, useExisting)
+  }
+
+  /**
+   * Set the directory under which RDDs are going to be checkpointed. The directory must
+   * be an HDFS path if running on a cluster. If the directory does not exist, it will
+   * be created. If the directory exists, an exception will be thrown to prevent accidental
+   * overwriting of checkpoint files.
+   */
+  def setCheckpointDir(dir: String) {
+    sc.setCheckpointDir(dir)
+  }
+
+  protected def checkpointFile[T](path: String): JavaRDD[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    new JavaRDD(sc.checkpointFile(path))
+  }
+}
+
+object JavaSparkContext {
+  implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc)
+
+  implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc
+}

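For context, a minimal sketch of how the Java-friendly context above might be driven from Java. It is illustrative only (not part of this patch); it assumes the constructors and methods shown in the diff (parallelize, textFile, intAccumulator, broadcast, stop) plus JavaRDD.count(), and the file path is a placeholder.

    import java.util.Arrays;

    import org.apache.spark.Accumulator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class JavaContextSketch {
      public static void main(String[] args) {
        // Local master with two threads; the app name shows up in the cluster web UI.
        JavaSparkContext sc = new JavaSparkContext("local[2]", "context-sketch");

        // Distribute a local Java collection to form an RDD.
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));

        // Read a text file (placeholder path).
        JavaRDD<String> lines = sc.textFile("README.md");

        // Shared variables: an accumulator that tasks can add to, and a broadcast value.
        Accumulator<Integer> counter = sc.intAccumulator(0);
        Broadcast<int[]> lookup = sc.broadcast(new int[]{10, 20, 30});

        System.out.println(nums.count() + " numbers, " + lines.count() + " lines");
        sc.stop();
      }
    }
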
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
new file mode 100644
index 0000000..c9cbce5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContextVarargsWorkaround.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.List;
+
+// See
+// http://scala-programming-language.1934581.n4.nabble.com/Workaround-for-implementing-java-varargs-in-2-7-2-final-tp1944767p1944772.html
+abstract class JavaSparkContextVarargsWorkaround {
+  public <T> JavaRDD<T> union(JavaRDD<T>... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaRDD<T>> rest = new ArrayList<JavaRDD<T>>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  public JavaDoubleRDD union(JavaDoubleRDD... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaDoubleRDD> rest = new ArrayList<JavaDoubleRDD>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V>... rdds) {
+    if (rdds.length == 0) {
+      throw new IllegalArgumentException("Union called on empty list");
+    }
+    ArrayList<JavaPairRDD<K, V>> rest = new ArrayList<JavaPairRDD<K, V>>(rdds.length - 1);
+    for (int i = 1; i < rdds.length; i++) {
+      rest.add(rdds[i]);
+    }
+    return union(rdds[0], rest);
+  }
+
+  // These methods take separate "first" and "rest" elements to avoid having the same type erasure
+  abstract public <T> JavaRDD<T> union(JavaRDD<T> first, List<JavaRDD<T>> rest);
+  abstract public JavaDoubleRDD union(JavaDoubleRDD first, List<JavaDoubleRDD> rest);
+  abstract public <K, V> JavaPairRDD<K, V> union(JavaPairRDD<K, V> first, List<JavaPairRDD<K, V>> rest);
+}

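For context, the erasure note above is why each abstract union overload takes a separate first element plus a List; the varargs front-ends just repackage their arguments into that shape. A sketch of the call site, assuming three existing JavaRDD<String> values rdd1, rdd2, rdd3 and java.util.Arrays:

    // Varargs form provided by the workaround above.
    JavaRDD<String> all = sc.union(rdd1, rdd2, rdd3);

    // Equivalent erasure-safe form that the abstract methods declare.
    JavaRDD<String> same = sc.union(rdd1, Arrays.asList(rdd2, rdd3));
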
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
new file mode 100644
index 0000000..ecbf188
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import com.google.common.base.Optional
+
+object JavaUtils {
+  def optionToOptional[T](option: Option[T]): Optional[T] =
+    option match {
+      case Some(value) => Optional.of(value)
+      case None => Optional.absent()
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java b/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
new file mode 100644
index 0000000..0744269
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/StorageLevels.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.apache.spark.storage.StorageLevel;
+
+/**
+ * Expose some commonly useful storage level constants.
+ */
+public class StorageLevels {
+  public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
+  public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
+  public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
+  public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
+  public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
+  public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
+  public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
+  public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
+  public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
+  public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
+  public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
+
+  /**
+   * Create a new StorageLevel object.
+   * @param useDisk saved to disk, if true
+   * @param useMemory saved to memory, if true
+   * @param deserialized saved as deserialized objects, if true
+   * @param replication replication factor
+   */
+  public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
+    return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
+  }
+}

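For context, a sketch of how these constants might be used from Java. It is illustrative only and assumes JavaRDD exposes a persist(StorageLevel) method, which is outside this diff; lines is an assumed JavaRDD<String>.

    // Keep the RDD serialized in memory and spill to disk when it does not fit.
    JavaRDD<String> cached = lines.persist(StorageLevels.MEMORY_AND_DISK_SER);

    // Build a custom level: disk and memory, deserialized, replicated on two nodes.
    StorageLevel twoCopies = StorageLevels.create(true, true, true, 2);
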
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
new file mode 100644
index 0000000..4830067
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
+// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
+public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
+  implements Serializable {
+
+  public abstract Iterable<Double> call(T t);
+
+  @Override
+  public final Iterable<Double> apply(T t) { return call(t); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
new file mode 100644
index 0000000..db34cd1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
+// DoubleFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and DoubleFunction.
+public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
+  implements Serializable {
+
+  public abstract Double call(T t) throws Exception;
+}

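For context, a sketch of the overload the comment above refers to: mapping with a DoubleFunction yields a JavaDoubleRDD. Illustrative only; lines is an assumed JavaRDD<String> and the map overload is assumed on the Java RDD classes.

    // One Double per input record; the result is a JavaDoubleRDD.
    JavaDoubleRDD lineLengths = lines.map(new DoubleFunction<String>() {
      public Double call(String line) {
        return (double) line.length();
      }
    });
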
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
new file mode 100644
index 0000000..158539a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+/**
+ * A function that returns zero or more output records from each input record.
+ */
+abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
+  @throws(classOf[Exception])
+  def call(x: T) : java.lang.Iterable[R]
+
+  def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
+}

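For context, a sketch of flatMap with this interface, splitting lines into words. Illustrative only; lines is an assumed JavaRDD<String>, java.util.Arrays is imported, and flatMap(FlatMapFunction) is assumed per the overloading comments elsewhere in this patch.

    // Zero or more output records (words) per input record (line).
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });
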
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
new file mode 100644
index 0000000..5ef6a81
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+/**
+ * A function that takes two inputs and returns zero or more output records.
+ */
+abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] {
+  @throws(classOf[Exception])
+  def call(a: A, b:B) : java.lang.Iterable[C]
+
+  def elementType() : ClassManifest[C] = ClassManifest.Any.asInstanceOf[ClassManifest[C]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
new file mode 100644
index 0000000..b9070cf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+
+
+/**
+ * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
+ */
+public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
+  public abstract R call(T t) throws Exception;
+
+  public ClassManifest<R> returnType() {
+    return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+}
+

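For context, a sketch of plain Function usage with map and filter, which keep the result as an ordinary JavaRDD. Illustrative only; lines is an assumed JavaRDD<String>, and filter(Function<T, Boolean>) is assumed to exist on JavaRDD.

    JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
      public Integer call(String line) { return line.length(); }
    });

    JavaRDD<Integer> nonEmpty = lengths.filter(new Function<Integer, Boolean>() {
      public Boolean call(Integer n) { return n > 0; }
    });
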
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
new file mode 100644
index 0000000..d4c9154
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction2;
+
+import java.io.Serializable;
+
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
+public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
+  implements Serializable {
+
+  public abstract R call(T1 t1, T2 t2) throws Exception;
+
+  public ClassManifest<R> returnType() {
+    return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+}
+

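For context, a sketch of a two-argument function used for aggregation. Illustrative only; lengths is an assumed JavaRDD<Integer> and reduce(Function2) is assumed to exist on JavaRDD.

    // Sum all elements; the same Function2 shape is used by fold, reduceByKey, etc.
    Integer total = lengths.reduce(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
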
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
new file mode 100644
index 0000000..c0e5544
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.Tuple2;
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
+// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
+// overloaded for both FlatMapFunction and PairFlatMapFunction.
+public abstract class PairFlatMapFunction<T, K, V>
+  extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
+  implements Serializable {
+
+  public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
+
+  public ClassManifest<K> keyType() {
+    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+
+  public ClassManifest<V> valueType() {
+    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
new file mode 100644
index 0000000..40480fe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.Tuple2;
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction1;
+
+import java.io.Serializable;
+
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
+// PairFunction does not extend Function because some UDF functions, like map,
+// are overloaded for both Function and PairFunction.
+public abstract class PairFunction<T, K, V>
+  extends WrappedFunction1<T, Tuple2<K, V>>
+  implements Serializable {
+
+  public abstract Tuple2<K, V> call(T t) throws Exception;
+
+  public ClassManifest<K> keyType() {
+    return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+
+  public ClassManifest<V> valueType() {
+    return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
+  }
+}

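For context, a sketch of the overloaded map the comment above describes: mapping with a PairFunction yields a JavaPairRDD, which then supports key-based operations. Illustrative only; words is an assumed JavaRDD<String>, scala.Tuple2 is imported, and reduceByKey is assumed to exist on JavaPairRDD.

    JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
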
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
new file mode 100644
index 0000000..ea94313
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+/**
+ * A function with no return value.
+ */
+// This allows Java users to write void methods without having to return Unit.
+abstract class VoidFunction[T] extends Serializable {
+  @throws(classOf[Exception])
+  def call(t: T) : Unit
+}
+
+// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
+// return Unit), so it is implicitly converted to a Function1[T, Unit]:
+object VoidFunction {
+  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
+}

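For context, a sketch of a side-effecting action written with this class. Illustrative only; counts is an assumed JavaPairRDD<String, Integer> and foreach(VoidFunction) is assumed to exist on the Java RDD classes.

    // No return value; the implicit conversion above turns this into a Function1[T, Unit].
    counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
      public void call(Tuple2<String, Integer> kv) {
        System.out.println(kv._1() + ": " + kv._2());
      }
    });
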
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
new file mode 100644
index 0000000..cfe694f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.runtime.AbstractFunction1
+
+/**
+ * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
+ * isn't marked to allow that).
+ */
+private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
+  @throws(classOf[Exception])
+  def call(t: T): R
+
+  final def apply(t: T): R = call(t)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
new file mode 100644
index 0000000..eb9277c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.runtime.AbstractFunction2
+
+/**
+ * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
+ * isn't marked to allow that).
+ */
+private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
+  @throws(classOf[Exception])
+  def call(t1: T1, t2: T2): R
+
+  final def apply(t1: T1, t2: T2): R = call(t1, t2)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
new file mode 100644
index 0000000..eea63d5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import org.apache.spark.Partitioner
+import org.apache.spark.Utils
+import java.util.Arrays
+
+/**
+ * A [[org.apache.spark.Partitioner]] that hashes byte-array keys by value rather than by reference, for use by the Python API.
+ *
+ * Stores the unique id() of the Python-side partitioning function so that it is incorporated into
+ * equality comparisons.  Correctness requires that the id is a unique identifier for the
+ * lifetime of the program (i.e. that it is not re-used as the id of a different partitioning
+ * function).  This can be ensured by using the Python id() function and maintaining a reference
+ * to the Python partitioning function so that its id() is not reused.
+ */
+private[spark] class PythonPartitioner(
+  override val numPartitions: Int,
+  val pyPartitionFunctionId: Long)
+  extends Partitioner {
+
+  override def getPartition(key: Any): Int = key match {
+    case null => 0
+    case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+    case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case h: PythonPartitioner =>
+      h.numPartitions == numPartitions && h.pyPartitionFunctionId == pyPartitionFunctionId
+    case _ =>
+      false
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
new file mode 100644
index 0000000..621f0fe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io._
+import java.net._
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark._
+import org.apache.spark.rdd.PipedRDD
+
+
+private[spark] class PythonRDD[T: ClassManifest](
+    parent: RDD[T],
+    command: Seq[String],
+    envVars: JMap[String, String],
+    pythonIncludes: JList[String],
+    preservePartitoning: Boolean,
+    pythonExec: String,
+    broadcastVars: JList[Broadcast[Array[Byte]]],
+    accumulator: Accumulator[JList[Array[Byte]]])
+  extends RDD[Array[Byte]](parent) {
+
+  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+
+  // Similar to Runtime.exec(), if we are given a single string, split it into words
+  // using a standard StringTokenizer (i.e. by spaces)
+  def this(parent: RDD[T], command: String, envVars: JMap[String, String],
+      pythonIncludes: JList[String],
+      preservePartitoning: Boolean, pythonExec: String,
+      broadcastVars: JList[Broadcast[Array[Byte]]],
+      accumulator: Accumulator[JList[Array[Byte]]]) =
+    this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec,
+      broadcastVars, accumulator)
+
+  override def getPartitions = parent.partitions
+
+  override val partitioner = if (preservePartitoning) parent.partitioner else None
+
+
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
+    val startTime = System.currentTimeMillis
+    val env = SparkEnv.get
+    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Start a thread to feed the process input from our parent's iterator
+    new Thread("stdin writer for " + pythonExec) {
+      override def run() {
+        try {
+          SparkEnv.set(env)
+          val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
+          val dataOut = new DataOutputStream(stream)
+          val printOut = new PrintWriter(stream)
+          // Partition index
+          dataOut.writeInt(split.index)
+          // sparkFilesDir
+          PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut)
+          // Broadcast variables
+          dataOut.writeInt(broadcastVars.length)
+          for (broadcast <- broadcastVars) {
+            dataOut.writeLong(broadcast.id)
+            dataOut.writeInt(broadcast.value.length)
+            dataOut.write(broadcast.value)
+          }
+          // Python includes (*.zip and *.egg files)
+          dataOut.writeInt(pythonIncludes.length)
+          for (f <- pythonIncludes) {
+            PythonRDD.writeAsPickle(f, dataOut)
+          }
+          dataOut.flush()
+          // Serialized user code
+          for (elem <- command) {
+            printOut.println(elem)
+          }
+          printOut.flush()
+          // Data values
+          for (elem <- parent.iterator(split, context)) {
+            PythonRDD.writeAsPickle(elem, dataOut)
+          }
+          dataOut.flush()
+          printOut.flush()
+          worker.shutdownOutput()
+        } catch {
+          case e: IOException =>
+            // This can happen for legitimate reasons if the Python code stops returning data before we are done
+            // passing elements through, e.g., for take(). Just log a message to say it happened.
+            logInfo("stdin writer to Python finished early")
+            logDebug("stdin writer to Python finished early", e)
+        }
+      }
+    }.start()
+
+    // Return an iterator that reads length-prefixed result records from the process's stdout
+    val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
+    return new Iterator[Array[Byte]] {
+      def next(): Array[Byte] = {
+        val obj = _nextObj
+        if (hasNext) {
+          // FIXME: can deadlock if worker is waiting for us to
+          // respond to current message (currently irrelevant because
+          // output is shutdown before we read any input)
+          _nextObj = read()
+        }
+        obj
+      }
+
+      private def read(): Array[Byte] = {
+        try {
+          stream.readInt() match {
+            case length if length > 0 =>
+              val obj = new Array[Byte](length)
+              stream.readFully(obj)
+              obj
+            case -3 =>
+              // Timing data from worker
+              val bootTime = stream.readLong()
+              val initTime = stream.readLong()
+              val finishTime = stream.readLong()
+              val boot = bootTime - startTime
+              val init = initTime - bootTime
+              val finish = finishTime - initTime
+              val total = finishTime - startTime
+              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
+              read
+            case -2 =>
+              // Signals that an exception has been thrown in python
+              val exLength = stream.readInt()
+              val obj = new Array[Byte](exLength)
+              stream.readFully(obj)
+              throw new PythonException(new String(obj))
+            case -1 =>
+              // We've finished the data section of the output, but we can still
+              // read some accumulator updates; let's do that, breaking when we
+              // get a negative length record.
+              var len2 = stream.readInt()
+              while (len2 >= 0) {
+                val update = new Array[Byte](len2)
+                stream.readFully(update)
+                accumulator += Collections.singletonList(update)
+                len2 = stream.readInt()
+              }
+              new Array[Byte](0)
+          }
+        } catch {
+          case eof: EOFException => {
+            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
+          }
+          case e => throw e
+        }
+      }
+
+      var _nextObj = read()
+
+      def hasNext = _nextObj.length != 0
+    }
+  }
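+
+  // Framing used when reading the worker's stdout in compute() above: a 4-byte length followed by
+  // that many bytes of pickled output. The special lengths -3 (timing data: three longs), -2
+  // (a pickled Python exception follows) and -1 (end of data, then accumulator updates framed as
+  // [length][bytes] until a negative length) are handled in read().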
+
+  val asJavaRDD : JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this)
+}
+
+/** Thrown for exceptions in user Python code. */
+private class PythonException(msg: String) extends Exception(msg)
+
+/**
+ * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
+ * This is used by PySpark's shuffle operations.
+ */
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
+  RDD[(Array[Byte], Array[Byte])](prev) {
+  override def getPartitions = prev.partitions
+  override def compute(split: Partition, context: TaskContext) =
+    prev.iterator(split, context).grouped(2).map {
+      case Seq(a, b) => (a, b)
+      case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
+    }
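+  // grouped(2) pairs up the flat stream key1, value1, key2, value2, ...; a trailing element with
+  // no partner is reported as an error.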
+  val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
+}
+
+private[spark] object PythonRDD {
+
+  /** Strips the two-byte PROTO/version header from the start and the STOP opcode from the end of a pickle. */
+  def stripPickle(arr: Array[Byte]) : Array[Byte] = {
+    arr.slice(2, arr.length - 1)
+  }
+
+  /**
+   * Write strings, pickled Python objects, or pairs of pickled objects to a data output stream.
+   * The data format is a 32-bit integer representing the pickled object's length (in bytes),
+   * followed by the pickled data.
+   *
+   * Pickle module:
+   *
+   *    http://docs.python.org/2/library/pickle.html
+   *
+   * The pickle protocol is documented in the source of the `pickle` and `pickletools` modules:
+   *
+   *    http://hg.python.org/cpython/file/2.6/Lib/pickle.py
+   *    http://hg.python.org/cpython/file/2.6/Lib/pickletools.py
+   *
+   * @param elem the object to write
+   * @param dOut a data output stream
+   */
+  def writeAsPickle(elem: Any, dOut: DataOutputStream) {
+    if (elem.isInstanceOf[Array[Byte]]) {
+      val arr = elem.asInstanceOf[Array[Byte]]
+      dOut.writeInt(arr.length)
+      dOut.write(arr)
+    } else if (elem.isInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]) {
+      val t = elem.asInstanceOf[scala.Tuple2[Array[Byte], Array[Byte]]]
+      // stripPickle() removes 3 bytes from each pickle (the 2-byte PROTO header and the 1-byte STOP);
+      // we add back 4 bytes: PROTO, TWO, TUPLE2 and STOP.
+      val length = t._1.length + t._2.length - 3 - 3 + 4
+      dOut.writeInt(length)
+      dOut.writeByte(Pickle.PROTO)
+      dOut.writeByte(Pickle.TWO)
+      dOut.write(PythonRDD.stripPickle(t._1))
+      dOut.write(PythonRDD.stripPickle(t._2))
+      dOut.writeByte(Pickle.TUPLE2)
+      dOut.writeByte(Pickle.STOP)
+    } else if (elem.isInstanceOf[String]) {
+      // For uniformity, strings are wrapped into Pickles.
+      val s = elem.asInstanceOf[String].getBytes("UTF-8")
+      // 2 (PROTO opcode + version byte) + 1 (BINUNICODE) + 4 (length field) + UTF-8 bytes + 1 (STOP)
+      val length = 2 + 1 + 4 + s.length + 1
+      dOut.writeInt(length)
+      dOut.writeByte(Pickle.PROTO)
+      dOut.writeByte(Pickle.TWO)
+      dOut.write(Pickle.BINUNICODE)
+      // BINUNICODE takes a little-endian length; DataOutputStream writes big-endian, so reverse it
+      dOut.writeInt(Integer.reverseBytes(s.length))
+      dOut.write(s)
+      dOut.writeByte(Pickle.STOP)
+    } else {
+      throw new SparkException("Unexpected RDD type")
+    }
+  }
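+
+  // Wire layouts produced above, each prefixed by a big-endian 4-byte length:
+  //   Array[Byte]                -> the pickled bytes, written verbatim
+  //   (Array[Byte], Array[Byte]) -> PROTO TWO <key pickle minus PROTO/STOP>
+  //                                 <value pickle minus PROTO/STOP> TUPLE2 STOP
+  //   String                     -> PROTO TWO BINUNICODE <little-endian length> <UTF-8 bytes> STOP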
+
+  def readRDDFromPickleFile(sc: JavaSparkContext, filename: String, parallelism: Int) :
+  JavaRDD[Array[Byte]] = {
+    val file = new DataInputStream(new FileInputStream(filename))
+    val objs = new collection.mutable.ArrayBuffer[Array[Byte]]
+    try {
+      while (true) {
+        val length = file.readInt()
+        val obj = new Array[Byte](length)
+        file.readFully(obj)
+        objs.append(obj)
+      }
+    } catch {
+      case eof: EOFException => {}
+      case e => throw e
+    }
+    file.close()
+    JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
+  }
+
+  def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
+    import scala.collection.JavaConverters._
+    writeIteratorToPickleFile(items.asScala, filename)
+  }
+
+  def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
+    val file = new DataOutputStream(new FileOutputStream(filename))
+    for (item <- items) {
+      writeAsPickle(item, file)
+    }
+    file.close()
+  }
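+
+  // A minimal round-trip sketch for the two methods above (pickledA / pickledB are hypothetical
+  // byte arrays that a Python process would have produced with pickle.dumps, and jsc is an
+  // existing JavaSparkContext):
+  //   writeIteratorToPickleFile(Iterator(pickledA, pickledB), "/tmp/objs")
+  //   val rdd = readRDDFromPickleFile(jsc, "/tmp/objs", 2)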
+
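+  // Runs a job on the single requested partition (allowLocal = true) and returns its elements as
+  // an iterator; presumably this lets PySpark implement take() without evaluating every partition.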
+  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
+    implicit val cm : ClassManifest[T] = rdd.elementClassManifest
+    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
+  }
+}
+
+private object Pickle {
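+  // Pickle protocol-2 opcodes (see pickletools): PROTO (0x80) opens a pickle and is followed by
+  // the version byte TWO; BINUNICODE ('X') pushes a UTF-8 string preceded by a 4-byte
+  // little-endian length; TUPLE2 (0x86) builds a 2-tuple from the top two stack items; STOP ('.')
+  // ends the pickle; EMPTY_LIST (']'), MARK ('(') and APPENDS ('e') are list-building opcodes.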
+  val PROTO: Byte = 0x80.toByte
+  val TWO: Byte = 0x02.toByte
+  val BINUNICODE: Byte = 'X'
+  val STOP: Byte = '.'
+  val TUPLE2: Byte = 0x86.toByte
+  val EMPTY_LIST: Byte = ']'
+  val MARK: Byte = '('
+  val APPENDS: Byte = 'e'
+}
+
+private class BytesToString extends org.apache.spark.api.java.function.Function[Array[Byte], String] {
+  override def call(arr: Array[Byte]) : String = new String(arr, "UTF-8")
+}
+
+/**
+ * Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
+ * collects a list of pickled strings that we pass to Python through a socket.
+ */
+class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
+  extends AccumulatorParam[JList[Array[Byte]]] {
+
+  Utils.checkHost(serverHost, "Expected hostname")
+
+  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+
+  override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
+
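+  // On workers serverHost is null and updates are simply buffered; on the master each batch is
+  // written to the Python accumulator server as [number of arrays][length][bytes]... and we block
+  // until the server acknowledges with a single byte.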
+  override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])
+      : JList[Array[Byte]] = {
+    if (serverHost == null) {
+      // This happens on the worker node, where we just want to remember all the updates
+      val1.addAll(val2)
+      val1
+    } else {
+      // This happens on the master, where we pass the updates to Python through a socket
+      val socket = new Socket(serverHost, serverPort)
+      val in = socket.getInputStream
+      val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
+      out.writeInt(val2.size)
+      for (array <- val2) {
+        out.writeInt(array.length)
+        out.write(array)
+      }
+      out.flush()
+      // Wait for a byte from the Python side as an acknowledgement
+      val byteRead = in.read()
+      if (byteRead == -1) {
+        throw new SparkException("EOF reached before Python server acknowledged")
+      }
+      socket.close()
+      null
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
new file mode 100644
index 0000000..08e3f67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.python
+
+import java.io.{File, DataInputStream, IOException}
+import java.net.{Socket, SocketException, InetAddress}
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark._
+
+private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
+    extends Logging {
+  var daemon: Process = null
+  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+  var daemonPort: Int = 0
+
+  def create(): Socket = {
+    synchronized {
+      // Start the daemon if it hasn't been started
+      startDaemon()
+
+      // Attempt to connect, restart and retry once if it fails
+      try {
+        new Socket(daemonHost, daemonPort)
+      } catch {
+        case exc: SocketException => {
+          logWarning("Python daemon unexpectedly quit, attempting to restart")
+          stopDaemon()
+          startDaemon()
+          new Socket(daemonHost, daemonPort)
+        }
+        case e => throw e
+      }
+    }
+  }
+
+  def stop() {
+    stopDaemon()
+  }
+
+  private def startDaemon() {
+    synchronized {
+      // Is it already running?
+      if (daemon != null) {
+        return
+      }
+
+      try {
+        // Create and start the daemon
+        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+        val workerEnv = pb.environment()
+        workerEnv.putAll(envVars)
+        val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+        workerEnv.put("PYTHONPATH", pythonPath)
+        daemon = pb.start()
+
+        // Redirect the stderr to ours
+        new Thread("stderr reader for " + pythonExec) {
+          override def run() {
+            scala.util.control.Exception.ignoring(classOf[IOException]) {
+              // FIXME HACK: we copy the stream at the byte level to avoid encoding problems.
+              val in = daemon.getErrorStream
+              var buf = new Array[Byte](1024)
+              var len = in.read(buf)
+              while (len != -1) {
+                System.err.write(buf, 0, len)
+                len = in.read(buf)
+              }
+            }
+          }
+        }.start()
+
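+        // The daemon is expected to write the port it listens on to stdout, as a 4-byte
+        // big-endian int, before printing anything else; later stdout output is forwarded to
+        // our stderr by the thread below.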
+        val in = new DataInputStream(daemon.getInputStream)
+        daemonPort = in.readInt()
+
+        // Redirect further stdout output to our stderr
+        new Thread("stdout reader for " + pythonExec) {
+          override def run() {
+            scala.util.control.Exception.ignoring(classOf[IOException]) {
+              // FIXME HACK: we copy the stream at the byte level to avoid encoding problems.
+              var buf = new Array[Byte](1024)
+              var len = in.read(buf)
+              while (len != -1) {
+                System.err.write(buf, 0, len)
+                len = in.read(buf)
+              }
+            }
+          }
+        }.start()
+      } catch {
+        case e => {
+          stopDaemon()
+          throw e
+        }
+      }
+
+      // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
+      // detect our disappearance.
+    }
+  }
+
+  private def stopDaemon() {
+    synchronized {
+      // Request shutdown of existing daemon by sending SIGTERM
+      if (daemon != null) {
+        daemon.destroy()
+      }
+
+      daemon = null
+      daemonPort = 0
+    }
+  }
+}