Posted to commits@nemo.apache.org by wo...@apache.org on 2020/02/11 05:50:28 UTC

[incubator-nemo] branch master updated: [NEMO-429] SWPP TEAM20 Code Smell Fix (#283)

This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b9cf6  [NEMO-429] SWPP TEAM20 Code Smell Fix (#283)
a7b9cf6 is described below

commit a7b9cf6441b930ea29be49ca996f1ad393c8f754
Author: Jeongyoon Eo <je...@gmail.com>
AuthorDate: Tue Feb 11 14:50:17 2020 +0900

    [NEMO-429] SWPP TEAM20 Code Smell Fix (#283)
    
    JIRA: NEMO-429: SWPP TEAM20 Code Smell Fix
    
    Major changes:
    - Fixed code smells (SWPP Code Smell session)
    
    Co-authored-by: jihoonrf <kz...@snu.ac.kr>
    Co-authored-by: Spiraline <44...@users.noreply.github.com>
---
 .../nemo/common/exception/DataSourceException.java |  30 +-
 .../frontend/spark/coder/SparkEncoderFactory.java  |   5 +-
 .../frontend/spark/core/JavaSparkContext.java      |  26 +-
 .../compiler/frontend/spark/core/SparkContext.java |   8 +-
 .../frontend/spark/core/rdd/JavaPairRDD.java       | 711 ---------------------
 .../frontend/spark/core/rdd/SparkJavaPairRDD.java  | 661 +++++++++++++++++++
 .../core/rdd/{JavaRDD.java => SparkJavaRDD.java}   | 284 ++++----
 .../nemo/compiler/frontend/spark/sql/Dataset.java  |  12 +-
 .../compiler/frontend/spark/core/rdd/RDD.scala     |   6 +-
 .../pass/compiletime/annotating/LambdaPass.java    |   5 +-
 .../compiler/backend/nemo/DAGConverterTest.java    |   2 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |   4 +-
 .../nemo/examples/beam/GenericSourceSink.java      |  19 +-
 .../apache/nemo/examples/spark/JavaMapReduce.java  |  10 +-
 .../apache/nemo/examples/spark/JavaSparkPi.java    |   4 +-
 .../nemo/examples/spark/JavaWordAndLineCount.java  |  16 +-
 .../apache/nemo/examples/spark/JavaWordCount.java  |  12 +-
 .../examples/spark/sql/JavaSparkSQLExample.java    |  24 +-
 .../runtime/executor/data/block/FileBlock.java     |  31 +-
 .../MinOccupancyFirstSchedulingPolicy.java         |   2 +-
 20 files changed, 903 insertions(+), 969 deletions(-)

diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java b/common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
similarity index 55%
copy from compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
copy to common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
index b6342e2..f5d94ea 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
+++ b/common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
@@ -16,27 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceLambdaProperty;
+package org.apache.nemo.common.exception;
 
 /**
- * Lambda Pass.
- * Description: A part of lambda executor, assigning LambdaResourceProperty
+ * DataSourceException.
+ * Thrown when an exception occurs in a data source, e.g. while processing data from file systems.
  */
-@Annotates(ResourceLambdaProperty.class)
-public final class LambdaPass extends AnnotatingPass {
-
-  public LambdaPass() {
-    super(LambdaPass.class);
-  }
-
-  @Override
-  public IRDAG apply(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON));
-    });
-    return dag;
+public final class DataSourceException extends RuntimeException {
+  /**
+   * DataSourceException.
+   *
+   * @param throwable the throwable to throw.
+   */
+  public DataSourceException(final Throwable throwable) {
+    super(throwable);
   }
 }
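
For context: DataSourceException wraps a cause in an unchecked RuntimeException, so data-source code can rethrow checked I/O failures without declaring them. A minimal usage sketch (the TextSource class below is hypothetical and not part of this commit):

  import org.apache.nemo.common.exception.DataSourceException;

  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.util.List;

  /** Hypothetical caller that wraps checked I/O failures. */
  final class TextSource {
    private TextSource() {
    }

    static List<String> readLines(final String path) {
      try {
        return Files.readAllLines(Paths.get(path));
      } catch (final IOException e) {
        // Rethrow unchecked; the original exception stays available via getCause().
        throw new DataSourceException(e);
      }
    }
  }
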
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
index bb1174b..7df8024 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
@@ -32,7 +32,7 @@ import java.io.OutputStream;
  * @param <T> type of the object to serialize.
  */
 public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
-  private final Serializer serializer;
+  private final transient Serializer serializer;
 
   /**
    * Default constructor.
@@ -61,8 +61,7 @@ public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
    * @param <T2> type of the object to serialize.
    */
   private final class SparkEncoder<T2> implements Encoder<T2> {
-
-    private final SerializationStream out;
+    private final transient SerializationStream out;
 
     /**
      * Constructor.
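
The two fields above are now marked transient, which excludes them from Java serialization; a non-serializable field type inside a Serializable class is a common static-analysis warning. A self-contained sketch of the transient semantics (class and field names here are illustrative, unrelated to the Nemo classes):

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.Serializable;

  final class TransientDemo implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String kept;
    private final transient String skipped;

    TransientDemo(final String kept, final String skipped) {
      this.kept = kept;
      this.skipped = skipped;
    }

    public static void main(final String[] args) throws Exception {
      final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(new TransientDemo("kept", "skipped"));
      }
      try (ObjectInputStream in = new ObjectInputStream(
          new ByteArrayInputStream(bytes.toByteArray()))) {
        final TransientDemo copy = (TransientDemo) in.readObject();
        // Transient fields are skipped on write and restored as default values.
        System.out.println(copy.kept + " / " + copy.skipped);  // prints: kept / null
      }
    }
  }
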
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
index d4cd2a7..b1a7448 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.core;
 
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.spark.SparkContext;
 
 import java.util.List;
@@ -39,46 +39,46 @@ public final class JavaSparkContext {
   }
 
   /**
-   * Create a String {@link JavaRDD} from a text file path.
+   * Create a String {@link SparkJavaRDD} from a text file path.
    *
    * @param path the path to read.
    * @return the RDD.
    */
-  public JavaRDD<String> textFile(final String path) {
+  public SparkJavaRDD<String> textFile(final String path) {
     return this.textFile(path, 1);
   }
 
   /**
-   * Create a String {@link JavaRDD} from a text file path with specific minimum parallelism.
+   * Create a String {@link SparkJavaRDD} from a text file path with a specified minimum parallelism.
    *
    * @param path          the path to read.
    * @param minPartitions the minimum parallelism.
    * @return the RDD.
    */
-  public JavaRDD<String> textFile(final String path, final int minPartitions) {
-    return JavaRDD.of(sparkContext, minPartitions, path);
+  public SparkJavaRDD<String> textFile(final String path, final int minPartitions) {
+    return SparkJavaRDD.of(sparkContext, minPartitions, path);
   }
 
   /**
-   * Initiate a JavaRDD with the number of parallelism.
+   * Initiate a SparkJavaRDD with the default degree of parallelism.
    *
    * @param list input data as list.
    * @param <T>  type of the initial element.
-   * @return the newly initiated JavaRDD.
+   * @return the newly initiated SparkJavaRDD.
    */
-  public <T> JavaRDD<T> parallelize(final List<T> list) {
+  public <T> SparkJavaRDD<T> parallelize(final List<T> list) {
     return this.parallelize(list, 1);
   }
 
   /**
-   * Initiate a JavaRDD with the number of parallelism.
+   * Initiate a SparkJavaRDD with the given number of slices (parallelism).
    *
    * @param l      input data as list.
    * @param slices number of slices (parallelism).
    * @param <T>    type of the initial element.
-   * @return the newly initiated JavaRDD.
+   * @return the newly initiated SparkJavaRDD.
    */
-  public <T> JavaRDD<T> parallelize(final List<T> l, final int slices) {
-    return JavaRDD.of(this.sparkContext, l, slices);
+  public <T> SparkJavaRDD<T> parallelize(final List<T> l, final int slices) {
+    return SparkJavaRDD.of(this.sparkContext, l, slices);
   }
 }
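
After the rename, user code obtains SparkJavaRDD instances through the same two entry points. A minimal usage sketch (assumes an already-constructed JavaSparkContext named jsc; the surrounding class is hypothetical):

  import org.apache.nemo.compiler.frontend.spark.core.JavaSparkContext;
  import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;

  import java.util.Arrays;
  import java.util.List;

  final class SparkJavaRDDExample {
    private SparkJavaRDDExample() {
    }

    static void run(final JavaSparkContext jsc) {
      // Read a text file with a minimum of 2 partitions.
      final SparkJavaRDD<String> lines = jsc.textFile("input.txt", 2);
      // Distribute an in-memory list across 4 slices.
      final List<Integer> data = Arrays.asList(1, 2, 3, 4);
      final SparkJavaRDD<Integer> nums = jsc.parallelize(data, 4);
    }
  }
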
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
index c12b3b0..12bd878 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.core;
 
 import org.apache.nemo.compiler.frontend.spark.SparkBroadcastVariables;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.core.rdd.RDD;
 import org.apache.spark.SparkConf;
 import org.apache.spark.broadcast.Broadcast;
@@ -55,19 +55,19 @@ public final class SparkContext extends org.apache.spark.SparkContext {
   }
 
   /**
-   * Initiate a JavaRDD with the number of parallelism.
+   * Initiate a SparkJavaRDD with the given number of slices (parallelism).
    *
    * @param seq       input data as list.
    * @param numSlices number of slices (parallelism).
    * @param evidence  type of the initial element.
-   * @return the newly initiated JavaRDD.
+   * @return the newly initiated SparkJavaRDD.
    */
   @Override
   public <T> RDD<T> parallelize(final Seq<T> seq,
                                 final int numSlices,
                                 final ClassTag<T> evidence) {
     final List<T> javaList = scala.collection.JavaConversions.seqAsJavaList(seq);
-    return JavaRDD.of(this.sparkContext, javaList, numSlices).rdd();
+    return SparkJavaRDD.of(this.sparkContext, javaList, numSlices).rdd();
   }
 
   @Override
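
The parallelize override above first bridges the Scala Seq to a java.util.List and then delegates to SparkJavaRDD.of. The conversion can be isolated as below (an illustrative helper, not part of the commit); JavaConversions.seqAsJavaList returns a live List view over the Seq rather than a copy:

  import scala.collection.JavaConversions;
  import scala.collection.Seq;

  import java.util.List;

  final class SeqBridge {
    private SeqBridge() {
    }

    static <T> List<T> toJavaList(final Seq<T> seq) {
      // Wraps the Scala Seq in a java.util.List view; no elements are copied.
      return JavaConversions.seqAsJavaList(seq);
    }
  }
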
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
deleted file mode 100644
index 1c85965..0000000
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.spark.core.rdd;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.Partitioner;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.Tuple4;
-import scala.reflect.ClassTag$;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Java RDD for pairs.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public final class JavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
-
-  private final RDD<Tuple2<K, V>> rdd;
-
-  /**
-   * Static method to create a JavaPairRDD object from {@link RDD}.
-   *
-   * @param rddFrom the RDD to parse.
-   * @param <K>     type of the key.
-   * @param <V>     type of the value.
-   * @return the parsed JavaPairRDD object.
-   */
-  public static <K, V> JavaPairRDD<K, V> fromRDD(final RDD<Tuple2<K, V>> rddFrom) {
-    return new JavaPairRDD<>(rddFrom);
-  }
-
-  @Override
-  public JavaPairRDD<K, V> wrapRDD(final org.apache.spark.rdd.RDD<Tuple2<K, V>> rddFrom) {
-    if (!(rddFrom instanceof RDD)) {
-      throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
-    }
-    return fromRDD((RDD<Tuple2<K, V>>) rddFrom);
-  }
-
-  @Override
-  public RDD<Tuple2<K, V>> rdd() {
-    return rdd;
-  }
-
-  /**
-   * Constructor with existing nemo RDD.
-   *
-   * @param rdd the Nemo rdd to wrap.
-   */
-  JavaPairRDD(final RDD<Tuple2<K, V>> rdd) {
-    super(rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class));
-
-    this.rdd = rdd;
-  }
-
-  /**
-   * @return the spark context.
-   */
-  public SparkContext getSparkContext() {
-    return rdd.sparkContext();
-  }
-
-  /////////////// TRANSFORMATIONS ///////////////
-
-  @Override
-  public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
-    // Explicit conversion
-    final PairRDDFunctions<K, V> pairRdd = RDD.rddToPairRDDFunctions(
-      rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class), null);
-    final RDD<Tuple2<K, V>> reducedRdd = pairRdd.reduceByKey(func);
-    return JavaPairRDD.fromRDD(reducedRdd);
-  }
-
-  @Override
-  public <R> JavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
-    return rdd.map(f, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
-  }
-
-  /////////////// ACTIONS ///////////////
-
-  @Override
-  public List<Tuple2<K, V>> collect() {
-    return rdd.collectAsList();
-  }
-
-  /////////////// UNSUPPORTED METHODS ///////////////
-  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
-
-  @Override
-  public JavaPairRDD<K, V> cache() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> persist(final StorageLevel newLevel) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> unpersist() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> unpersist(final boolean blocking) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> distinct() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> distinct(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> filter(final Function<Tuple2<K, V>, Boolean> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> coalesce(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> coalesce(final int numPartitions,
-                                    final boolean shuffle) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> repartition(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sample(final boolean withReplacement,
-                                  final double fraction) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sample(final boolean withReplacement,
-                                  final double fraction,
-                                  final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
-                                       final Map<K, Double> fractions,
-                                       final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
-                                       final Map<K, Double> fractions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
-                                            final Map<K, Double> fractions,
-                                            final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
-                                            final Map<K, Double> fractions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> union(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> intersection(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public Tuple2<K, V> first() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
-                                            final Function2<C, V, C> mergeValue,
-                                            final Function2<C, C, C> mergeCombiners,
-                                            final Partitioner partitioner,
-                                            final boolean mapSideCombine,
-                                            final Serializer serializer) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
-                                            final Function2<C, V, C> mergeValue,
-                                            final Function2<C, C, C> mergeCombiners,
-                                            final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
-                                            final Function2<C, V, C> mergeValue,
-                                            final Function2<C, C, C> mergeCombiners,
-                                            final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> reduceByKey(final Partitioner partitioner,
-                                       final Function2<V, V, V> func) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public Map<K, V> reduceByKeyLocally(final Function2<V, V, V> func) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public Map<K, Long> countByKey() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
-                                                               final double confidence) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public double countByKeyApprox$default$2() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
-                                              final Partitioner partitioner,
-                                              final Function2<U, V, U> seqFunc,
-                                              final Function2<U, U, U> combFunc) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
-                                              final int numPartitions,
-                                              final Function2<U, V, U> seqFunc,
-                                              final Function2<U, U, U> combFunc) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
-                                              final Function2<U, V, U> seqFunc,
-                                              final Function2<U, U, U> combFunc) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
-                                     final Partitioner partitioner,
-                                     final Function2<V, V, V> func) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
-                                     final int numPartitions,
-                                     final Function2<V, V, V> func) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> foldByKey(final V zeroValue,
-                                     final Function2<V, V, V> func) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func,
-                                       final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, Iterable<V>> groupByKey(final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, Iterable<V>> groupByKey(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
-                                    final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
-                                    final Partitioner p) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                                             final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                                             final Partitioner p) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> partitionBy(final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                                               final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
-  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
-  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                 final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
-  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
-                                            final Function2<C, V, C> mergeValue,
-                                            final Function2<C, C, C> mergeCombiners) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public org.apache.spark.api.java.JavaPairRDD<K, Iterable<V>> groupByKey() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                                               final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
-  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
-  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
-  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
-  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                 final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
-  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
-  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-                final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public Map<K, V> collectAsMap() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <U> JavaPairRDD<K, U> mapValues(final Function<V, U> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <U> JavaPairRDD<K, U> flatMapValues(final Function<V, Iterable<U>> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-          final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-          final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
-          final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
-          final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-          final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
-  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
-          final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
-  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
-  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
-  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
-            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
-            final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public List<V> lookup(final K key) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  // Commented out due to an IDE issue
-  /*@Override
-  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
-                                                              final Class<?> keyClass,
-                                                              final Class<?> valueClass,
-                                                              final Class<F> outputFormatClass,
-                                                              final JobConf conf) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
-                                                              final Class<?> keyClass,
-                                                              final Class<?> valueClass,
-                                                              final Class<F> outputFormatClass) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
-                                                              final Class<?> keyClass,
-                                                              final Class<?> valueClass,
-                                                              final Class<F> outputFormatClass,
-                                                              final Class<? extends CompressionCodec> codec) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
-  saveAsNewAPIHadoopFile(final String path,
-                         final Class<?> keyClass,
-                         final Class<?> valueClass,
-                         final Class<F> outputFormatClass,
-                         final Configuration conf) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public void saveAsNewAPIHadoopDataset(final Configuration conf) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
-  saveAsNewAPIHadoopFile(final String path,
-                         final Class<?> keyClass,
-                         final Class<?> valueClass,
-                         final Class<F> outputFormatClass) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }*/
-
-  @Override
-  public void saveAsHadoopDataset(final JobConf conf) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner,
-                                                              final Comparator<K> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey(final boolean ascending) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey(final boolean ascending,
-                                     final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
-                                     final boolean ascending) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
-                                     final boolean ascending,
-                                     final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaRDD<K> keys() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaRDD<V> values() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
-                                                       final Partitioner partitioner) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
-                                                       final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-
-  @Override
-  public JavaPairRDD<K, V> setName(final String name) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
-  }
-}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java
new file mode 100644
index 0000000..83e538c
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.spark.core.rdd;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+import scala.reflect.ClassTag$;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Java RDD for pairs.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public final class SparkJavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
+
+  private final RDD<Tuple2<K, V>> rdd;
+  private static final String NOT_YET_SUPPORTED = "Operation not yet supported.";
+
+  /**
+   * Static method to create a SparkJavaPairRDD object from {@link RDD}.
+   *
+   * @param rddFrom the RDD to parse.
+   * @param <K>     type of the key.
+   * @param <V>     type of the value.
+   * @return the parsed SparkJavaPairRDD object.
+   */
+  public static <K, V> SparkJavaPairRDD<K, V> fromRDD(final RDD<Tuple2<K, V>> rddFrom) {
+    return new SparkJavaPairRDD<>(rddFrom);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> wrapRDD(final org.apache.spark.rdd.RDD<Tuple2<K, V>> rddFrom) {
+    if (!(rddFrom instanceof RDD)) {
+      throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
+    }
+    return fromRDD((RDD<Tuple2<K, V>>) rddFrom);
+  }
+
+  @Override
+  public RDD<Tuple2<K, V>> rdd() {
+    return rdd;
+  }
+
+  /**
+   * Constructor with existing nemo RDD.
+   *
+   * @param rdd the Nemo rdd to wrap.
+   */
+  SparkJavaPairRDD(final RDD<Tuple2<K, V>> rdd) {
+    super(rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class));
+
+    this.rdd = rdd;
+  }
+
+  /**
+   * @return the spark context.
+   */
+  public SparkContext getSparkContext() {
+    return rdd.sparkContext();
+  }
+
+  /////////////// TRANSFORMATIONS ///////////////
+
+  @Override
+  public SparkJavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
+    // Explicit conversion
+    final PairRDDFunctions<K, V> pairRdd = RDD.rddToPairRDDFunctions(
+      rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class), null);
+    final RDD<Tuple2<K, V>> reducedRdd = pairRdd.reduceByKey(func);
+    return SparkJavaPairRDD.fromRDD(reducedRdd);
+  }
+
+  @Override
+  public <R> SparkJavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
+    return rdd.map(f, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
+  }
+
+  /////////////// ACTIONS ///////////////
+
+  @Override
+  public List<Tuple2<K, V>> collect() {
+    return rdd.collectAsList();
+  }
+
+  /**
+   * TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+   */
+  @Override
+  public SparkJavaPairRDD<K, V> cache() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> persist(final StorageLevel newLevel) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> unpersist() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> unpersist(final boolean blocking) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> distinct() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> distinct(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> filter(final Function<Tuple2<K, V>, Boolean> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> coalesce(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> coalesce(final int numPartitions,
+                                         final boolean shuffle) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> repartition(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sample(final boolean withReplacement,
+                                       final double fraction) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sample(final boolean withReplacement,
+                                       final double fraction,
+                                       final long seed) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+                                            final Map<K, Double> fractions,
+                                            final long seed) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+                                            final Map<K, Double> fractions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+                                                 final Map<K, Double> fractions,
+                                                 final long seed) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+                                                 final Map<K, Double> fractions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> union(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> intersection(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public Tuple2<K, V> first() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                                 final Function2<C, V, C> mergeValue,
+                                                 final Function2<C, C, C> mergeCombiners,
+                                                 final Partitioner partitioner,
+                                                 final boolean mapSideCombine,
+                                                 final Serializer serializer) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                                 final Function2<C, V, C> mergeValue,
+                                                 final Function2<C, C, C> mergeCombiners,
+                                                 final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                                 final Function2<C, V, C> mergeValue,
+                                                 final Function2<C, C, C> mergeCombiners,
+                                                 final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> reduceByKey(final Partitioner partitioner,
+                                            final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public Map<K, V> reduceByKeyLocally(final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public Map<K, Long> countByKey() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
+                                                               final double confidence) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public double countByKeyApprox$default$2() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                                   final Partitioner partitioner,
+                                                   final Function2<U, V, U> seqFunc,
+                                                   final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                                   final int numPartitions,
+                                                   final Function2<U, V, U> seqFunc,
+                                                   final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+                                                   final Function2<U, V, U> seqFunc,
+                                                   final Function2<U, U, U> combFunc) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                          final Partitioner partitioner,
+                                          final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                          final int numPartitions,
+                                          final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+                                          final Function2<V, V, V> func) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func,
+                                            final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, Iterable<V>> groupByKey(final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, Iterable<V>> groupByKey(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+                                         final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+                                         final Partitioner p) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                                  final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                                  final Partitioner p) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> partitionBy(final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                                    final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                 final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+                                                 final Function2<C, V, C> mergeValue,
+                                                 final Function2<C, C, C> mergeCombiners) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public org.apache.spark.api.java.JavaPairRDD<K, Iterable<V>> groupByKey() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                                                    final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+  leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+  rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                 final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+  fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+                final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public Map<K, V> collectAsMap() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <U> SparkJavaPairRDD<K, U> mapValues(final Function<V, U> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <U> SparkJavaPairRDD<K, U> flatMapValues(final Function<V, Iterable<U>> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+          final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+          final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+          final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+          final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+          final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+  groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+            final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+            final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public List<V> lookup(final K key) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public void saveAsHadoopDataset(final JobConf conf) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner,
+                                                                   final Comparator<K> comp) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey(final boolean ascending) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey(final boolean ascending,
+                                          final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+                                          final boolean ascending) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+                                          final boolean ascending,
+                                          final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaRDD<K> keys() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaRDD<V> values() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+                                                            final Partitioner partitioner) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+                                                            final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+
+  @Override
+  public SparkJavaPairRDD<K, V> setName(final String name) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+  }
+}
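
The SparkJavaPairRDD stubs above also carry the main smell fix in this file: the
"Operation not yet supported." literal, previously repeated in every method, is now
a single NOT_YET_SUPPORTED constant. A minimal sketch of the same pattern, using a
hypothetical Stub class that is not part of this patch:

    // Hypothetical class; only the extracted-constant pattern mirrors the patch.
    public final class Stub {
      // One shared message instead of a string literal duplicated per method.
      private static final String NOT_YET_SUPPORTED = "Operation not yet supported.";

      public void frob() {
        throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
      }
    }
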
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
similarity index 52%
rename from compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
rename to compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
index 69d2571..9761a17 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
@@ -54,9 +54,10 @@ import java.util.concurrent.atomic.AtomicInteger;
  *
  * @param <T> type of the final element.
  */
-public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
+public final class SparkJavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
 
   private final RDD<T> rdd;
+  private static final String NOT_YET_SUPPORTED = "Operation not yet supported.";
 
   /**
    * Static method to create a RDD object from an iterable object.
@@ -65,11 +66,11 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    * @param initialData  initial data.
    * @param parallelism  parallelism information.
    * @param <T>          type of the resulting object.
-   * @return the new JavaRDD object.
+   * @return the new SparkJavaRDD object.
    */
-  public static <T> JavaRDD<T> of(final SparkContext sparkContext,
-                                  final Iterable<T> initialData,
-                                  final Integer parallelism) {
+  public static <T> SparkJavaRDD<T> of(final SparkContext sparkContext,
+                                       final Iterable<T> initialData,
+                                       final Integer parallelism) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
@@ -79,20 +80,20 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
     final RDD<T> nemoRdd = new RDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(),
       initializedSourceVertex, Option.empty(), ClassTag$.MODULE$.apply(Object.class));
 
-    return new JavaRDD<>(nemoRdd);
+    return new SparkJavaRDD<>(nemoRdd);
   }
 
   /**
-   * Static method to create a JavaRDD object from an text file.
+   * Static method to create a SparkJavaRDD object from a text file.
    *
    * @param sparkContext  the spark context containing configurations.
    * @param minPartitions the minimum number of partitions.
    * @param inputPath     the path of the input text file.
-   * @return the new JavaRDD object
+   * @return the new SparkJavaRDD object.
    */
-  public static JavaRDD<String> of(final SparkContext sparkContext,
-                                   final int minPartitions,
-                                   final String inputPath) {
+  public static SparkJavaRDD<String> of(final SparkContext sparkContext,
+                                        final int minPartitions,
+                                        final String inputPath) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     final org.apache.spark.rdd.RDD<String> textRdd = sparkContext.textFile(inputPath, minPartitions);
@@ -101,19 +102,19 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
     textSourceVertex.setProperty(ParallelismProperty.of(numPartitions));
     builder.addVertex(textSourceVertex);
 
-    return new JavaRDD<>(textRdd, sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
+    return new SparkJavaRDD<>(textRdd, sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
   }
 
   /**
-   * Static method to create a JavaRDD object from a Dataset.
+   * Static method to create a SparkJavaRDD object from a Dataset.
    *
    * @param sparkSession spark session containing configurations.
    * @param dataset      dataset to read initial data from.
    * @param <T>          type of the resulting object.
-   * @return the new JavaRDD object.
+   * @return the new SparkJavaRDD object.
    */
-  public static <T> JavaRDD<T> of(final SparkSession sparkSession,
-                                  final Dataset<T> dataset) {
+  public static <T> SparkJavaRDD<T> of(final SparkSession sparkSession,
+                                       final Dataset<T> dataset) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
     final IRVertex sparkBoundedSourceVertex = new SparkDatasetBoundedSourceVertex<>(sparkSession, dataset);
@@ -121,23 +122,23 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
     sparkBoundedSourceVertex.setProperty(ParallelismProperty.of(sparkRDD.getNumPartitions()));
     builder.addVertex(sparkBoundedSourceVertex);
 
-    return new JavaRDD<>(
+    return new SparkJavaRDD<>(
       sparkRDD, sparkSession.sparkContext(), builder.buildWithoutSourceSinkCheck(), sparkBoundedSourceVertex);
   }
 
   /**
-   * Static method to create a JavaRDD object from {@link RDD}.
+   * Static method to create a SparkJavaRDD object from {@link RDD}.
    *
    * @param rddFrom the RDD to parse.
    * @param <T>     type of the resulting object.
-   * @return the parsed JavaRDD object.
+   * @return the parsed SparkJavaRDD object.
    */
-  public static <T> JavaRDD<T> fromRDD(final RDD<T> rddFrom) {
-    return new JavaRDD<>(rddFrom);
+  public static <T> SparkJavaRDD<T> fromRDD(final RDD<T> rddFrom) {
+    return new SparkJavaRDD<>(rddFrom);
   }
 
   @Override
-  public JavaRDD<T> wrapRDD(final org.apache.spark.rdd.RDD<T> rddFrom) {
+  public SparkJavaRDD<T> wrapRDD(final org.apache.spark.rdd.RDD<T> rddFrom) {
     if (!(rddFrom instanceof RDD)) {
       throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
     }
@@ -154,7 +155,7 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    *
    * @param rdd the Nemo rdd to wrap.
    */
-  JavaRDD(final RDD<T> rdd) {
+  SparkJavaRDD(final RDD<T> rdd) {
     super(rdd, ClassTag$.MODULE$.apply(Object.class));
     this.rdd = rdd;
   }
@@ -167,10 +168,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    * @param dag          the IR DAG in construction.
    * @param lastVertex   the last vertex of the DAG in construction.
    */
-  JavaRDD(final org.apache.spark.rdd.RDD<T> sparkRDD,
-          final SparkContext sparkContext,
-          final DAG<IRVertex, IREdge> dag,
-          final IRVertex lastVertex) {
+  SparkJavaRDD(final org.apache.spark.rdd.RDD<T> sparkRDD,
+               final SparkContext sparkContext,
+               final DAG<IRVertex, IREdge> dag,
+               final IRVertex lastVertex) {
     super(sparkRDD, ClassTag$.MODULE$.apply(Object.class));
 
     this.rdd = new RDD<>(sparkContext, dag, lastVertex, Option.apply(sparkRDD), ClassTag$.MODULE$.apply(Object.class));
@@ -183,10 +184,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    *
    * @param func function to apply.
    * @param <O>  output type.
-   * @return the JavaRDD with the extended DAG.
+   * @return the SparkJavaRDD with the extended DAG.
    */
   @Override
-  public <O> JavaRDD<O> map(final Function<T, O> func) {
+  public <O> SparkJavaRDD<O> map(final Function<T, O> func) {
     return rdd.map(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
   }
 
@@ -195,10 +196,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    *
    * @param func function to apply.
    * @param <O>  output type.
-   * @return the JavaRDD with the extended DAG.
+   * @return the SparkJavaRDD with the extended DAG.
    */
   @Override
-  public <O> JavaRDD<O> flatMap(final FlatMapFunction<T, O> func) {
+  public <O> SparkJavaRDD<O> flatMap(final FlatMapFunction<T, O> func) {
     return rdd.flatMap(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
   }
 
@@ -208,10 +209,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
    * @see org.apache.spark.api.java.JavaRDD#mapToPair : PairFunction.
    */
   @Override
-  public <K2, V2> JavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2> f) {
+  public <K2, V2> SparkJavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2> f) {
     final RDD<Tuple2<K2, V2>> pairRdd =
       rdd.map(SparkFrontendUtils.pairFunctionToPlainFunction(f), ClassTag$.MODULE$.apply(Object.class));
-    return JavaPairRDD.fromRDD(pairRdd);
+    return SparkJavaPairRDD.fromRDD(pairRdd);
   }
 
   /////////////// ACTIONS ///////////////
@@ -251,386 +252,385 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
   /////////////// CACHING ///////////////
 
   @Override
-  public JavaRDD<T> persist(final StorageLevel newLevel) {
+  public SparkJavaRDD<T> persist(final StorageLevel newLevel) {
     return rdd.persist(newLevel).toJavaRDD();
   }
 
   @Override
-  public JavaRDD<T> cache() {
+  public SparkJavaRDD<T> cache() {
     return rdd.cache().toJavaRDD();
   }
 
-  /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
-  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+  /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
+  // TODO#92: Implement the unimplemented transformations/actions
+  // & dataset initialization methods for Spark frontend.
 
   @Override
-  public JavaRDD<T> coalesce(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> coalesce(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> coalesce(final int numPartitions, final boolean shuffle) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> coalesce(final int numPartitions, final boolean shuffle) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> distinct() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> distinct() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> distinct(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> distinct(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> filter(final Function<T, Boolean> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> filter(final Function<T, Boolean> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<List<T>> glom() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<List<T>> glom() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <U> JavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <U> SparkJavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <U> JavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f, final boolean preservesPartitioning) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <U> SparkJavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f,
+                                           final boolean preservesPartitioning) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <R> JavaRDD<R> mapPartitionsWithIndex(final Function2<Integer, Iterator<T>, Iterator<R>> f,
-                                               final boolean preservesPartitioning) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <R> SparkJavaRDD<R> mapPartitionsWithIndex(final Function2<Integer, Iterator<T>, Iterator<R>> f,
+                                                    final boolean preservesPartitioning) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T>[] randomSplit(final double[] weights) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T>[] randomSplit(final double[] weights) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T>[] randomSplit(final double[] weights, final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T>[] randomSplit(final double[] weights, final long seed) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> repartition(final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> repartition(final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> sample(final boolean withReplacement, final double fraction) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> sample(final boolean withReplacement, final double fraction) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> sample(final boolean withReplacement, final double fraction, final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> sample(final boolean withReplacement, final double fraction, final long seed) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> setName(final String name) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> setName(final String name) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <S> JavaRDD<T> sortBy(final Function<T, S> f, final boolean ascending, final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <S> SparkJavaRDD<T> sortBy(final Function<T, S> f, final boolean ascending, final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> unpersist() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> unpersist() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaRDD<T> unpersist(final boolean blocking) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaRDD<T> unpersist(final boolean blocking) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
-  /////////////// UNSUPPORTED TRANSFORMATION TO PAIR RDD ///////////////
-  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
-
   @Override
-  public <K2, V2> JavaPairRDD<K2, V2> flatMapToPair(final PairFlatMapFunction<T, K2, V2> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <K2, V2> SparkJavaPairRDD<K2, V2> flatMapToPair(final PairFlatMapFunction<T, K2, V2> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <U> SparkJavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final int numPartitions) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <U> SparkJavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final int numPartitions) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <U> JavaPairRDD<U, T> keyBy(final Function<T, U> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <U> SparkJavaPairRDD<U, T> keyBy(final Function<T, U> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <K2, V2> JavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<Iterator<T>, K2, V2> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <K2, V2> SparkJavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<Iterator<T>, K2, V2> f) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public <K2, V2> JavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<java.util.Iterator<T>, K2, V2> f,
-                                                          final boolean preservesPartitioning) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public <K2, V2> SparkJavaPairRDD<K2, V2> mapPartitionsToPair(
+    final PairFlatMapFunction<java.util.Iterator<T>, K2, V2> f,
+    final boolean preservesPartitioning) {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaPairRDD<T, Long> zipWithIndex() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaPairRDD<T, Long> zipWithIndex() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
-  public JavaPairRDD<T, Long> zipWithUniqueId() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+  public SparkJavaPairRDD<T, Long> zipWithUniqueId() {
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
-  /////////////// UNSUPPORTED ACTIONS ///////////////
-  //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+  /////////////// UNSUPPORTED ACTIONS ///////////////
 
   @Override
   public <U> U aggregate(final U zeroValue, final Function2<U, T, U> seqOp, final Function2<U, U, U> combOp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public void checkpoint() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
 
   @Override
   public JavaFutureAction<List<T>> collectAsync() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T>[] collectPartitions(final int[] partitionIds) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public long count() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public PartialResult<BoundedDouble> countApprox(final long timeout) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public PartialResult<BoundedDouble> countApprox(final long timeout, final double confidence) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public long countApproxDistinct(final double relativeSD) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public JavaFutureAction<Long> countAsync() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public Map<T, Long> countByValue() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout, final double confidence) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T first() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T fold(final T zeroValue, final Function2<T, T, T> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public void foreach(final VoidFunction<T> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public JavaFutureAction<Void> foreachAsync(final VoidFunction<T> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public void foreachPartition(final VoidFunction<Iterator<T>> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public JavaFutureAction<Void> foreachPartitionAsync(final VoidFunction<Iterator<T>> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public Optional<String> getCheckpointFile() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public int getNumPartitions() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public StorageLevel getStorageLevel() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public int id() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public boolean isCheckpointed() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public boolean isEmpty() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public Iterator<T> iterator(final Partition split, final TaskContext taskContext) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T max(final Comparator<T> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T min(final Comparator<T> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public String name() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<Partition> partitions() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public void saveAsObjectFile(final String path) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public void saveAsTextFile(final String path,
                              final Class<? extends org.apache.hadoop.io.compress.CompressionCodec> codec) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> take(final int num) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public JavaFutureAction<List<T>> takeAsync(final int num) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> takeOrdered(final int num) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> takeOrdered(final int num, final Comparator<T> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> takeSample(final boolean withReplacement, final int num) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> takeSample(final boolean withReplacement, final int num, final long seed) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public String toDebugString() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public Iterator<T> toLocalIterator() {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> top(final int num) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public List<T> top(final int num, final Comparator<T> comp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public <U> U treeAggregate(final U zeroValue, final Function2<U, T, U> seqOp, final Function2<U, U, U> combOp) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public <U> U treeAggregate(final U zeroValue, final Function2<U, T, U> seqOp,
                              final Function2<U, U, U> combOp, final int depth) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T treeReduce(final Function2<T, T, T> f) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 
   @Override
   public T treeReduce(final Function2<T, T, T> f, final int depth) {
-    throw new UnsupportedOperationException("Operation not yet implemented.");
+    throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
   }
 }
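
Because the rename from JavaRDD to SparkJavaRDD is mechanical, callers only change
their imports and declared types; the supported surface is unchanged. A hedged
usage sketch (the variable names and input path are illustrative; SparkJavaRDD.of
and mapToPair are the signatures shown above):

    // Driver-side sketch; `sparkContext` is assumed to be configured elsewhere.
    SparkJavaRDD<String> lines = SparkJavaRDD.of(sparkContext, 1, "input.txt");
    SparkJavaPairRDD<String, Integer> ones =
        lines.mapToPair(s -> new Tuple2<>(s, 1));
    // ones.sortByKey() would still throw UnsupportedOperationException,
    // since sortByKey remains one of the NOT_YET_SUPPORTED stubs.
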
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
index 7eb999e..396fa3a 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.sql;
 
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.core.rdd.RDD;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -69,13 +69,13 @@ public final class Dataset<T> extends org.apache.spark.sql.Dataset<T> implements
    * @return the new javaRDD component.
    */
   @Override
-  public JavaRDD<T> javaRDD() {
+  public SparkJavaRDD<T> javaRDD() {
     return this.toJavaRDD();
   }
 
   @Override
-  public JavaRDD<T> toJavaRDD() {
-    return JavaRDD.of((SparkSession) super.sparkSession(), this);
+  public SparkJavaRDD<T> toJavaRDD() {
+    return SparkJavaRDD.of((SparkSession) super.sparkSession(), this);
   }
 
   /**
@@ -100,8 +100,8 @@ public final class Dataset<T> extends org.apache.spark.sql.Dataset<T> implements
    */
   @Override
   public RDD<T> rdd() {
-    final JavaRDD<T> javaRDD = JavaRDD.of((SparkSession) super.sparkSession(), this);
-    return javaRDD.rdd();
+    final SparkJavaRDD<T> sparkJavaRDD = SparkJavaRDD.of((SparkSession) super.sparkSession(), this);
+    return sparkJavaRDD.rdd();
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 758e962..957ff31 100644
--- a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -81,10 +81,10 @@ final class RDD[T: ClassTag] protected[rdd] (
   }
 
   /**
-   * @return converted JavaRDD.
+   * @return converted SparkJavaRDD.
    */
-  override def toJavaRDD() : JavaRDD[T] = {
-    new JavaRDD[T](this)
+  override def toJavaRDD() : SparkJavaRDD[T] = {
+    new SparkJavaRDD[T](this)
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
index b6342e2..67353e8 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
@@ -34,9 +34,8 @@ public final class LambdaPass extends AnnotatingPass {
 
   @Override
   public IRDAG apply(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON));
-    });
+    dag.getVertices().forEach(vertex ->
+      vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON)));
     return dag;
   }
 }
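
The LambdaPass change is purely stylistic: a one-statement lambda drops its
redundant braces. The same refactor in isolation, on a hypothetical list that is
not Nemo code:

    // Before: statement lambda with an unnecessary block.
    names.forEach(name -> {
      System.out.println(name);
    });
    // After: expression lambda, identical behavior.
    names.forEach(name -> System.out.println(name));
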
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
index 373eb16..803b5ee 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -96,7 +96,7 @@ public final class DAGConverterTest {
     final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
     final Stage physicalStage1 = sortedPhysicalDAG.get(0);
     final Stage physicalStage2 = sortedPhysicalDAG.get(1);
-    assertEquals(physicalDAG.getVertices().size(), 2);
+    assertEquals(2, physicalDAG.getVertices().size());
     assertEquals(0, physicalDAG.getIncomingEdgesOf(physicalStage1).size());
     assertEquals(1, physicalDAG.getIncomingEdgesOf(physicalStage2).size());
     assertEquals(1, physicalDAG.getOutgoingEdgesOf(physicalStage1).size());
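
The test fix swaps the assertEquals arguments into JUnit's expected-first order.
This matters only for diagnostics: assertEquals(expected, actual) reports failures
as "expected:<2> but was:<N>", which reads backwards when the arguments are
reversed. In isolation (the DAG accessor is taken from the test above):

    // Reversed arguments compile, but the failure message is misleading.
    assertEquals(physicalDAG.getVertices().size(), 2);
    // Expected-first, as fixed above: prints "expected:<2> but was:<...>".
    assertEquals(2, physicalDAG.getVertices().size());
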
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index d917310..5b878d9 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -329,7 +329,7 @@ public final class AlternatingLeastSquare {
      * @throws Exception Exception on the way.
      */
     @ProcessElement
-    public void processElement(final ProcessContext c) throws Exception {
+    public void processElement(final ProcessContext c) {
       final float[] result = new float[numFeatures];
 
       final KV<Integer, KV<int[], float[]>> element = c.element();
@@ -356,7 +356,7 @@ public final class AlternatingLeastSquare {
    * @param args arguments.
    * @throws ClassNotFoundException exception.
    */
-  public static void main(final String[] args) throws ClassNotFoundException {
+  public static void main(final String[] args) {
     final Long start = System.currentTimeMillis();
     LOG.info(Arrays.toString(args));
     final String inputFilePath = args[0];
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 7aff8da..04bdefe 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -38,8 +38,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nemo.common.exception.DataSourceException;
 
 import java.io.IOException;
 import java.util.UUID;
@@ -125,12 +124,10 @@ final class GenericSourceSink {
  * Write output to HDFS according to the parallelism.
  */
 final class HDFSWrite extends DoFn<String, Void> {
-  private static final Logger LOG = LoggerFactory.getLogger(HDFSWrite.class.getName());
-
   private final String path;
-  private Path fileName;
-  private FileSystem fileSystem;
-  private FSDataOutputStream outputStream;
+  private transient Path fileName;
+  private transient FileSystem fileSystem;
+  private transient FSDataOutputStream outputStream;
 
   /**
    * Constructor.
@@ -145,17 +142,17 @@ final class HDFSWrite extends DoFn<String, Void> {
    * Writes to exactly one file.
    * (The total number of output files is determined by the parallelism;
    * i.e. if the parallelism is 2, there are 2 output files in total.)
+   * TODO #273: Our custom HDFSWrite should implement WriteOperation
    */
   @Setup
   public void setup() {
     // Creating a side-effect in Setup is discouraged, but we do it anyways for now as we're extending DoFn.
-    // TODO #273: Our custom HDFSWrite should implement WriteOperation
     fileName = new Path(path + UUID.randomUUID().toString());
     try {
       fileSystem = fileName.getFileSystem(new JobConf());
       outputStream = fileSystem.create(fileName, false);
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      throw new DataSourceException(new Exception("File system setup failed " + e));
     }
   }
 
@@ -166,13 +163,13 @@ final class HDFSWrite extends DoFn<String, Void> {
    * @throws Exception exception.
    */
   @ProcessElement
-  public void processElement(final ProcessContext c) throws Exception {
+  public void processElement(final ProcessContext c) throws DataSourceException, IOException {
     try {
       outputStream.writeBytes(c.element() + "\n");
     } catch (Exception e) {
       outputStream.close();
       fileSystem.delete(fileName, true);
-      throw new RuntimeException(e);
+      throw new DataSourceException(new Exception("Processing data from source failed " + e));
     }
   }
 
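
A note on the wrapped exceptions above: passing the caught exception as a cause,
new Exception(message, e), preserves the original stack trace in the chain,
whereas concatenating e into the message string would discard it. A sketch of the
setup-failure case (readAll is a hypothetical I/O call, not Nemo code):

    try {
      readAll();
    } catch (IOException e) {
      // Cause chaining keeps the IOException's stack trace visible in logs.
      throw new DataSourceException(new Exception("File system setup failed", e));
    }
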
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
index 2e0ba47..0fd5b0e 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
@@ -19,8 +19,8 @@
 package org.apache.nemo.examples.spark;
 
 import org.apache.nemo.compiler.frontend.spark.core.JavaSparkContext;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
 import scala.Tuple2;
 
@@ -64,15 +64,15 @@ public final class JavaMapReduce {
 
     // Run MR
     final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
-    final JavaRDD<String> data = jsc.textFile(input, parallelism);
-    final JavaPairRDD<String, Long> documentToCount = data
+    final SparkJavaRDD<String> data = jsc.textFile(input, parallelism);
+    final SparkJavaPairRDD<String, Long> documentToCount = data
       .mapToPair(line -> {
         final String[] words = line.split(" +");
         final String documentId = words[0] + "#" + words[1];
         final long count = Long.parseLong(words[2]);
         return new Tuple2<>(documentId, count);
       });
-    final JavaRDD<String> documentToSum = documentToCount
+    final SparkJavaRDD<String> documentToSum = documentToCount
       .reduceByKey((i1, i2) -> i1 + i2)
       .map(t -> t._1() + ": " + t._2());
     documentToSum.saveAsTextFile(output);
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
index 0e716ea..cfc7ad0 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.examples.spark;
 
 import org.apache.nemo.compiler.frontend.spark.core.JavaSparkContext;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
 
 import java.util.ArrayList;
@@ -58,7 +58,7 @@ public final class JavaSparkPi {
       l.add(i);
     }
 
-    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
+    SparkJavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
 
     int count = dataSet.map(integer -> {
       double x = Math.random() * 2 - 1;
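
For context, JavaSparkPi estimates pi by sampling points uniformly in the unit
square: the fraction that lands inside the unit circle converges to pi/4. The core
of the computation, sketched in plain Java without the Spark plumbing:

    // Monte Carlo pi estimate; mirrors the map/reduce in the example above.
    long inside = 0;
    final int n = 1_000_000;
    for (int i = 0; i < n; i++) {
      double x = Math.random() * 2 - 1;
      double y = Math.random() * 2 - 1;
      if (x * x + y * y <= 1) {
        inside++;
      }
    }
    System.out.println("Pi is roughly " + 4.0 * inside / n);
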
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
index 8263331..8d13bfe 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
@@ -18,8 +18,8 @@
  */
 package org.apache.nemo.examples.spark;
 
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
 import scala.Tuple2;
 
@@ -60,19 +60,19 @@ public final class JavaWordAndLineCount {
       .appName("JavaWordAndLineCount")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+    SparkJavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
-    JavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
+    SparkJavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
 
-    JavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
+    SparkJavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
 
     List<Tuple2<String, Integer>> lineOutput = lineCounts.collect();
 
-    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+    SparkJavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
 
-    JavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
+    SparkJavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
 
-    JavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
+    SparkJavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
 
     List<Tuple2<String, Integer>> wordOutput = wordCounts.collect();
 
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
index ce9cadd..552dae9 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
@@ -18,8 +18,8 @@
  */
 package org.apache.nemo.examples.spark;
 
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
 import scala.Tuple2;
 
@@ -60,13 +60,13 @@ public final class JavaWordCount {
       .appName("JavaWordCount")
       .getOrCreate();
 
-    JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+    SparkJavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
 
-    JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+    SparkJavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
 
-    JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
+    SparkJavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
 
-    JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
+    SparkJavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
 
     List<Tuple2<String, Integer>> output = counts.collect();
 
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
index 8a4004a..92832d9 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.examples.spark.sql;
 
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
 import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
 import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
 import org.apache.spark.api.java.function.Function;
@@ -44,6 +44,8 @@ import static org.apache.spark.sql.functions.col;
  * This code has been copied from the Apache Spark (https://github.com/apache/spark) to demonstrate a spark example.
  */
 public final class JavaSparkSQLExample {
+  private static final String PEOPLE = "people";
+  private static final String NAME = "Name: ";
 
   /**
    * Private constructor.
@@ -182,7 +184,7 @@ public final class JavaSparkSQLExample {
     // +----+-----+
 
     // Register the DataFrame as a SQL temporary view
-    df.createOrReplaceTempView("people");
+    df.createOrReplaceTempView(PEOPLE);
 
     Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
     sqlDF.show();
@@ -195,7 +197,7 @@ public final class JavaSparkSQLExample {
     // +----+-------+
 
     // Register the DataFrame as a global temporary view
-    df.createGlobalTempView("people");
+    df.createGlobalTempView(PEOPLE);
 
     // Global temporary view is tied to a system preserved database `global_temp`
     spark.sql("SELECT * FROM global_temp.people").show();
@@ -274,7 +276,7 @@ public final class JavaSparkSQLExample {
    */
   private static void runInferSchemaExample(final SparkSession spark, final String peopleTxt) {
     // Create an RDD of Person objects from a text file
-    JavaRDD<Person> peopleRDD = spark.read()
+    SparkJavaRDD<Person> peopleRDD = spark.read()
       .textFile(peopleTxt)
       .javaRDD()
       .map(line -> {
@@ -288,7 +290,7 @@ public final class JavaSparkSQLExample {
     // Apply a schema to an RDD of JavaBeans to get a DataFrame
     Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
     // Register the DataFrame as a temporary view
-    peopleDF.createOrReplaceTempView("people");
+    peopleDF.createOrReplaceTempView(PEOPLE);
 
     // SQL statements can be run by using the sql methods provided by spark
     Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
@@ -296,7 +298,7 @@ public final class JavaSparkSQLExample {
     // The columns of a row in the result can be accessed by field index
     Encoder<String> stringEncoder = Encoders.STRING();
     Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
-      (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+      (MapFunction<Row, String>) row -> NAME + row.getString(0),
       stringEncoder);
     teenagerNamesByIndexDF.show();
     // +------------+
@@ -307,7 +309,7 @@ public final class JavaSparkSQLExample {
 
     // or by field name
     Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
-      (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
+      (MapFunction<Row, String>) row -> NAME + row.<String>getAs("name"),
       stringEncoder);
     teenagerNamesByFieldDF.show();
     // +------------+
@@ -325,7 +327,7 @@ public final class JavaSparkSQLExample {
    */
   private static void runProgrammaticSchemaExample(final SparkSession spark, final String peopleTxt) {
     // Create an RDD
-    JavaRDD<String> peopleRDD = spark.read()
+    SparkJavaRDD<String> peopleRDD = spark.read()
       .textFile(peopleTxt)
       .toJavaRDD();
 
@@ -341,7 +343,7 @@ public final class JavaSparkSQLExample {
     StructType schema = DataTypes.createStructType(fields);
 
     // Convert records of the RDD (people) to Rows
-    JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
+    SparkJavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
       String[] attributes = record.split(",");
       return RowFactory.create(attributes[0], attributes[1].trim());
     });
@@ -350,7 +352,7 @@ public final class JavaSparkSQLExample {
     Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
 
     // Creates a temporary view using the DataFrame
-    peopleDataFrame.createOrReplaceTempView("people");
+    peopleDataFrame.createOrReplaceTempView(PEOPLE);
 
     // SQL can be run over a temporary view created using DataFrames
     Dataset<Row> results = spark.sql("SELECT name FROM people");
@@ -358,7 +360,7 @@ public final class JavaSparkSQLExample {
     // The results of SQL queries are DataFrames and support all the normal RDD operations
     // The columns of a row in the result can be accessed by field index or by field name
     Dataset<String> namesDS = results.map(
-      (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+      (MapFunction<Row, String>) row -> NAME + row.getString(0),
       Encoders.STRING());
     namesDS.show();
     // +-------------+
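
The JavaSparkSQLExample hunks above all apply one fix: string literals that recur across methods ("people" as a temporary-view name, "Name: " as an output prefix) are hoisted into class-level constants. The declarations themselves sit outside the hunks shown here; judging from the usages, they would look roughly like this sketch (the names PEOPLE and NAME come from the diff; their exact placement in the class is assumed):

    private static final String PEOPLE = "people";
    private static final String NAME = "Name: ";

Note that the SQL strings (e.g. "SELECT * FROM people") still embed the view name as a literal, so renaming the view would need a matching edit there as well as in the constant.
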
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index 05c310e..68d1784 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -29,11 +29,8 @@ import org.apache.nemo.runtime.executor.data.FileArea;
 import org.apache.nemo.runtime.executor.data.metadata.FileMetadata;
 import org.apache.nemo.runtime.executor.data.metadata.PartitionMetadata;
 import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
-import org.apache.nemo.runtime.executor.data.partition.Partition;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
@@ -62,7 +59,6 @@ import java.util.Optional;
  */
 @NotThreadSafe
 public final class FileBlock<K extends Serializable> implements Block<K> {
-  private static final Logger LOG = LoggerFactory.getLogger(FileBlock.class.getName());
   private final String id;
   private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
@@ -70,6 +66,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
   private final FileMetadata<K> metadata;
   private final MemoryPoolAssigner memoryPoolAssigner;
   private static final String ALREADY_COMMITED = "The partition is already committed!";
+  private static final String CANNOT_RETRIEVE_BEFORE_COMMITTED = "Cannot retrieve elements before a block is committed!";
 
   /**
    * Constructor.
@@ -127,7 +124,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    */
   @Override
   public void write(final K key,
-                    final Object element) throws BlockWriteException {
+                    final Object element) {
     if (metadata.isCommitted()) {
       throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
     } else {
@@ -152,8 +149,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * @throws BlockWriteException for any error that occurs while trying to write a block.
    */
   @Override
-  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
-    throws BlockWriteException {
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) {
     if (metadata.isCommitted()) {
       throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
     } else {
@@ -175,8 +171,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * @throws BlockWriteException for any error that occurs while trying to write a block.
    */
   @Override
-  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
-    throws BlockWriteException {
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) {
     if (metadata.isCommitted()) {
       throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
     } else {
@@ -196,9 +191,9 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * @throws BlockFetchException for any error that occurs while trying to fetch a block.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) {
     if (!metadata.isCommitted()) {
-      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+      throw new BlockFetchException(new Throwable(CANNOT_RETRIEVE_BEFORE_COMMITTED));
     } else {
       // Deserialize the data
       final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
@@ -242,9 +237,9 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * @throws BlockFetchException for any error that occurs while trying to fetch a block.
    */
   @Override
-  public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
+  public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) {
     if (!metadata.isCommitted()) {
-      throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+      throw new BlockFetchException(new Throwable(CANNOT_RETRIEVE_BEFORE_COMMITTED));
     } else {
       // Deserialize the data
       final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
@@ -303,7 +298,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    */
   public List<FileArea> asFileAreas(final KeyRange keyRange) throws IOException {
     if (!metadata.isCommitted()) {
-      throw new IOException("Cannot retrieve elements before a block is committed");
+      throw new IOException(CANNOT_RETRIEVE_BEFORE_COMMITTED);
     } else {
       final List<FileArea> fileAreas = new ArrayList<>();
       for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
@@ -335,7 +330,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * @throws BlockWriteException for any error that occurs while trying to write a block.
    */
   @Override
-  public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
+  public synchronized Optional<Map<K, Long>> commit() {
     try {
       if (!metadata.isCommitted()) {
         commitPartitions();
@@ -364,12 +359,12 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
    * The committed partitions will be flushed to storage.
    */
   @Override
-  public synchronized void commitPartitions() throws BlockWriteException {
+  public synchronized void commitPartitions() {
     final List<SerializedPartition<K>> partitions = new ArrayList<>();
     try {
-      for (final Partition<?, K> partition : nonCommittedPartitionsMap.values()) {
+      for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
         partition.commit();
-        partitions.add((SerializedPartition<K>) partition);
+        partitions.add(partition);
       }
       writeToFile(partitions);
       nonCommittedPartitionsMap.clear();
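
Two patterns recur in the FileBlock hunks. First, the throws clauses dropped from the write/read/commit signatures are redundant: the bodies still throw BlockWriteException and BlockFetchException, which compiles without a declaration only because both are unchecked (RuntimeException subtypes). Second, the commitPartitions loop no longer iterates as Partition<?, K> and downcasts; the field is already declared as Map<K, SerializedPartition<K>> (see the field list above), so the element type is known statically. A minimal before/after sketch of that loop, reduced from the hunk above:

    // Before: the widened loop type forces a downcast on every element.
    for (final Partition<?, K> partition : nonCommittedPartitionsMap.values()) {
      partition.commit();
      partitions.add((SerializedPartition<K>) partition);
    }

    // After: the map's value type is SerializedPartition<K> already,
    // so the loop binds it directly and the cast disappears.
    for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
      partition.commit();
      partitions.add(partition);
    }
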
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index b416445..6083461 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -42,7 +42,7 @@ public final class MinOccupancyFirstSchedulingPolicy implements SchedulingPolicy
   public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
     final OptionalInt minOccupancy =
       executors.stream()
-        .map(executor -> executor.getNumOfRunningTasks())
+        .map(ExecutorRepresenter::getNumOfRunningTasks)
         .mapToInt(i -> i).min();
 
     if (!minOccupancy.isPresent()) {
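
The MinOccupancyFirstSchedulingPolicy change replaces an explicit lambda with the equivalent method reference ExecutorRepresenter::getNumOfRunningTasks. A further simplification, not taken by this commit and assuming getNumOfRunningTasks() returns an int, would map straight into an IntStream and drop the Integer boxing round trip:

    // Hypothetical alternative: skip the intermediate Stream<Integer>.
    final OptionalInt minOccupancy = executors.stream()
      .mapToInt(ExecutorRepresenter::getNumOfRunningTasks)
      .min();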