You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/06/12 04:16:13 UTC

[GitHub] wonook closed pull request #28: [NEMO-12] Frontend support for Scala Spark

wonook closed pull request #28: [NEMO-12] Frontend support for Scala Spark
URL: https://github.com/apache/incubator-nemo/pull/28
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/compiler/frontend/spark/pom.xml b/compiler/frontend/spark/pom.xml
index cd41e4c5..1157ee25 100644
--- a/compiler/frontend/spark/pom.xml
+++ b/compiler/frontend/spark/pom.xml
@@ -50,6 +50,17 @@ limitations under the License.
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
             <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/JavaSparkContext.java
similarity index 95%
rename from compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
rename to compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/JavaSparkContext.java
index cca48cb9..049410da 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaSparkContext.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/JavaSparkContext.java
@@ -13,8 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.compiler.frontend.spark.core.java;
+package edu.snu.nemo.compiler.frontend.spark.core;
 
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import org.apache.spark.SparkContext;
 
 import java.util.List;
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/RDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/RDD.java
deleted file mode 100644
index 800942e2..00000000
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/RDD.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.spark.core;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.LoopVertex;
-import org.apache.spark.Partition;
-import org.apache.spark.SparkContext;
-import org.apache.spark.TaskContext;
-import scala.collection.Iterator;
-import scala.reflect.ClassTag$;
-
-import java.util.Stack;
-
-/**
- * RDD for Nemo.
- * @param <T> type of data.
- */
-public final class RDD<T> extends org.apache.spark.rdd.RDD<T> {
-  private final Stack<LoopVertex> loopVertexStack;
-  private final DAG<IRVertex, IREdge> dag;
-
-  /**
-   * Static method to create a RDD object.
-   * @param sparkContext spark context containing configurations.
-   * @param <T> type of the resulting object.
-   * @return the new JavaRDD object.
-   */
-  public static <T> RDD<T> of(final SparkContext sparkContext) {
-    return new RDD<>(sparkContext, new DAGBuilder<IRVertex, IREdge>().buildWithoutSourceSinkCheck());
-  }
-
-  /**
-   * Constructor.
-   * @param sparkContext spark context containing configurations.
-   * @param dag the current DAG.
-   */
-  private RDD(final SparkContext sparkContext, final DAG<IRVertex, IREdge> dag) {
-    super(sparkContext, null, ClassTag$.MODULE$.apply((Class<T>) Object.class));
-
-    this.loopVertexStack = new Stack<>();
-    this.dag = dag;
-  }
-
-  @Override
-  public Iterator<T> compute(final Partition partition, final TaskContext taskContext) {
-    throw new UnsupportedOperationException("Operation unsupported.");
-  }
-
-  @Override
-  public Partition[] getPartitions() {
-    throw new UnsupportedOperationException("Operation unsupported.");
-  }
-}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkContext.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkContext.java
new file mode 100644
index 00000000..60f43a9c
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkContext.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.core;
+
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.RDD;
+import org.apache.spark.SparkConf;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+
+import java.util.List;
+
+/**
+ * Spark context wrapper for use in Nemo.
+ */
+public final class SparkContext extends org.apache.spark.SparkContext {
+  private final org.apache.spark.SparkContext sparkContext;
+
+  /**
+   * Constructor.
+   */
+  public SparkContext() {
+    this.sparkContext = org.apache.spark.SparkContext.getOrCreate();
+  }
+
+  /**
+   * Constructor with configuration.
+   *
+   * @param sparkConf spark configuration to wrap.
+   */
+  public SparkContext(final SparkConf sparkConf) {
+    super(sparkConf);
+    this.sparkContext = org.apache.spark.SparkContext.getOrCreate(sparkConf);
+  }
+
+  /**
+   * Initiate an RDD from the given data with the given parallelism.
+   *
+   * @param seq        input data as a Scala sequence.
+   * @param numSlices  number of slices (parallelism).
+   * @param evidence   class tag of the element type.
+   * @return the newly initiated RDD.
+   */
+  @Override
+  public <T> RDD<T> parallelize(final Seq<T> seq,
+                                final int numSlices,
+                                final ClassTag<T> evidence) {
+    final List<T> javaList = scala.collection.JavaConversions.seqAsJavaList(seq);
+    return JavaRDD.of(this.sparkContext, javaList, numSlices).rdd();
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/SparkFrontendUtils.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
similarity index 54%
rename from compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/SparkFrontendUtils.java
rename to compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
index e49a2d29..1cf535a6 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/SparkFrontendUtils.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/SparkFrontendUtils.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.compiler.frontend.spark.core.java;
+package edu.snu.nemo.compiler.frontend.spark.core;
 
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
@@ -29,15 +29,23 @@
 import edu.snu.nemo.compiler.frontend.spark.transform.CollectTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.GroupByKeyTransform;
 import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
-import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.serializer.JavaSerializer;
 import org.apache.spark.serializer.KryoSerializer;
 import org.apache.spark.serializer.Serializer;
+import scala.Function1;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.TraversableOnce;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Stack;
 
@@ -53,10 +61,11 @@ private SparkFrontendUtils() {
 
   /**
    * Derive Spark serializer from a spark context.
+   *
    * @param sparkContext spark context to derive the serializer from.
    * @return the serializer.
    */
-  public static Serializer deriveSerializerFrom(final SparkContext sparkContext) {
+  public static Serializer deriveSerializerFrom(final org.apache.spark.SparkContext sparkContext) {
     if (sparkContext.conf().get("spark.serializer", "")
         .equals("org.apache.spark.serializer.KryoSerializer")) {
       return new KryoSerializer(sparkContext.conf());
@@ -67,11 +76,12 @@ public static Serializer deriveSerializerFrom(final SparkContext sparkContext) {
 
   /**
    * Collect data by running the DAG.
-   * @param dag the DAG to execute.
+   *
+   * @param dag             the DAG to execute.
    * @param loopVertexStack loop vertex stack.
-   * @param lastVertex last vertex added to the dag.
-   * @param serializer serializer for the edges.
-   * @param <T> type of the return data.
+   * @param lastVertex      last vertex added to the dag.
+   * @param serializer      serializer for the edges.
+   * @param <T>             type of the return data.
    * @return the data collected.
    */
   public static <T> List<T> collect(final DAG<IRVertex, IREdge> dag, final Stack<LoopVertex> loopVertexStack,
@@ -102,9 +112,13 @@ public static Serializer deriveSerializerFrom(final SparkContext sparkContext) {
       // TODO #740: remove this part, and make it properly transfer with executor.
       File file = new File(resultFile + i);
       while (file.exists()) {
-        try (final FileInputStream fin = new FileInputStream(file)) {
-          try (final ObjectInputStream ois = new ObjectInputStream(fin)) {
-            result.addAll((List<T>) ois.readObject());
+        try (
+            final FileInputStream fis = new FileInputStream(file);
+            final ObjectInputStream dis = new ObjectInputStream(fis)
+        ) {
+          final int size = dis.readInt(); // Read the number of collected T recorded in CollectTransform.
+          for (int j = 0; j < size; j++) {
+            result.add((T) dis.readObject());
           }
         }
 
@@ -121,12 +135,13 @@ public static Serializer deriveSerializerFrom(final SparkContext sparkContext) {
 
   /**
    * Retrieve communication pattern of the edge.
+   *
    * @param src source vertex.
    * @param dst destination vertex.
    * @return the communication pattern.
    */
-  static DataCommunicationPatternProperty.Value getEdgeCommunicationPattern(final IRVertex src,
-                                                                            final IRVertex dst) {
+  public static DataCommunicationPatternProperty.Value getEdgeCommunicationPattern(final IRVertex src,
+                                                                                   final IRVertex dst) {
     if (dst instanceof OperatorVertex
         && (((OperatorVertex) dst).getTransform() instanceof ReduceByKeyTransform
         || ((OperatorVertex) dst).getTransform() instanceof GroupByKeyTransform)) {
@@ -135,4 +150,76 @@ public static Serializer deriveSerializerFrom(final SparkContext sparkContext) {
       return DataCommunicationPatternProperty.Value.OneToOne;
     }
   }
+
+  /**
+   * Converts a {@link Function1} to a corresponding {@link Function}.
+   *
+   * @param scalaFunction the scala function to convert.
+   * @param <I>           the type of input.
+   * @param <O>           the type of output.
+   * @return the converted Java function.
+   */
+  public static <I, O> Function<I, O> toJavaFunction(final Function1<I, O> scalaFunction) {
+    return new Function<I, O>() {
+      @Override
+      public O call(final I v1) throws Exception {
+        return scalaFunction.apply(v1);
+      }
+    };
+  }
+
+  /**
+   * Converts a {@link scala.Function2} to a corresponding {@link org.apache.spark.api.java.function.Function2}.
+   *
+   * @param scalaFunction the scala function to convert.
+   * @param <I1>          the type of first input.
+   * @param <I2>          the type of second input.
+   * @param <O>           the type of output.
+   * @return the converted Java function.
+   */
+  public static <I1, I2, O> Function2<I1, I2, O> toJavaFunction(final scala.Function2<I1, I2, O> scalaFunction) {
+    return new Function2<I1, I2, O>() {
+      @Override
+      public O call(final I1 v1, final I2 v2) throws Exception {
+        return scalaFunction.apply(v1, v2);
+      }
+    };
+  }
+
+  /**
+   * Converts a {@link Function1} to a corresponding {@link FlatMapFunction}.
+   *
+   * @param scalaFunction the scala function to convert.
+   * @param <I>           the type of input.
+   * @param <O>           the type of output.
+   * @return the converted Java function.
+   */
+  public static <I, O> FlatMapFunction<I, O> toJavaFlatMapFunction(
+      final Function1<I, TraversableOnce<O>> scalaFunction) {
+    return new FlatMapFunction<I, O>() {
+      @Override
+      public Iterator<O> call(final I i) throws Exception {
+        return JavaConverters.asJavaIteratorConverter(scalaFunction.apply(i).toIterator()).asJava();
+      }
+    };
+  }
+
+  /**
+   * Converts a {@link PairFunction} to a plain map {@link Function}.
+   *
+   * @param pairFunction the pair function to convert.
+   * @param <T>          the type of original element.
+   * @param <K>          the type of converted key.
+   * @param <V>          the type of converted value.
+   * @return the converted map function.
+   */
+  public static <T, K, V> Function<T, Tuple2<K, V>> pairFunctionToPlainFunction(
+      final PairFunction<T, K, V> pairFunction) {
+    return new Function<T, Tuple2<K, V>>() {
+      @Override
+      public Tuple2<K, V> call(final T elem) throws Exception {
+        return pairFunction.call(elem);
+      }
+    };
+  }
 }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
deleted file mode 100644
index f9791204..00000000
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaPairRDD.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.frontend.spark.core.java;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.LoopVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
-import edu.snu.nemo.compiler.frontend.spark.core.RDD;
-import edu.snu.nemo.compiler.frontend.spark.transform.MapTransform;
-import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.serializer.Serializer;
-import scala.Tuple2;
-import scala.reflect.ClassTag$;
-
-import java.util.List;
-import java.util.Stack;
-
-import static edu.snu.nemo.compiler.frontend.spark.core.java.SparkFrontendUtils.getEdgeCommunicationPattern;
-
-/**
- * Java RDD for pairs.
- * @param <K> key type.
- * @param <V> value type.
- */
-public final class JavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
-  private final SparkContext sparkContext;
-  private final Stack<LoopVertex> loopVertexStack;
-  private final DAG<IRVertex, IREdge> dag;
-  private final IRVertex lastVertex;
-  private final Serializer serializer;
-
-  /**
-   * Constructor.
-   * @param sparkContext spark context containing configurations.
-   * @param dag the current DAG.
-   * @param lastVertex last vertex added to the builder.
-   */
-  JavaPairRDD(final SparkContext sparkContext, final DAG<IRVertex, IREdge> dag, final IRVertex lastVertex) {
-    // TODO #366: resolve while implementing scala RDD.
-    super(RDD.<Tuple2<K, V>>of(sparkContext),
-        ClassTag$.MODULE$.apply((Class<K>) Object.class), ClassTag$.MODULE$.apply((Class<V>) Object.class));
-
-    this.loopVertexStack = new Stack<>();
-    this.sparkContext = sparkContext;
-    this.dag = dag;
-    this.lastVertex = lastVertex;
-    this.serializer = SparkFrontendUtils.deriveSerializerFrom(sparkContext);
-  }
-
-  /**
-   * @return the spark context.
-   */
-  public SparkContext getSparkContext() {
-    return sparkContext;
-  }
-
-  /////////////// TRANSFORMATIONS ///////////////
-
-  @Override
-  public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex reduceByKeyVertex = new OperatorVertex(new ReduceByKeyTransform<K, V>(func));
-    builder.addVertex(reduceByKeyVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, reduceByKeyVertex),
-        lastVertex, reduceByKeyVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return new JavaPairRDD<>(this.sparkContext, builder.buildWithoutSourceSinkCheck(), reduceByKeyVertex);
-  }
-
-  @Override
-  public <R> JavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex mapVertex = new OperatorVertex(new MapTransform<>(f));
-    builder.addVertex(mapVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, mapVertex),
-        lastVertex, mapVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return new JavaRDD<>(this.sparkContext, builder.buildWithoutSourceSinkCheck(), mapVertex);
-  }
-
-  /////////////// ACTIONS ///////////////
-
-  @Override
-  public List<Tuple2<K, V>> collect() {
-    return SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer);
-  }
-
-  //TODO#776: support unimplemented RDD transformation/actions.
-}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
new file mode 100644
index 00000000..f0682e11
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
@@ -0,0 +1,707 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.core.rdd;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+import scala.reflect.ClassTag$;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Java RDD for pairs.
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public final class JavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
+
+  private final RDD<Tuple2<K, V>> rdd;
+
+  /**
+   * Static method to create a JavaPairRDD object that wraps a Nemo {@link RDD}.
+   *
+   * @param rddFrom the RDD to wrap.
+   * @param <K>     type of the key.
+   * @param <V>     type of the value.
+   * @return the JavaPairRDD object wrapping the given RDD.
+   */
+  public static <K, V> JavaPairRDD<K, V> fromRDD(final RDD<Tuple2<K, V>> rddFrom) {
+    return new JavaPairRDD<>(rddFrom);
+  }
+
+  @Override
+  public JavaPairRDD<K, V> wrapRDD(final org.apache.spark.rdd.RDD<Tuple2<K, V>> rddFrom) {
+    if (!(rddFrom instanceof RDD)) {
+      throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
+    }
+    return fromRDD((RDD<Tuple2<K, V>>) rddFrom);
+  }
+
+  @Override
+  public RDD<Tuple2<K, V>> rdd() {
+    return rdd;
+  }
+
+  /**
+   * Constructor with an existing Nemo RDD.
+   *
+   * @param rdd the Nemo rdd to wrap.
+   */
+  JavaPairRDD(final RDD<Tuple2<K, V>> rdd) {
+    super(rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class));
+
+    this.rdd = rdd;
+  }
+
+  /**
+   * @return the spark context.
+   */
+  public SparkContext getSparkContext() {
+    return rdd.sparkContext();
+  }
+
+  /////////////// TRANSFORMATIONS ///////////////
+
+  @Override
+  public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
+    // Explicit conversion
+    final PairRDDFunctions<K, V> pairRdd = RDD.rddToPairRDDFunctions(
+        rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class), null);
+    final RDD<Tuple2<K, V>> reducedRdd = pairRdd.reduceByKey(func);
+    return JavaPairRDD.fromRDD(reducedRdd);
+  }
+
+  @Override
+  public <R> JavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
+    return rdd.map(f, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
+  }
+
+  /////////////// ACTIONS ///////////////
+
+  @Override
+  public List<Tuple2<K, V>> collect() {
+    return rdd.collectAsList();
+  }
+
+  /////////////// UNSUPPORTED METHODS ///////////////
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+
+  @Override
+  public JavaPairRDD<K, V> cache() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> persist(final StorageLevel newLevel) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> unpersist() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> unpersist(final boolean blocking) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> distinct() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> distinct(final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> filter(final Function<Tuple2<K, V>, Boolean> f) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> coalesce(final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> coalesce(final int numPartitions,
+                                    final boolean shuffle) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> repartition(final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sample(final boolean withReplacement,
+                                  final double fraction) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sample(final boolean withReplacement,
+                                  final double fraction,
+                                  final long seed) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+                                       final Map<K, Double> fractions,
+                                       final long seed) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+                                       final Map<K, Double> fractions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+                                            final Map<K, Double> fractions,
+                                            final long seed) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+                                            final Map<K, Double> fractions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> union(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> intersection(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public Tuple2<K, V> first() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                            final Function2<C, V, C> mergeValue,
+                                            final Function2<C, C, C> mergeCombiners,
+                                            final Partitioner partitioner,
+                                            final boolean mapSideCombine,
+                                            final Serializer serializer) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                            final Function2<C, V, C> mergeValue,
+                                            final Function2<C, C, C> mergeCombiners,
+                                            final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                            final Function2<C, V, C> mergeValue,
+                                            final Function2<C, C, C> mergeCombiners,
+                                            final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> reduceByKey(final Partitioner partitioner,
+                                       final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public Map<K, V> reduceByKeyLocally(final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public Map<K, Long> countByKey() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
+                                                               final double confidence) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public double countByKeyApprox$default$2() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                              final Partitioner partitioner,
+                                              final Function2<U, V, U> seqFunc,
+                                              final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                              final int numPartitions,
+                                              final Function2<U, V, U> seqFunc,
+                                              final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                              final Function2<U, V, U> seqFunc,
+                                              final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                     final Partitioner partitioner,
+                                     final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                     final int numPartitions,
+                                     final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                     final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func,
+                                       final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, Iterable<V>> groupByKey(final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, Iterable<V>> groupByKey(final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+                                    final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+                                    final Partitioner p) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                             final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                             final Partitioner p) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> partitionBy(final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                               final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                 final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                            final Function2<C, V, C> mergeValue,
+                                            final Function2<C, C, C> mergeCombiners) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public org.apache.spark.api.java.JavaPairRDD<K, Iterable<V>> groupByKey() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                               final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                 final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public Map<K, V> collectAsMap() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <U> JavaPairRDD<K, U> mapValues(final Function<V, U> f) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <U> JavaPairRDD<K, U> flatMapValues(final Function<V, Iterable<U>> f) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+          final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+          final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+            final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public List<V> lookup(final K key) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  // Commented out due to an IDE issue
+  /*@Override
+  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
+                                                              final Class<?> keyClass,
+                                                              final Class<?> valueClass,
+                                                              final Class<F> outputFormatClass,
+                                                              final JobConf conf) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
+                                                              final Class<?> keyClass,
+                                                              final Class<?> valueClass,
+                                                              final Class<F> outputFormatClass) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
+                                                              final Class<?> keyClass,
+                                                              final Class<?> valueClass,
+                                                              final Class<F> outputFormatClass,
+                                                              final Class<? extends CompressionCodec> codec) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
+  saveAsNewAPIHadoopFile(final String path,
+                         final Class<?> keyClass,
+                         final Class<?> valueClass,
+                         final Class<F> outputFormatClass,
+                         final Configuration conf) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public void saveAsNewAPIHadoopDataset(final Configuration conf) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
+  saveAsNewAPIHadoopFile(final String path,
+                         final Class<?> keyClass,
+                         final Class<?> valueClass,
+                         final Class<F> outputFormatClass) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }*/
+
+  @Override
+  public void saveAsHadoopDataset(final JobConf conf) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner,
+                                                              final Comparator<K> comp) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey(final boolean ascending) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey(final boolean ascending,
+                                     final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+                                     final boolean ascending) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+                                     final boolean ascending,
+                                     final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaRDD<K> keys() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaRDD<V> values() {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+                                                       final Partitioner partitioner) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+                                                       final int numPartitions) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+
+  @Override
+  public JavaPairRDD<K, V> setName(final String name) {
+    throw new UnsupportedOperationException("Operation not yet implemented.");
+  }
+}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
similarity index 74%
rename from compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
rename to compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
index a3e3fd6b..2fad99c3 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
@@ -13,52 +13,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.compiler.frontend.spark.core.java;
+package edu.snu.nemo.compiler.frontend.spark.core.rdd;
 
-import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
-import edu.snu.nemo.common.ir.vertex.*;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.InMemorySourceVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor;
-import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder;
-import edu.snu.nemo.compiler.frontend.spark.core.RDD;
+import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils;
 import edu.snu.nemo.compiler.frontend.spark.source.SparkDatasetBoundedSourceVertex;
 import edu.snu.nemo.compiler.frontend.spark.source.SparkTextFileBoundedSourceVertex;
 import edu.snu.nemo.compiler.frontend.spark.sql.Dataset;
 import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
-import edu.snu.nemo.compiler.frontend.spark.transform.*;
 import org.apache.spark.*;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.Optional;
 import org.apache.spark.api.java.function.*;
 import org.apache.spark.partial.BoundedDouble;
 import org.apache.spark.partial.PartialResult;
-import org.apache.spark.serializer.Serializer;
 import org.apache.spark.storage.StorageLevel;
+import scala.Option;
+import scala.Tuple2;
 import scala.reflect.ClassTag$;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static edu.snu.nemo.compiler.frontend.spark.core.java.SparkFrontendUtils.getEdgeCommunicationPattern;
-
 /**
  * Java RDD.
  * @param <T> type of the final element.
  */
 public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
-  private final SparkContext sparkContext;
-  private final Stack<LoopVertex> loopVertexStack;
-  private final DAG<IRVertex, IREdge> dag;
-  private final IRVertex lastVertex;
-  private final Serializer serializer;
+
+  private final RDD<T> rdd;
 
   /**
-   * Static method to create a JavaRDD object from an iterable object.
+   * Static method to create a JavaRDD object, wrapping a new Nemo RDD, from an iterable object.
    *
    * @param sparkContext spark context containing configurations.
    * @param initialData  initial data.
@@ -68,36 +59,39 @@
    */
   public static <T> JavaRDD<T> of(final SparkContext sparkContext,
                                   final Iterable<T> initialData,
-                                  final int parallelism) {
+                                  final Integer parallelism) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
     initializedSourceVertex.setProperty(ParallelismProperty.of(parallelism));
     builder.addVertex(initializedSourceVertex);
 
-    return new JavaRDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(), initializedSourceVertex);
+    final RDD<T> nemoRdd = new RDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(),
+        initializedSourceVertex, Option.empty(), ClassTag$.MODULE$.apply(Object.class));
+
+    return new JavaRDD<>(nemoRdd);
   }
 
   /**
    * Static method to create a JavaRDD object from an text file.
    *
    * @param sparkContext  the spark context containing configurations.
-   * @param minPartitions the minimum nubmer of partitions.
+   * @param minPartitions the minimum number of partitions.
    * @param inputPath     the path of the input text file.
-   * @param <T>           the type of resulting object.
    * @return the new JavaRDD object
    */
-  public static <T> JavaRDD<T> of(final SparkContext sparkContext,
-                                  final int minPartitions,
-                                  final String inputPath) {
+  public static JavaRDD<String> of(final SparkContext sparkContext,
+                                   final int minPartitions,
+                                   final String inputPath) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-    final int numPartitions = sparkContext.textFile(inputPath, minPartitions).getNumPartitions();
+    final org.apache.spark.rdd.RDD<String> textRdd = sparkContext.textFile(inputPath, minPartitions);
+    final int numPartitions = textRdd.getNumPartitions();
     final IRVertex textSourceVertex = new SparkTextFileBoundedSourceVertex(sparkContext, inputPath, numPartitions);
     textSourceVertex.setProperty(ParallelismProperty.of(numPartitions));
     builder.addVertex(textSourceVertex);
 
-    return new JavaRDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
+    return new JavaRDD<>(textRdd, sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
   }
 
   /**
@@ -113,35 +107,63 @@
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     final IRVertex sparkBoundedSourceVertex = new SparkDatasetBoundedSourceVertex<>(sparkSession, dataset);
-    sparkBoundedSourceVertex.setProperty(ParallelismProperty.of(dataset.rdd().getNumPartitions()));
+    final org.apache.spark.rdd.RDD<T> sparkRDD = dataset.sparkRDD();
+    sparkBoundedSourceVertex.setProperty(ParallelismProperty.of(sparkRDD.getNumPartitions()));
     builder.addVertex(sparkBoundedSourceVertex);
 
-    return new JavaRDD<>(sparkSession.sparkContext(), builder.buildWithoutSourceSinkCheck(), sparkBoundedSourceVertex);
+    return new JavaRDD<>(
+        sparkRDD, sparkSession.sparkContext(), builder.buildWithoutSourceSinkCheck(), sparkBoundedSourceVertex);
   }
 
   /**
-   * Constructor.
+   * Static method to create a JavaRDD object from {@link RDD}.
    *
-   * @param sparkContext spark context containing configurations.
-   * @param dag          the current DAG.
-   * @param lastVertex   last vertex added to the builder.
+   * @param rddFrom the Nemo RDD to wrap.
+   * @param <T>     type of the resulting object.
+   * @return the JavaRDD object wrapping the given RDD.
    */
-  JavaRDD(final SparkContext sparkContext, final DAG<IRVertex, IREdge> dag, final IRVertex lastVertex) {
-    // TODO #366: resolve while implementing scala RDD.
-    super(RDD.of(sparkContext), ClassTag$.MODULE$.apply((Class<T>) Object.class));
+  public static <T> JavaRDD<T> fromRDD(final RDD<T> rddFrom) {
+    return new JavaRDD<>(rddFrom);
+  }
 
-    this.loopVertexStack = new Stack<>();
-    this.sparkContext = sparkContext;
-    this.dag = dag;
-    this.lastVertex = lastVertex;
-    this.serializer = SparkFrontendUtils.deriveSerializerFrom(sparkContext);
+  @Override
+  public JavaRDD<T> wrapRDD(final org.apache.spark.rdd.RDD<T> rddFrom) {
+    if (!(rddFrom instanceof RDD)) {
+      throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
+    }
+    return fromRDD((RDD<T>) rddFrom);
+  }
+
+  @Override
+  public RDD<T> rdd() {
+    return rdd;
   }
 
   /**
-   * @return the spark context.
+   * Constructor wrapping an existing Nemo RDD.
+   *
+   * @param rdd the Nemo rdd to wrap.
    */
-  public SparkContext getSparkContext() {
-    return sparkContext;
+  JavaRDD(final RDD<T> rdd) {
+    super(rdd, ClassTag$.MODULE$.apply(Object.class));
+    this.rdd = rdd;
+  }
+
+  /**
+   * Constructor with Spark source RDD.
+   *
+   * @param sparkRDD     the Spark source rdd to wrap.
+   * @param sparkContext the Spark context in the wrapped rdd.
+   * @param dag          the IR DAG in construction.
+   * @param lastVertex   the last vertex of the DAG in construction.
+   */
+  JavaRDD(final org.apache.spark.rdd.RDD<T> sparkRDD,
+          final SparkContext sparkContext,
+          final DAG<IRVertex, IREdge> dag,
+          final IRVertex lastVertex) {
+    super(sparkRDD, ClassTag$.MODULE$.apply(Object.class));
+
+    this.rdd = new RDD<>(sparkContext, dag, lastVertex, Option.apply(sparkRDD), ClassTag$.MODULE$.apply(Object.class));
   }
 
   /////////////// TRANSFORMATIONS ///////////////
@@ -151,53 +173,35 @@ public SparkContext getSparkContext() {
    *
    * @param func function to apply.
    * @param <O>  output type.
-   * @return the JavaRDD with the DAG.
+   * @return the JavaRDD with the extended DAG.
    */
   @Override
   public <O> JavaRDD<O> map(final Function<T, O> func) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex mapVertex = new OperatorVertex(new MapTransform<>(func));
-    builder.addVertex(mapVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, mapVertex),
-        lastVertex, mapVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return new JavaRDD<>(this.sparkContext, builder.buildWithoutSourceSinkCheck(), mapVertex);
+    return rdd.map(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
   }
 
+  /**
+   * Flat map transform.
+   *
+   * @param func function to apply.
+   * @param <O>  output type.
+   * @return the JavaRDD with the extended DAG.
+   */
   @Override
-  public <U> JavaRDD<U> flatMap(final FlatMapFunction<T, U> f) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex flatMapVertex = new OperatorVertex(new FlatMapTransform<>(f));
-    builder.addVertex(flatMapVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-        lastVertex, flatMapVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return new JavaRDD<>(this.sparkContext, builder.buildWithoutSourceSinkCheck(), flatMapVertex);
+  public <O> JavaRDD<O> flatMap(final FlatMapFunction<T, O> func) {
+    return rdd.flatMap(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
   }
 
   /////////////// TRANSFORMATION TO PAIR RDD ///////////////
 
+  /**
+   * @see org.apache.spark.api.java.JavaRDD#mapToPair(PairFunction)
+   */
   @Override
   public <K2, V2> JavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2> f) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex mapToPairVertex = new OperatorVertex(new MapToPairTransform<>(f));
-    builder.addVertex(mapToPairVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, mapToPairVertex),
-        lastVertex, mapToPairVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return new JavaPairRDD<>(this.sparkContext, builder.buildWithoutSourceSinkCheck(), mapToPairVertex);
+    final RDD<Tuple2<K2, V2>> pairRdd =
+        rdd.map(SparkFrontendUtils.pairFunctionToPlainFunction(f), ClassTag$.MODULE$.apply(Object.class));
+    return JavaPairRDD.fromRDD(pairRdd);
   }
 
   /////////////// ACTIONS ///////////////
@@ -221,47 +225,21 @@ public static Integer getResultId() {
    */
   @Override
   public T reduce(final Function2<T, T, T> func) {
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex reduceVertex = new OperatorVertex(new ReduceTransform<>(func));
-    builder.addVertex(reduceVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, reduceVertex),
-        lastVertex, reduceVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    return ReduceTransform.reduceIterator(collect().iterator(), func);
+    return rdd.reduce(func);
   }
 
   @Override
   public List<T> collect() {
-    return SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer);
+    return rdd.collectAsList();
   }
 
   @Override
   public void saveAsTextFile(final String path) {
-
-    // Check if given path is HDFS path.
-    final boolean isHDFSPath = path.startsWith("hdfs://") || path.startsWith("s3a://") || path.startsWith("file://");
-    final Transform textFileTransform = isHDFSPath
-        ? new HDFSTextFileTransform(path) : new LocalTextFileTransform(path);
-
-    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(dag);
-
-    final IRVertex flatMapVertex = new OperatorVertex(textFileTransform);
-    builder.addVertex(flatMapVertex, loopVertexStack);
-
-    final IREdge newEdge = new IREdge(getEdgeCommunicationPattern(lastVertex, flatMapVertex),
-        lastVertex, flatMapVertex, new SparkCoder(serializer));
-    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor()));
-    builder.connectVertices(newEdge);
-
-    JobLauncher.launchDAG(builder.build());
+    rdd.saveAsTextFile(path);
   }
 
   /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
-  //TODO#776: support unimplemented RDD transformation/actions.
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
 
   @Override
   public JavaRDD<T> cache() {
@@ -365,7 +343,7 @@ public void saveAsTextFile(final String path) {
   }
 
   /////////////// UNSUPPORTED TRANSFORMATION TO PAIR RDD ///////////////
-  //TODO#776: support unimplemented RDD transformation/actions.
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
 
   @Override
   public <K2, V2> JavaPairRDD<K2, V2> flatMapToPair(final PairFlatMapFunction<T, K2, V2> f) {
@@ -409,7 +387,7 @@ public void saveAsTextFile(final String path) {
   }
 
   /////////////// UNSUPPORTED ACTIONS ///////////////
-  //TODO#776: support unimplemented RDD transformation/actions.
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
 
   @Override
   public <U> U aggregate(final U zeroValue, final Function2<U, T, U> seqOp, final Function2<U, U, U> combOp) {
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 6e83b7f4..0746be5b 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -40,7 +40,7 @@
    */
   public SparkDatasetBoundedSourceVertex(final SparkSession sparkSession, final Dataset<T> dataset) {
     this.readables = new ArrayList<>();
-    final RDD rdd = dataset.rdd();
+    final RDD rdd = dataset.sparkRDD();
     final Partition[] partitions = rdd.getPartitions();
     for (int i = 0; i < partitions.length; i++) {
       readables.add(new SparkDatasetBoundedSourceReadable(
@@ -113,7 +113,7 @@ private SparkDatasetBoundedSourceReadable(final Partition partition,
       final Dataset<T> dataset = SparkSession.initializeDataset(spark, commands);
 
       // Spark does lazy evaluation: it doesn't load the full dataset, but only the partition it is asked for.
-      final RDD<T> rdd = dataset.rdd();
+      final RDD<T> rdd = dataset.sparkRDD();
       return () -> JavaConverters.asJavaIteratorConverter(
           rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
     }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index cac26749..5fab7944 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -93,6 +93,7 @@ public void clearInternalStates() {
      * @param partition      the partition to wrap.
      * @param sparkConf      configuration needed to build the SparkContext.
      * @param partitionIndex partition for this readable.
+     * @param inputPath      the input file path.
      * @param numPartitions  the total number of partitions.
      */
     private SparkTextFileBoundedSourceReadable(final Partition partition,
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/Dataset.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/Dataset.java
index e61259aa..ab50be69 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/Dataset.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/Dataset.java
@@ -15,7 +15,8 @@
  */
 package edu.snu.nemo.compiler.frontend.spark.sql;
 
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.RDD;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.Column;
@@ -73,6 +74,32 @@ private Dataset(final SparkSession sparkSession, final LogicalPlan logicalPlan,
     return JavaRDD.of((SparkSession) super.sparkSession(), this);
   }
 
+  /**
+   * Create an actual {@link org.apache.spark.rdd.RDD} component of Spark to get the source data.
+   * This method should not be called by any user program.
+   *
+   * @return a Spark RDD from this dataset.
+   */
+  public org.apache.spark.rdd.RDD<T> sparkRDD() {
+    return super.rdd();
+  }
+
+  /**
+   * Create a {@link RDD} component from this data set.
+   * To transparently give our RDD to user programs, this method has to be overridden.
+   *
+   * By overriding this method, if a method (such as reduce) of super ({@link org.apache.spark.sql.Dataset}) is called
+   * and it uses super's rdd, the rdd will be our rdd returned by this method.
+   * This is an intended behavior and the result will be calculated by our system.
+   *
+   * @return the new RDD component.
+   */
+   @Override
+   public RDD<T> rdd() {
+     final JavaRDD<T> javaRDD = JavaRDD.of((SparkSession) super.sparkSession(), this);
+     return javaRDD.rdd();
+   }
+
   @Override
   public Dataset<Row> agg(final Column expr, final Column... exprs) {
     final boolean userTriggered = initializeFunction(expr, exprs);
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
index 5b501095..65515cc3 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/sql/SparkSession.java
@@ -15,9 +15,15 @@
  */
 package edu.snu.nemo.compiler.frontend.spark.sql;
 
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.compiler.frontend.spark.core.SparkContext;
+import edu.snu.nemo.conf.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Encoder;
@@ -46,9 +52,10 @@
    * Constructor.
    *
    * @param sparkContext the spark context for the session.
-   * @param initialConf initial spark session configuration.
+   * @param initialConf  initial spark session configuration.
    */
-  private SparkSession(final SparkContext sparkContext, final Map<String, String> initialConf) {
+  private SparkSession(final SparkContext sparkContext,
+                       final Map<String, String> initialConf) {
     super(sparkContext);
     this.datasetCommandsList = new LinkedHashMap<>();
     this.initialConf = initialConf;
@@ -251,9 +258,9 @@ public DataFrameReader read() {
    * @param initialConf  initial configuration of the spark session.
    * @return our spark session class.
    */
-  public static SparkSession from(final org.apache.spark.sql.SparkSession sparkSession,
-                                  final Map<String, String> initialConf) {
-    return new SparkSession(sparkSession.sparkContext(), initialConf);
+  private static SparkSession from(final org.apache.spark.sql.SparkSession sparkSession,
+                                   final Map<String, String> initialConf) {
+    return new SparkSession((SparkContext) sparkSession.sparkContext(), initialConf);
   }
 
   /**
@@ -306,7 +313,7 @@ public Builder config(final String key, final String value) {
 
     @Override
     public Builder master(final String master) {
-      return (Builder) super.master(master);
+      return config("spark.master", master);
     }
 
     @Override
@@ -317,6 +324,22 @@ public SparkSession getOrCreate() {
 
       UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("ubuntu"));
 
+      // Set up spark context with given options.
+      final SparkConf sparkConf = new SparkConf();
+      if (!options.containsKey("spark.app.name")) {
+        try {
+          // get and override configurations from JobLauncher.
+          final Configuration configurations = JobLauncher.getBuiltJobConf();
+          final Injector injector = Tang.Factory.getTang().newInjector(configurations);
+          options.put("spark.app.name", injector.getNamedInstance(JobConf.JobId.class));
+        } catch (final InjectionException e) {
+          throw new RuntimeException(e);
+        }
+      }
+      options.forEach(sparkConf::set);
+      final SparkContext sparkContext = new edu.snu.nemo.compiler.frontend.spark.core.SparkContext(sparkConf);
+      super.sparkContext(sparkContext);
+
       return SparkSession.from(super.getOrCreate(), this.options);
     }
   }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
index 702e3bd3..6cdd7ecd 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/CollectTransform.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 
 import java.io.FileOutputStream;
 import java.io.ObjectOutputStream;
@@ -57,11 +57,14 @@ public void onData(final T element) {
   @Override
   public void close() {
     try (
-        FileOutputStream fos = new FileOutputStream(filename);
-        ObjectOutputStream oos = new ObjectOutputStream(fos)
+        final FileOutputStream fos = new FileOutputStream(filename);
+        final ObjectOutputStream oos = new ObjectOutputStream(fos)
     ) {
-      oos.writeObject(list);
-      oos.close();
+      // Write the length of the list first. This is needed internally and must not be shown in the collected result.
+      oos.writeInt(list.size());
+      for (final T t : list) {
+        oos.writeObject(t);
+      }
     } catch (Exception e) {
       throw new RuntimeException("Exception while file closing in CollectTransform " + e);
     }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
index 03f3215c..2a38b359 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/HDFSTextFileTransform.java
@@ -30,9 +30,8 @@
 /**
  * Transform which saves elements to a HDFS text file for Spark.
  * @param <I> input type.
- * @param <O> output type.
  */
-public final class HDFSTextFileTransform<I, O> implements Transform<I, O> {
+public final class HDFSTextFileTransform<I> implements Transform<I, String> {
   private final String path;
   private Path fileName;
   private List<I> elements;
@@ -47,7 +46,7 @@ public HDFSTextFileTransform(final String path) {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> outputCollector) {
+  public void prepare(final Transform.Context context, final OutputCollector<String> outputCollector) {
     fileName = new Path(path + UUID.randomUUID().toString());
     this.elements = new ArrayList<>();
   }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
index 8fe7bcf4..a233a260 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -26,9 +26,8 @@
 /**
  * Transform which saves elements to a local text file for Spark.
  * @param <I> input type.
- * @param <O> output type.
  */
-public final class LocalTextFileTransform<I, O> implements Transform<I, O> {
+public final class LocalTextFileTransform<I> implements Transform<I, String> {
   private final String path;
   private String fileName;
   private List<I> elements;
@@ -43,7 +42,7 @@ public LocalTextFileTransform(final String path) {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> outputCollector) {
+  public void prepare(final Context context, final OutputCollector<String> outputCollector) {
     fileName = path + UUID.randomUUID().toString();
     this.elements = new ArrayList<>();
   }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
index 976eb502..14e68895 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -41,6 +41,7 @@ public void prepare(final Context context, final OutputCollector<O> oc) {
     this.outputCollector = oc;
   }
 
+  @Override
   public void onData(final I element) {
       try {
         outputCollector.emit(func.call(element));
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index e22d125d..853d8e0b 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -19,7 +19,7 @@
 import com.esotericsoftware.kryo.io.Output;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import org.apache.spark.api.java.function.Function2;
 
 import javax.annotation.Nullable;
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
new file mode 100644
index 00000000..bb1c496d
--- /dev/null
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -0,0 +1,324 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.core.rdd
+
+import java.util
+
+import edu.snu.nemo.common.dag.DAGBuilder
+import edu.snu.nemo.common.ir.edge.IREdge
+import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
+import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
+import edu.snu.nemo.compiler.frontend.spark.transform.ReduceByKeyTransform
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.spark.api.java.function.Function2
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.{Partitioner, rdd}
+
+import scala.reflect.ClassTag
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs through an implicit conversion in Nemo.
+ */
+final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
+    self: RDD[(K, V)]) extends org.apache.spark.rdd.PairRDDFunctions[K, V](self) {
+
+  private val loopVertexStack = new util.Stack[LoopVertex]
+
+  /////////////// WRAPPER FUNCTIONS /////////////
+
+  /**
+   * A Scala wrapper for the reduceByKey transformation.
+   */
+  override def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+    val javaFunc = SparkFrontendUtils.toJavaFunction(func)
+    reduceByKey(javaFunc)
+  }
+
+  /////////////// TRANSFORMATIONS ///////////////
+
+  /**
+   * Merge the values for each key using an associative and commutative reduce function. This will
+   * also perform the merging locally on each mapper before sending results to a reducer, similarly
+   * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
+   */
+  protected[rdd] def reduceByKey(javaFunc: Function2[V, V, V]): RDD[(K, V)] = {
+    val builder = new DAGBuilder[IRVertex, IREdge](self.dag)
+
+    val reduceByKeyVertex = new OperatorVertex(new ReduceByKeyTransform[K, V](javaFunc))
+    builder.addVertex(reduceByKeyVertex, loopVertexStack)
+
+    val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(self.lastVertex, reduceByKeyVertex),
+      self.lastVertex, reduceByKeyVertex, new SparkCoder[Tuple2[K, V]](self.serializer))
+    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+    builder.connectVertices(newEdge)
+
+    new RDD[(K, V)](self._sc, builder.buildWithoutSourceSinkCheck, reduceByKeyVertex, Option.empty)
+  }
+
+  /////////////// UNSUPPORTED METHODS ///////////////
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+  override def combineByKeyWithClassTag[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                                           mergeCombiners: (C, C) => C, partitioner: Partitioner,
+                                           mapSideCombine: Boolean, serializer: Serializer)
+                                          (implicit ct: ClassTag[C]): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                               mergeCombiners: (C, C) => C, partitioner: Partitioner,
+                               mapSideCombine: Boolean, serializer: Serializer): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                               mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def combineByKeyWithClassTag[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                                           mergeCombiners: (C, C) => C, numPartitions: Int)
+                                          (implicit ct: ClassTag[C]): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)
+                                (seqOp: (U, V) => U, combOp: (U, U) => U)
+                                (implicit evidence$1: ClassTag[U]): RDD[(K, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def aggregateByKey[U](zeroValue: U, numPartitions: Int)
+                                (seqOp: (U, V) => U, combOp: (U, U) => U)
+                                (implicit evidence$2: ClassTag[U]): RDD[(K, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)
+                                (implicit evidence$3: ClassTag[U]): RDD[(K, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def sampleByKey(withReplacement: Boolean,
+                           fractions: collection.Map[K, Double], seed: Long): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def sampleByKeyExact(withReplacement: Boolean,
+                                fractions: collection.Map[K, Double], seed: Long): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def reduceByKeyLocally(func: (V, V) => V): collection.Map[K, V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countByKey(): collection.Map[K, Long] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countByKeyApprox(timeout: Long, confidence: Double): PartialResult[collection.Map[K, BoundedDouble]] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinctByKey(relativeSD: Double): RDD[(K, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def partitionBy(partitioner: Partitioner): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def join[W](other: rdd.RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def leftOuterJoin[W](other: rdd.RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def rightOuterJoin[W](other: rdd.RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def fullOuterJoin[W](other: rdd.RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                               mergeCombiners: (C, C) => C): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def combineByKeyWithClassTag[C](createCombiner: V => C, mergeValue: (C, V) => C,
+                                           mergeCombiners: (C, C) => C)
+                                          (implicit ct: ClassTag[C]): RDD[(K, C)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupByKey(): RDD[(K, Iterable[V])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def join[W](other: rdd.RDD[(K, W)]): RDD[(K, (V, W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def join[W](other: rdd.RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def leftOuterJoin[W](other: rdd.RDD[(K, W)]): RDD[(K, (V, Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def leftOuterJoin[W](other: rdd.RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def rightOuterJoin[W](other: rdd.RDD[(K, W)]): RDD[(K, (Option[V], W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def rightOuterJoin[W](other: rdd.RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def fullOuterJoin[W](other: rdd.RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def fullOuterJoin[W](other: rdd.RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def collectAsMap(): collection.Map[K, V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def mapValues[U](f: V => U): RDD[(K, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2, W3](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)],
+                                   other3: rdd.RDD[(K, W3)], partitioner: Partitioner)
+  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W](other: rdd.RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2](other1: rdd.RDD[(K, W1)],
+                               other2: rdd.RDD[(K, W2)],
+                               partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2, W3](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)],
+                                   other3: rdd.RDD[(K, W3)])
+  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W](other: rdd.RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)])
+  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W](other: rdd.RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2](other1: rdd.RDD[(K, W1)],
+                               other2: rdd.RDD[(K, W2)],
+                               numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cogroup[W1, W2, W3](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)],
+                                   other3: rdd.RDD[(K, W3)], numPartitions: Int)
+  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupWith[W](other: rdd.RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupWith[W1, W2](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)])
+  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupWith[W1, W2, W3](other1: rdd.RDD[(K, W1)], other2: rdd.RDD[(K, W2)], other3: rdd.RDD[(K, W3)])
+  : rdd.RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtractByKey[W](other: rdd.RDD[(K, W)])(implicit evidence$4: ClassTag[W]): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtractByKey[W](other: rdd.RDD[(K, W)], numPartitions: Int)
+                               (implicit evidence$5: ClassTag[W]): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtractByKey[W](other: rdd.RDD[(K, W)], p: Partitioner)
+                               (implicit evidence$6: ClassTag[W]): RDD[(K, V)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def lookup(key: K): Seq[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])
+                                                        (implicit fm: ClassTag[F]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_],
+                                      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+                                      conf: Configuration): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_],
+                                outputFormatClass: Class[_ <: OutputFormat[_, _]],
+                                codec: Class[_ <: CompressionCodec]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_],
+                                outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf,
+                                codec: Option[Class[_ <: CompressionCodec]]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsNewAPIHadoopDataset(conf: Configuration): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsHadoopDataset(conf: JobConf): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def keys: RDD[K] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def values: RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+}
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
new file mode 100644
index 00000000..cfa144df
--- /dev/null
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -0,0 +1,521 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.frontend.spark.core.rdd
+
+import java.util
+
+import edu.snu.nemo.client.JobLauncher
+import edu.snu.nemo.common.dag.{DAG, DAGBuilder}
+import edu.snu.nemo.common.ir.edge.IREdge
+import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty
+import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
+import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
+import edu.snu.nemo.compiler.frontend.spark.coder.SparkCoder
+import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
+import edu.snu.nemo.compiler.frontend.spark.transform._
+import org.apache.hadoop.io.WritableFactory
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.spark.api.java.function.{FlatMapFunction, Function, Function2}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.{AsyncRDDActions, DoubleRDDFunctions, OrderedRDDFunctions, PartitionCoalescer, SequenceFileRDDFunctions}
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.{Dependency, Partition, Partitioner, SparkContext, TaskContext}
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+/**
+ * RDD for Nemo.
+ */
+final class RDD[T: ClassTag] protected[rdd] (
+    protected[rdd] val _sc: SparkContext,
+    private val deps: Seq[Dependency[_]],
+    protected[rdd] val dag: DAG[IRVertex, IREdge],
+    protected[rdd] val lastVertex: IRVertex,
+    private val sourceRDD: Option[org.apache.spark.rdd.RDD[T]]) extends org.apache.spark.rdd.RDD[T](_sc, deps) {
+
+  private val loopVertexStack = new util.Stack[LoopVertex]
+  protected[rdd] val serializer: Serializer = SparkFrontendUtils.deriveSerializerFrom(_sc)
+
+  /**
+   * Constructor without dependencies (not needed in Nemo RDD).
+   *
+   * Delegates to the primary constructor, passing `Nil` as the dependency list.
+   *
+   * @param sparkContext the spark context.
+   * @param dagFrom the DAG handed to the primary constructor as `dag`.
+   * @param lastVertexFrom the vertex handed to the primary constructor as `lastVertex`.
+   * @param sourceRDDFrom the optional Spark RDD handed to the primary constructor as `sourceRDD`.
+   */
+  protected[rdd] def this(sparkContext: SparkContext,
+                          dagFrom: DAG[IRVertex, IREdge],
+                          lastVertexFrom: IRVertex,
+                          sourceRDDFrom: Option[org.apache.spark.rdd.RDD[T]]) = {
+    this(sparkContext, Nil, dagFrom, lastVertexFrom, sourceRDDFrom)
+  }
+
+  /**
+   * Wraps this RDD in a new `JavaRDD`.
+   *
+   * @return converted JavaRDD.
+   */
+  override def toJavaRDD(): JavaRDD[T] = new JavaRDD[T](this)
+
+  /**
+   * Not supported yet.
+   */
+  override def getPartitions: Array[Partition] = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /**
+   * Not supported yet.
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /////////////// WRAPPER FUNCTIONS /////////////
+
+  /**
+   * Scala-facing `map`: converts the Scala function into its Java counterpart and
+   * forwards it to the Java-function based `map` transformation of this class.
+   */
+  override def map[U](f: (T) => U)(implicit evidence$3: ClassManifest[U]): RDD[U] =
+    map(SparkFrontendUtils.toJavaFunction(f))
+
+  /**
+   * Scala-facing `flatMap`: converts the Scala function into a Java `FlatMapFunction` and
+   * forwards it to the Java-function based `flatMap` transformation of this class.
+   */
+  override def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
+    flatMap(SparkFrontendUtils.toJavaFlatMapFunction(f))
+
+  /**
+   * Scala-facing `reduce`: converts the Scala binary operator into its Java counterpart and
+   * forwards it to the Java-function based `reduce` action of this class.
+   */
+  override def reduce(f: (T, T) => T): T =
+    reduce(SparkFrontendUtils.toJavaFunction(f))
+
+  /**
+   * A scala wrapper for collect action.
+   *
+   * The collected elements are copied into an array allocated through the `ClassTag` of `T`, so
+   * that the runtime class of the array matches the static type `Array[T]`.
+   * `java.util.List#toArray()` always yields an `Object[]`; since a cast to `Array[T]` is erased,
+   * returning that array directly fails with a `ClassCastException` as soon as a caller uses the
+   * result as a concrete array type such as `Array[String]`.
+   *
+   * @return the collected value.
+   * @note This method should only be used if the resulting array is expected to be small, as
+   *       all the data is loaded into the driver's memory.
+   */
+  override def collect(): Array[T] = {
+    val collected = collectAsList()
+    // Allocate an array of the proper runtime element type (also works for primitive T).
+    val result = implicitly[ClassTag[T]].newArray(collected.size())
+    val elements = collected.iterator()
+    var index = 0
+    while (elements.hasNext) {
+      result(index) = elements.next()
+      index += 1
+    }
+    result
+  }
+
+  /////////////// TRANSFORMATIONS ///////////////
+
+  /**
+   * Return a new RDD by applying a function to all elements of this RDD.
+   *
+   * A copy of the current DAG is extended with a map operator vertex that is connected to the
+   * last vertex of this RDD; the resulting RDD uses the map vertex as its last vertex.
+   *
+   * @param javaFunc the function applied to each element.
+   * @return the new RDD.
+   */
+  protected[rdd] def map[U: ClassTag](javaFunc: Function[T, U]): RDD[U] = {
+    val dagBuilder = new DAGBuilder[IRVertex, IREdge](dag)
+
+    val mapOperator = new OperatorVertex(new MapTransform[T, U](javaFunc))
+    dagBuilder.addVertex(mapOperator, loopVertexStack)
+
+    val edgeToMap = new IREdge(
+      SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, mapOperator),
+      lastVertex,
+      mapOperator,
+      new SparkCoder[T](serializer))
+    edgeToMap.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+    dagBuilder.connectVertices(edgeToMap)
+
+    new RDD[U](_sc, dagBuilder.buildWithoutSourceSinkCheck, mapOperator, None)
+  }
+
+  /**
+   * Return a new RDD by first applying a function to all elements of this
+   * RDD, and then flattening the results.
+   *
+   * A copy of the current DAG is extended with a flatMap operator vertex that is connected to
+   * the last vertex of this RDD; the resulting RDD uses the flatMap vertex as its last vertex.
+   *
+   * @param javaFunc the function producing the elements to flatten.
+   * @return the new RDD.
+   */
+  protected[rdd] def flatMap[U: ClassTag](javaFunc: FlatMapFunction[T, U]): RDD[U] = {
+    val dagBuilder = new DAGBuilder[IRVertex, IREdge](dag)
+
+    val flatMapOperator = new OperatorVertex(new FlatMapTransform[T, U](javaFunc))
+    dagBuilder.addVertex(flatMapOperator, loopVertexStack)
+
+    val edgeToFlatMap = new IREdge(
+      SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, flatMapOperator),
+      lastVertex,
+      flatMapOperator,
+      new SparkCoder[T](serializer))
+    edgeToFlatMap.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+    dagBuilder.connectVertices(edgeToFlatMap)
+
+    new RDD[U](_sc, dagBuilder.buildWithoutSourceSinkCheck, flatMapOperator, None)
+  }
+
+  /////////////// ACTIONS ///////////////
+
+  /**
+   * Return a list that contains all of the elements in this RDD, as produced by
+   * `SparkFrontendUtils.collect` for the current DAG and last vertex.
+   *
+   * @note This method should only be used if the resulting list is expected to be small, as
+   *       all the data is loaded into the driver's memory.
+   */
+  protected[rdd] def collectAsList(): util.List[T] = {
+    SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer)
+  }
+
+  /**
+   * Reduces the elements of this RDD using the specified commutative and
+   * associative binary operator.
+   *
+   * @param javaFunc the binary operator used to combine elements.
+   * @return the value returned by `ReduceTransform.reduceIterator` over the collected elements.
+   */
+  protected[rdd] def reduce(javaFunc: Function2[T, T, T]): T = {
+    val builder = new DAGBuilder[IRVertex, IREdge](dag)
+
+    val reduceVertex = new OperatorVertex(new ReduceTransform[T](javaFunc))
+    builder.addVertex(reduceVertex, loopVertexStack)
+
+    val newEdge = new IREdge(SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, reduceVertex),
+      lastVertex, reduceVertex, new SparkCoder[T](serializer))
+    newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+
+    builder.connectVertices(newEdge)
+    // NOTE(review): `builder` (with `reduceVertex` attached) is never built or passed on. The
+    // collect below runs on the original `dag` and `lastVertex`, and the reduction is performed by
+    // `ReduceTransform.reduceIterator` on the collected elements. Confirm whether the built DAG
+    // and `reduceVertex` were meant to be handed to `collect` instead.
+    ReduceTransform.reduceIterator(
+      SparkFrontendUtils.collect(dag, loopVertexStack, lastVertex, serializer).iterator(), javaFunc)
+  }
+
+  /**
+   * Save this RDD as a text file, using string representations of elements.
+   *
+   * A text-file writing vertex is appended to a copy of the current DAG, and the resulting DAG
+   * is launched through `JobLauncher`.
+   *
+   * @param path the destination path.
+   */
+  override def saveAsTextFile(path: String): Unit = {
+    // Paths carrying one of these URI schemes go through the HDFS text file transform;
+    // every other path is written with the local text file transform.
+    val usesHdfsTransform = Seq("hdfs://", "s3a://", "file://").exists(scheme => path.startsWith(scheme))
+    val writeTransform =
+      if (usesHdfsTransform) new HDFSTextFileTransform[T](path)
+      else new LocalTextFileTransform[T](path)
+
+    val dagBuilder = new DAGBuilder[IRVertex, IREdge](dag)
+    val textFileVertex = new OperatorVertex(writeTransform)
+    dagBuilder.addVertex(textFileVertex, loopVertexStack)
+
+    val edgeToTextFile = new IREdge(
+      SparkFrontendUtils.getEdgeCommunicationPattern(lastVertex, textFileVertex),
+      lastVertex,
+      textFileVertex,
+      new SparkCoder[T](serializer))
+    edgeToTextFile.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
+    dagBuilder.connectVertices(edgeToTextFile)
+
+    JobLauncher.launchDAG(dagBuilder.build)
+  }
+
+  /////////////// UNSUPPORTED METHODS ///////////////
+  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+
+  override protected def getDependencies: Seq[Dependency[_]] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override protected def getPreferredLocations(split: Partition): Seq[String] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def sparkContext: SparkContext = super.sparkContext
+
+  override def setName(_name: String): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def persist(newLevel: StorageLevel): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def persist(): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cache(): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def unpersist(blocking: Boolean): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def getStorageLevel: StorageLevel =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def filter(f: (T) => Boolean): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def distinct(numPartitions: Int)(implicit ord: Ordering[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def distinct(): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def repartition(numPartitions: Int)(implicit ord: Ordering[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def coalesce(numPartitions: Int, shuffle: Boolean, partitionCoalescer: Option[PartitionCoalescer])
+                       (implicit ord: Ordering[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def sample(withReplacement: Boolean, fraction: Double, seed: Long): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def randomSplit(weights: Array[Double], seed: Long): Array[org.apache.spark.rdd.RDD[T]] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def takeSample(withReplacement: Boolean, num: Int, seed: Long): Array[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def union(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def ++(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def sortBy[K](f: (T) => K, ascending: Boolean, numPartitions: Int)
+                        (implicit ord: Ordering[K], ctag: ClassManifest[K]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def intersection(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def intersection(other: org.apache.spark.rdd.RDD[T], partitioner: Partitioner)
+                           (implicit ord: Ordering[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def intersection(other: org.apache.spark.rdd.RDD[T], numPartitions: Int): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def glom(): RDD[Array[T]] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def cartesian[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$5: ClassManifest[U]): RDD[(T, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupBy[K](f: (T) => K)(implicit kt: ClassManifest[K]): RDD[(K, Iterable[T])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupBy[K](f: (T) => K, numPartitions: Int)(implicit kt: ClassManifest[K]): RDD[(K, Iterable[T])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def groupBy[K](f: (T) => K, p: Partitioner)
+                         (implicit kt: ClassManifest[K], ord: Ordering[K]): RDD[(K, Iterable[T])] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def pipe(command: String): RDD[String] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def pipe(command: String, env: scala.collection.Map[String, String]): RDD[String] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def pipe(command: Seq[String], env: scala.collection.Map[String, String],
+                    printPipeContext: ((String) => Unit) => Unit,
+                    printRDDElement: (T, (String) => Unit) => Unit, separateWorkingDir: Boolean,
+                    bufferSize: Int, encoding: String): RDD[String] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean)
+                               (implicit evidence$6: ClassManifest[U]): RDD[U] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean)
+                                        (implicit evidence$9: ClassManifest[U]): RDD[U] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zip[U](other: org.apache.spark.rdd.RDD[U])(implicit evidence$10: ClassManifest[U]): RDD[(T, U)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, V](rdd2: org.apache.spark.rdd.RDD[B], preservesPartitioning: Boolean)
+                                  (f: (Iterator[T], Iterator[B]) => Iterator[V])
+                                  (implicit evidence$11: ClassManifest[B], evidence$12: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, V](rdd2: org.apache.spark.rdd.RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])
+                                  (implicit evidence$13: ClassManifest[B], evidence$14: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, C, V](rdd2: org.apache.spark.rdd.RDD[B],
+                                      rdd3: org.apache.spark.rdd.RDD[C], preservesPartitioning: Boolean)
+                                     (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])
+                                     (implicit evidence$15: ClassManifest[B], evidence$16: ClassManifest[C],
+                                      evidence$17: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, C, V](rdd2: org.apache.spark.rdd.RDD[B],
+                                      rdd3: org.apache.spark.rdd.RDD[C])
+                                     (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])
+                                     (implicit evidence$18: ClassManifest[B], evidence$19: ClassManifest[C],
+                                      evidence$20: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B],
+                                         rdd3: org.apache.spark.rdd.RDD[C],
+                                         rdd4: org.apache.spark.rdd.RDD[D], preservesPartitioning: Boolean)
+                                        (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])
+                                        (implicit evidence$21: ClassManifest[B], evidence$22: ClassManifest[C],
+                                         evidence$23: ClassManifest[D], evidence$24: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipPartitions[B, C, D, V](rdd2: org.apache.spark.rdd.RDD[B],
+                                         rdd3: org.apache.spark.rdd.RDD[C],
+                                         rdd4: org.apache.spark.rdd.RDD[D])
+                                        (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])
+                                        (implicit evidence$25: ClassManifest[B], evidence$26: ClassManifest[C],
+                                         evidence$27: ClassManifest[D], evidence$28: ClassManifest[V]): RDD[V] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def foreach(f: (T) => Unit): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def foreachPartition(f: (Iterator[T]) => Unit): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def toLocalIterator: Iterator[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def collect[U](f: PartialFunction[T, U])(implicit evidence$29: ClassManifest[U]): RDD[U] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtract(other: org.apache.spark.rdd.RDD[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtract(other: org.apache.spark.rdd.RDD[T], numPartitions: Int): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def subtract(other: org.apache.spark.rdd.RDD[T], p: Partitioner)(implicit ord: Ordering[T]): RDD[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def treeReduce(f: (T, T) => T, depth: Int): T =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def fold(zeroValue: T)(op: (T, T) => T): T =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
+                           (implicit evidence$30: ClassManifest[U]): U =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def treeAggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int)
+                               (implicit evidence$31: ClassManifest[U]): U =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def count(): Long =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countByValue()(implicit ord: Ordering[T]): Map[T, Long] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countByValueApprox(timeout: Long, confidence: Double)
+                                 (implicit ord: Ordering[T]): PartialResult[scala.collection.Map[T, BoundedDouble]] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinct(p: Int, sp: Int): Long =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def countApproxDistinct(relativeSD: Double): Long =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipWithIndex(): RDD[(T, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def zipWithUniqueId(): RDD[(T, Long)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def take(num: Int): Array[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def first(): T =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def top(num: Int)(implicit ord: Ordering[T]): Array[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def max()(implicit ord: Ordering[T]): T =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def min()(implicit ord: Ordering[T]): T =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def isEmpty(): Boolean =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def saveAsObjectFile(path: String): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def keyBy[K](f: (T) => K): RDD[(K, T)] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def checkpoint(): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def localCheckpoint(): RDD.this.type =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def isCheckpointed: Boolean =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def getCheckpointFile: Option[String] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def firstParent[U](implicit evidence$32: ClassManifest[U]): RDD[U] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def parent[U](j: Int)(implicit evidence$33: ClassManifest[U]): RDD[U] =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def context: SparkContext =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override protected def clearDependencies(): Unit =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  override def toDebugString: String =
+    throw new UnsupportedOperationException("Operation not yet implemented.")
+
+  /**
+   * Returns a short, side-effect-free description of this RDD.
+   *
+   * `toString` is invoked implicitly by string concatenation/interpolation, logging and
+   * debuggers, so it must never throw; it therefore does not follow the "not yet implemented"
+   * behavior of the other operations in this section.
+   *
+   * @return an identity-based description of this RDD.
+   */
+  override def toString(): String =
+    "Nemo RDD@" + Integer.toHexString(System.identityHashCode(this))
+}
+
+/**
+ * Defines implicit functions that provide extra functionalities on RDDs of specific types.
+ *
+ * For example, [[RDD.rddToPairRDDFunctions]] converts an RDD of key-value pairs into a
+ * [[PairRDDFunctions]], enabling extra functionalities such as `PairRDDFunctions.reduceByKey`.
+ *
+ * Only the pair-RDD conversion is implemented; every other conversion declared here throws
+ * `UnsupportedOperationException` when it is applied.
+ */
+object RDD {
+  /**
+   * Wraps a key-value-pair RDD in a [[PairRDDFunctions]].
+   */
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
+    new PairRDDFunctions(rdd)
+  }
+
+  /**
+   * Not supported: always throws `UnsupportedOperationException`.
+   */
+  implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /**
+   * Not supported: always throws `UnsupportedOperationException`.
+   */
+  implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
+    (implicit kt: ClassTag[K], vt: ClassTag[V],
+     keyWritableFactory: WritableFactory,
+     valueWritableFactory: WritableFactory): SequenceFileRDDFunctions[K, V] = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /**
+   * Not supported: always throws `UnsupportedOperationException`.
+   */
+  implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
+  : OrderedRDDFunctions[K, V, (K, V)] = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /**
+   * Not supported: always throws `UnsupportedOperationException`.
+   */
+  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+
+  /**
+   * Not supported: always throws `UnsupportedOperationException`.
+   */
+  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T])
+  : DoubleRDDFunctions = {
+    throw new UnsupportedOperationException("Operation unsupported.")
+  }
+}
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java
index e07b80b8..7d19327b 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaMapReduce.java
@@ -16,9 +16,9 @@
  */
 package edu.snu.nemo.examples.spark;
 
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaPairRDD;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaSparkContext;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.JavaSparkContext;
 import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
 import scala.Tuple2;
 
@@ -48,7 +48,7 @@ public static void main(final String[] args) throws Exception {
 
     final SparkSession.Builder sparkBuilder = SparkSession
         .builder()
-        .appName("JavaWordCount");
+        .appName("JavaMapReduce");
     if (yarn) {
       sparkBuilder
           .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
index dfa29b3c..3be5d2b5 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaSparkPi.java
@@ -16,8 +16,8 @@
  */
 package edu.snu.nemo.examples.spark;
 
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaSparkContext;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.JavaSparkContext;
 import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
 
 import java.util.ArrayList;
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordCount.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordCount.java
index 15336de8..f214fafa 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordCount.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/JavaWordCount.java
@@ -16,8 +16,8 @@
  */
 package edu.snu.nemo.examples.spark;
 
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaPairRDD;
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
 import scala.Tuple2;
 
diff --git a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaSparkSQLExample.java b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaSparkSQLExample.java
index fea35e60..87db34e2 100644
--- a/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaSparkSQLExample.java
+++ b/examples/spark/src/main/java/edu/snu/nemo/examples/spark/sql/JavaSparkSQLExample.java
@@ -23,7 +23,7 @@
 import java.util.Collections;
 import java.io.Serializable;
 
-import edu.snu.nemo.compiler.frontend.spark.core.java.JavaRDD;
+import edu.snu.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
 import edu.snu.nemo.compiler.frontend.spark.sql.Dataset;
 import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession;
 import org.apache.spark.api.java.function.Function;
diff --git a/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala
new file mode 100644
index 00000000..df64bc8a
--- /dev/null
+++ b/examples/spark/src/main/scala/edu/snu/nemo/examples/spark/SparkWordCount.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package edu.snu.nemo.examples.spark
+
+import edu.snu.nemo.compiler.frontend.spark.sql.SparkSession
+
+/**
+  * Computes counts of each data key.
+  *
+  * Usage: `SparkWordCount <input_file> [<output_file>]`. The input file is split on spaces
+  * into words; each word is counted and rendered as `word: count`. When an output file is
+  * given the result is saved there, otherwise it is printed to the console.
+  */
+object SparkWordCount {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 1) {
+      System.err.println("Usage: SparkWordCount <input_file> [<output_file>]")
+      System.exit(1)
+    }
+    val spark = SparkSession
+      .builder
+      .appName("Spark Word Count")
+      .getOrCreate()
+
+    val lines = spark.read().textFile(args(0)).rdd()
+
+    val words = lines.flatMap(s => s.split(" +"))
+
+    val ones = words.map(s => (s, 1))
+
+    val counts = ones.reduceByKey((i1, i2) => i1 + i2)
+
+    val parsed = counts.map(tuple => tuple._1 + ": " + tuple._2.toString)
+
+    // The output file is optional, so check the argument count instead of indexing args(1),
+    // which throws ArrayIndexOutOfBoundsException when only the input file is supplied.
+    val writeMode = args.length > 1 // write to file or print
+
+    if (writeMode) { // write to output file
+      parsed.saveAsTextFile(args(1))
+    } else { // print to console.
+      val output = parsed.collect()
+      for (elem <- output) {
+        println(elem)
+      }
+    }
+    spark.stop()
+  }
+}
+// scalastyle:on println
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
similarity index 98%
rename from examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
rename to examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
index ace78831..f1e9a36c 100644
--- a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkITCase.java
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkJavaITCase.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
 @PowerMockIgnore("javax.management.*")
-public final class SparkITCase {
+public final class SparkJavaITCase {
   private static final int TIMEOUT = 180000;
   private static ArgBuilder builder;
   private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
@@ -54,7 +54,6 @@ public void testSparkWordCount() throws Exception {
     final String inputFilePath = fileBasePath + inputFileName;
     final String outputFilePath = fileBasePath + outputFileName;
 
-
     JobLauncher.main(builder
         .addJobId(JavaWordCount.class.getSimpleName() + "_test")
         .addUserMain(JavaWordCount.class.getCanonicalName())
diff --git a/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
new file mode 100644
index 00000000..59b0508f
--- /dev/null
+++ b/examples/spark/src/test/java/edu/snu/nemo/examples/spark/SparkScalaITCase.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.examples.spark;
+
+import edu.snu.nemo.client.JobLauncher;
+import edu.snu.nemo.common.test.ArgBuilder;
+import edu.snu.nemo.common.test.ExampleTestUtil;
+import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Integration tests that run the Scala Spark example programs through {@link JobLauncher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+@PowerMockIgnore("javax.management.*")
+public final class SparkScalaITCase {
+  private static final int TIMEOUT = 120000;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+  private static final String executorResourceFileName = fileBasePath + "spark_sample_executor_resources.json";
+  private static ArgBuilder builder;
+
+  /** Gives each test a new ArgBuilder that already has the executor resource JSON set. */
+  @Before
+  public void setUp() {
+    builder = new ArgBuilder().addResourceJson(executorResourceFileName);
+  }
+
+  /** Launches SparkPi with "3" as its only user argument. */
+  @Test(timeout = TIMEOUT)
+  public void testPi() throws Exception {
+    final String jobId = SparkPi.class.getSimpleName() + "_test";
+    final String parallelism = "3";
+
+    JobLauncher.main(builder.addJobId(jobId)
+        .addUserMain(SparkPi.class.getCanonicalName())
+        .addUserArgs(parallelism)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+  }
+
+  /** Runs SparkWordCount, then checks its output file and deletes it even if the check throws. */
+  @Test(timeout = TIMEOUT)
+  public void testWordCount() throws Exception {
+    final String jobId = SparkWordCount.class.getSimpleName() + "_test";
+    final String resultFileName = "sample_output_wordcount";
+    final String referenceFileName = "test_output_wordcount";
+    final String inputPath = fileBasePath + "sample_input_wordcount";
+
+    JobLauncher.main(builder.addJobId(jobId)
+        .addUserMain(SparkWordCount.class.getCanonicalName())
+        .addUserArgs(inputPath, fileBasePath + resultFileName)
+        .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, resultFileName, referenceFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, resultFileName);
+    }
+  }
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services