You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2020/02/11 05:50:28 UTC
[incubator-nemo] branch master updated: [NEMO-429] SWPP TEAM20 Code
Smell Fix (#283)
This is an automated email from the ASF dual-hosted git repository.
wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new a7b9cf6 [NEMO-429] SWPP TEAM20 Code Smell Fix (#283)
a7b9cf6 is described below
commit a7b9cf6441b930ea29be49ca996f1ad393c8f754
Author: Jeongyoon Eo <je...@gmail.com>
AuthorDate: Tue Feb 11 14:50:17 2020 +0900
[NEMO-429] SWPP TEAM20 Code Smell Fix (#283)
JIRA: NEMO-429: SWPP TEAM20 Code Smell Fix
Major changes:
- Fixed code smells (SWPP Code Smell session)
Co-authored-by: jihoonrf <kz...@snu.ac.kr>
Co-authored-by: Spiraline <44...@users.noreply.github.com>
---
.../nemo/common/exception/DataSourceException.java | 30 +-
.../frontend/spark/coder/SparkEncoderFactory.java | 5 +-
.../frontend/spark/core/JavaSparkContext.java | 26 +-
.../compiler/frontend/spark/core/SparkContext.java | 8 +-
.../frontend/spark/core/rdd/JavaPairRDD.java | 711 ---------------------
.../frontend/spark/core/rdd/SparkJavaPairRDD.java | 661 +++++++++++++++++++
.../core/rdd/{JavaRDD.java => SparkJavaRDD.java} | 284 ++++----
.../nemo/compiler/frontend/spark/sql/Dataset.java | 12 +-
.../compiler/frontend/spark/core/rdd/RDD.scala | 6 +-
.../pass/compiletime/annotating/LambdaPass.java | 5 +-
.../compiler/backend/nemo/DAGConverterTest.java | 2 +-
.../nemo/examples/beam/AlternatingLeastSquare.java | 4 +-
.../nemo/examples/beam/GenericSourceSink.java | 19 +-
.../apache/nemo/examples/spark/JavaMapReduce.java | 10 +-
.../apache/nemo/examples/spark/JavaSparkPi.java | 4 +-
.../nemo/examples/spark/JavaWordAndLineCount.java | 16 +-
.../apache/nemo/examples/spark/JavaWordCount.java | 12 +-
.../examples/spark/sql/JavaSparkSQLExample.java | 24 +-
.../runtime/executor/data/block/FileBlock.java | 31 +-
.../MinOccupancyFirstSchedulingPolicy.java | 2 +-
20 files changed, 903 insertions(+), 969 deletions(-)
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java b/common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
similarity index 55%
copy from compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
copy to common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
index b6342e2..f5d94ea 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
+++ b/common/src/main/java/org/apache/nemo/common/exception/DataSourceException.java
@@ -16,27 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceLambdaProperty;
+package org.apache.nemo.common.exception;
/**
- * Lambda Pass.
- * Description: A part of lambda executor, assigning LambdaResourceProperty
+ * DataSourceException
+ * Thrown when any exception occurs in data sources. ex. processing data from file systems.
*/
-@Annotates(ResourceLambdaProperty.class)
-public final class LambdaPass extends AnnotatingPass {
-
- public LambdaPass() {
- super(LambdaPass.class);
- }
-
- @Override
- public IRDAG apply(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON));
- });
- return dag;
+public final class DataSourceException extends RuntimeException {
+ /**
+ * DataSourceException.
+ *
+ * @param throwable the throwable to throw.
+ */
+ public DataSourceException(final Throwable throwable) {
+ super(throwable);
}
}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
index bb1174b..7df8024 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/coder/SparkEncoderFactory.java
@@ -32,7 +32,7 @@ import java.io.OutputStream;
* @param <T> type of the object to serialize.
*/
public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
- private final Serializer serializer;
+ private final transient Serializer serializer;
/**
* Default constructor.
@@ -61,8 +61,7 @@ public final class SparkEncoderFactory<T> implements EncoderFactory<T> {
* @param <T2> type of the object to serialize.
*/
private final class SparkEncoder<T2> implements Encoder<T2> {
-
- private final SerializationStream out;
+ private final transient SerializationStream out;
/**
* Constructor.
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
index d4cd2a7..b1a7448 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/JavaSparkContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.spark.core;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.spark.SparkContext;
import java.util.List;
@@ -39,46 +39,46 @@ public final class JavaSparkContext {
}
/**
- * Create a String {@link JavaRDD} from a text file path.
+ * Create a String {@link SparkJavaRDD} from a text file path.
*
* @param path the path to read.
* @return the RDD.
*/
- public JavaRDD<String> textFile(final String path) {
+ public SparkJavaRDD<String> textFile(final String path) {
return this.textFile(path, 1);
}
/**
- * Create a String {@link JavaRDD} from a text file path with specific minimum parallelism.
+ * Create a String {@link SparkJavaRDD} from a text file path with specific minimum parallelism.
*
* @param path the path to read.
* @param minPartitions the minimum parallelism.
* @return the RDD.
*/
- public JavaRDD<String> textFile(final String path, final int minPartitions) {
- return JavaRDD.of(sparkContext, minPartitions, path);
+ public SparkJavaRDD<String> textFile(final String path, final int minPartitions) {
+ return SparkJavaRDD.of(sparkContext, minPartitions, path);
}
/**
- * Initiate a JavaRDD with the number of parallelism.
+ * Initiate a SparkJavaRDD with the number of parallelism.
*
* @param list input data as list.
* @param <T> type of the initial element.
- * @return the newly initiated JavaRDD.
+ * @return the newly initiated SparkJavaRDD.
*/
- public <T> JavaRDD<T> parallelize(final List<T> list) {
+ public <T> SparkJavaRDD<T> parallelize(final List<T> list) {
return this.parallelize(list, 1);
}
/**
- * Initiate a JavaRDD with the number of parallelism.
+ * Initiate a SparkJavaRDD with the number of parallelism.
*
* @param l input data as list.
* @param slices number of slices (parallelism).
* @param <T> type of the initial element.
- * @return the newly initiated JavaRDD.
+ * @return the newly initiated SparkJavaRDD.
*/
- public <T> JavaRDD<T> parallelize(final List<T> l, final int slices) {
- return JavaRDD.of(this.sparkContext, l, slices);
+ public <T> SparkJavaRDD<T> parallelize(final List<T> l, final int slices) {
+ return SparkJavaRDD.of(this.sparkContext, l, slices);
}
}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
index c12b3b0..12bd878 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/SparkContext.java
@@ -19,7 +19,7 @@
package org.apache.nemo.compiler.frontend.spark.core;
import org.apache.nemo.compiler.frontend.spark.SparkBroadcastVariables;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.core.rdd.RDD;
import org.apache.spark.SparkConf;
import org.apache.spark.broadcast.Broadcast;
@@ -55,19 +55,19 @@ public final class SparkContext extends org.apache.spark.SparkContext {
}
/**
- * Initiate a JavaRDD with the number of parallelism.
+ * Initiate a SparkJavaRDD with the number of parallelism.
*
* @param seq input data as list.
* @param numSlices number of slices (parallelism).
* @param evidence type of the initial element.
- * @return the newly initiated JavaRDD.
+ * @return the newly initiated SparkJavaRDD.
*/
@Override
public <T> RDD<T> parallelize(final Seq<T> seq,
final int numSlices,
final ClassTag<T> evidence) {
final List<T> javaList = scala.collection.JavaConversions.seqAsJavaList(seq);
- return JavaRDD.of(this.sparkContext, javaList, numSlices).rdd();
+ return SparkJavaRDD.of(this.sparkContext, javaList, numSlices).rdd();
}
@Override
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
deleted file mode 100644
index 1c85965..0000000
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaPairRDD.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.spark.core.rdd;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.Partitioner;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.partial.BoundedDouble;
-import org.apache.spark.partial.PartialResult;
-import org.apache.spark.serializer.Serializer;
-import org.apache.spark.storage.StorageLevel;
-import scala.Tuple2;
-import scala.Tuple3;
-import scala.Tuple4;
-import scala.reflect.ClassTag$;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Java RDD for pairs.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public final class JavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
-
- private final RDD<Tuple2<K, V>> rdd;
-
- /**
- * Static method to create a JavaPairRDD object from {@link RDD}.
- *
- * @param rddFrom the RDD to parse.
- * @param <K> type of the key.
- * @param <V> type of the value.
- * @return the parsed JavaPairRDD object.
- */
- public static <K, V> JavaPairRDD<K, V> fromRDD(final RDD<Tuple2<K, V>> rddFrom) {
- return new JavaPairRDD<>(rddFrom);
- }
-
- @Override
- public JavaPairRDD<K, V> wrapRDD(final org.apache.spark.rdd.RDD<Tuple2<K, V>> rddFrom) {
- if (!(rddFrom instanceof RDD)) {
- throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
- }
- return fromRDD((RDD<Tuple2<K, V>>) rddFrom);
- }
-
- @Override
- public RDD<Tuple2<K, V>> rdd() {
- return rdd;
- }
-
- /**
- * Constructor with existing nemo RDD.
- *
- * @param rdd the Nemo rdd to wrap.
- */
- JavaPairRDD(final RDD<Tuple2<K, V>> rdd) {
- super(rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class));
-
- this.rdd = rdd;
- }
-
- /**
- * @return the spark context.
- */
- public SparkContext getSparkContext() {
- return rdd.sparkContext();
- }
-
- /////////////// TRANSFORMATIONS ///////////////
-
- @Override
- public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
- // Explicit conversion
- final PairRDDFunctions<K, V> pairRdd = RDD.rddToPairRDDFunctions(
- rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class), null);
- final RDD<Tuple2<K, V>> reducedRdd = pairRdd.reduceByKey(func);
- return JavaPairRDD.fromRDD(reducedRdd);
- }
-
- @Override
- public <R> JavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
- return rdd.map(f, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
- }
-
- /////////////// ACTIONS ///////////////
-
- @Override
- public List<Tuple2<K, V>> collect() {
- return rdd.collectAsList();
- }
-
- /////////////// UNSUPPORTED METHODS ///////////////
- //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
-
- @Override
- public JavaPairRDD<K, V> cache() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> persist(final StorageLevel newLevel) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> unpersist() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> unpersist(final boolean blocking) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> distinct() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> distinct(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> filter(final Function<Tuple2<K, V>, Boolean> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> coalesce(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> coalesce(final int numPartitions,
- final boolean shuffle) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> repartition(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sample(final boolean withReplacement,
- final double fraction) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sample(final boolean withReplacement,
- final double fraction,
- final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
- final Map<K, Double> fractions,
- final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
- final Map<K, Double> fractions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
- final Map<K, Double> fractions,
- final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
- final Map<K, Double> fractions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> union(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> intersection(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public Tuple2<K, V> first() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
- final Function2<C, V, C> mergeValue,
- final Function2<C, C, C> mergeCombiners,
- final Partitioner partitioner,
- final boolean mapSideCombine,
- final Serializer serializer) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
- final Function2<C, V, C> mergeValue,
- final Function2<C, C, C> mergeCombiners,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
- final Function2<C, V, C> mergeValue,
- final Function2<C, C, C> mergeCombiners,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> reduceByKey(final Partitioner partitioner,
- final Function2<V, V, V> func) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public Map<K, V> reduceByKeyLocally(final Function2<V, V, V> func) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public Map<K, Long> countByKey() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
- final double confidence) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public double countByKeyApprox$default$2() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
- final Partitioner partitioner,
- final Function2<U, V, U> seqFunc,
- final Function2<U, U, U> combFunc) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
- final int numPartitions,
- final Function2<U, V, U> seqFunc,
- final Function2<U, U, U> combFunc) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <U> JavaPairRDD<K, U> aggregateByKey(final U zeroValue,
- final Function2<U, V, U> seqFunc,
- final Function2<U, U, U> combFunc) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> foldByKey(final V zeroValue,
- final Partitioner partitioner,
- final Function2<V, V, V> func) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> foldByKey(final V zeroValue,
- final int numPartitions,
- final Function2<V, V, V> func) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> foldByKey(final V zeroValue,
- final Function2<V, V, V> func) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, Iterable<V>> groupByKey(final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, Iterable<V>> groupByKey(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
- final Partitioner p) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner p) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> partitionBy(final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
- leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
- rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
- fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <C> JavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
- final Function2<C, V, C> mergeValue,
- final Function2<C, C, C> mergeCombiners) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public org.apache.spark.api.java.JavaPairRDD<K, Iterable<V>> groupByKey() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
- leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>>
- leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
- rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>>
- rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
- fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
- fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public Map<K, V> collectAsMap() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <U> JavaPairRDD<K, U> mapValues(final Function<V, U> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <U> JavaPairRDD<K, U> flatMapValues(final Function<V, Iterable<U>> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
- cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W> JavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
- groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2> JavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
- groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <W1, W2, W3> JavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
- groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
- final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
- final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public List<V> lookup(final K key) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- // Commented out due to an IDE issue
- /*@Override
- public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
- final Class<?> keyClass,
- final Class<?> valueClass,
- final Class<F> outputFormatClass,
- final JobConf conf) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
- final Class<?> keyClass,
- final Class<?> valueClass,
- final Class<F> outputFormatClass) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <F extends OutputFormat<?, ?>> void saveAsHadoopFile(final String path,
- final Class<?> keyClass,
- final Class<?> valueClass,
- final Class<F> outputFormatClass,
- final Class<? extends CompressionCodec> codec) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
- saveAsNewAPIHadoopFile(final String path,
- final Class<?> keyClass,
- final Class<?> valueClass,
- final Class<F> outputFormatClass,
- final Configuration conf) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public void saveAsNewAPIHadoopDataset(final Configuration conf) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public <F extends org.apache.hadoop.mapreduce.OutputFormat<?, ?>> void
- saveAsNewAPIHadoopFile(final String path,
- final Class<?> keyClass,
- final Class<?> valueClass,
- final Class<F> outputFormatClass) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }*/
-
- @Override
- public void saveAsHadoopDataset(final JobConf conf) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner,
- final Comparator<K> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey(final boolean ascending) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey(final boolean ascending,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
- final boolean ascending) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
- final boolean ascending,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaRDD<K> keys() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaRDD<V> values() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
- final Partitioner partitioner) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
- final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-
- @Override
- public JavaPairRDD<K, V> setName(final String name) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
- }
-}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java
new file mode 100644
index 0000000..83e538c
--- /dev/null
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaPairRDD.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.spark.core.rdd;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.partial.BoundedDouble;
+import org.apache.spark.partial.PartialResult;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+import scala.reflect.ClassTag$;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Java RDD for pairs.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public final class SparkJavaPairRDD<K, V> extends org.apache.spark.api.java.JavaPairRDD<K, V> {
+
+ private final RDD<Tuple2<K, V>> rdd;
+ private static final String NOT_YET_SUPPORTED = "Operation not yet supported.";
+
+ /**
+ * Static method to create a SparkJavaPairRDD object from {@link RDD}.
+ *
+ * @param rddFrom the RDD to parse.
+ * @param <K> type of the key.
+ * @param <V> type of the value.
+ * @return the parsed SparkJavaPairRDD object.
+ */
+ public static <K, V> SparkJavaPairRDD<K, V> fromRDD(final RDD<Tuple2<K, V>> rddFrom) {
+ return new SparkJavaPairRDD<>(rddFrom);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> wrapRDD(final org.apache.spark.rdd.RDD<Tuple2<K, V>> rddFrom) {
+ if (!(rddFrom instanceof RDD)) {
+ throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
+ }
+ return fromRDD((RDD<Tuple2<K, V>>) rddFrom);
+ }
+
+ @Override
+ public RDD<Tuple2<K, V>> rdd() {
+ return rdd;
+ }
+
+ /**
+ * Constructor with existing nemo RDD.
+ *
+ * @param rdd the Nemo rdd to wrap.
+ */
+ SparkJavaPairRDD(final RDD<Tuple2<K, V>> rdd) {
+ super(rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class));
+
+ this.rdd = rdd;
+ }
+
+ /**
+ * @return the spark context.
+ */
+ public SparkContext getSparkContext() {
+ return rdd.sparkContext();
+ }
+
+ /////////////// TRANSFORMATIONS ///////////////
+
+ @Override
+ public SparkJavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func) {
+ // Explicit conversion
+ final PairRDDFunctions<K, V> pairRdd = RDD.rddToPairRDDFunctions(
+ rdd, ClassTag$.MODULE$.apply(Object.class), ClassTag$.MODULE$.apply(Object.class), null);
+ final RDD<Tuple2<K, V>> reducedRdd = pairRdd.reduceByKey(func);
+ return SparkJavaPairRDD.fromRDD(reducedRdd);
+ }
+
+ @Override
+ public <R> SparkJavaRDD<R> map(final Function<Tuple2<K, V>, R> f) {
+ return rdd.map(f, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
+ }
+
+ /////////////// ACTIONS ///////////////
+
+ @Override
+ public List<Tuple2<K, V>> collect() {
+ return rdd.collectAsList();
+ }
+
+ /**
+ * TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+ */
+ @Override
+ public SparkJavaPairRDD<K, V> cache() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> persist(final StorageLevel newLevel) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> unpersist() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> unpersist(final boolean blocking) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> distinct() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> distinct(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> filter(final Function<Tuple2<K, V>, Boolean> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> coalesce(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> coalesce(final int numPartitions,
+ final boolean shuffle) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> repartition(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sample(final boolean withReplacement,
+ final double fraction) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sample(final boolean withReplacement,
+ final double fraction,
+ final long seed) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+ final Map<K, Double> fractions,
+ final long seed) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sampleByKey(final boolean withReplacement,
+ final Map<K, Double> fractions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+ final Map<K, Double> fractions,
+ final long seed) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sampleByKeyExact(final boolean withReplacement,
+ final Map<K, Double> fractions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> union(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> intersection(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public Tuple2<K, V> first() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+ final Function2<C, V, C> mergeValue,
+ final Function2<C, C, C> mergeCombiners,
+ final Partitioner partitioner,
+ final boolean mapSideCombine,
+ final Serializer serializer) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+ final Function2<C, V, C> mergeValue,
+ final Function2<C, C, C> mergeCombiners,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+ final Function2<C, V, C> mergeValue,
+ final Function2<C, C, C> mergeCombiners,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> reduceByKey(final Partitioner partitioner,
+ final Function2<V, V, V> func) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public Map<K, V> reduceByKeyLocally(final Function2<V, V, V> func) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public Map<K, Long> countByKey() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public PartialResult<Map<K, BoundedDouble>> countByKeyApprox(final long timeout,
+ final double confidence) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public double countByKeyApprox$default$2() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+ final Partitioner partitioner,
+ final Function2<U, V, U> seqFunc,
+ final Function2<U, U, U> combFunc) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+ final int numPartitions,
+ final Function2<U, V, U> seqFunc,
+ final Function2<U, U, U> combFunc) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <U> SparkJavaPairRDD<K, U> aggregateByKey(final U zeroValue,
+ final Function2<U, V, U> seqFunc,
+ final Function2<U, U, U> combFunc) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+ final Partitioner partitioner,
+ final Function2<V, V, V> func) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+ final int numPartitions,
+ final Function2<V, V, V> func) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> foldByKey(final V zeroValue,
+ final Function2<V, V, V> func) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> reduceByKey(final Function2<V, V, V> func,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, Iterable<V>> groupByKey(final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, Iterable<V>> groupByKey(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> subtract(final org.apache.spark.api.java.JavaPairRDD<K, V> other,
+ final Partitioner p) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, V> subtractByKey(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner p) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> partitionBy(final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+ leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+ rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+ fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <C> SparkJavaPairRDD<K, C> combineByKey(final Function<V, C> createCombiner,
+ final Function2<C, V, C> mergeValue,
+ final Function2<C, C, C> mergeCombiners) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public org.apache.spark.api.java.JavaPairRDD<K, Iterable<V>> groupByKey() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, W>> join(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+ leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<V, Optional<W>>>
+ leftOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+ rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, W>>
+ rightOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+ fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>>
+ fullOuterJoin(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public Map<K, V> collectAsMap() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <U> SparkJavaPairRDD<K, U> mapValues(final Function<V, U> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <U> SparkJavaPairRDD<K, U> flatMapValues(final Function<V, Iterable<U>> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W> other,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+ cogroup(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final org.apache.spark.api.java.JavaPairRDD<K, W3> other3,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W> SparkJavaPairRDD<K, Tuple2<Iterable<V>, Iterable<W>>>
+ groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W> other) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2> SparkJavaPairRDD<K, Tuple3<Iterable<V>, Iterable<W1>, Iterable<W2>>>
+ groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public <W1, W2, W3> SparkJavaPairRDD<K, Tuple4<Iterable<V>, Iterable<W1>, Iterable<W2>, Iterable<W3>>>
+ groupWith(final org.apache.spark.api.java.JavaPairRDD<K, W1> other1,
+ final org.apache.spark.api.java.JavaPairRDD<K, W2> other2,
+ final org.apache.spark.api.java.JavaPairRDD<K, W3> other3) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public List<V> lookup(final K key) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public void saveAsHadoopDataset(final JobConf conf) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> repartitionAndSortWithinPartitions(final Partitioner partitioner,
+ final Comparator<K> comp) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey(final boolean ascending) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey(final boolean ascending,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+ final boolean ascending) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> sortByKey(final Comparator<K> comp,
+ final boolean ascending,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaRDD<K> keys() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaRDD<V> values() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+ final Partitioner partitioner) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD,
+ final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, Long> countApproxDistinctByKey(final double relativeSD) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+
+ @Override
+ public SparkJavaPairRDD<K, V> setName(final String name) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
+ }
+}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
similarity index 52%
rename from compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
rename to compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
index 69d2571..9761a17 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/core/rdd/SparkJavaRDD.java
@@ -54,9 +54,10 @@ import java.util.concurrent.atomic.AtomicInteger;
*
* @param <T> type of the final element.
*/
-public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
+public final class SparkJavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
private final RDD<T> rdd;
+ private static final String NOT_YET_SUPPORTED = "Operation not yet supported.";
/**
* Static method to create a RDD object from an iterable object.
@@ -65,11 +66,11 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
* @param initialData initial data.
* @param parallelism parallelism information.
* @param <T> type of the resulting object.
- * @return the new JavaRDD object.
+ * @return the new SparkJavaRDD object.
*/
- public static <T> JavaRDD<T> of(final SparkContext sparkContext,
- final Iterable<T> initialData,
- final Integer parallelism) {
+ public static <T> SparkJavaRDD<T> of(final SparkContext sparkContext,
+ final Iterable<T> initialData,
+ final Integer parallelism) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
@@ -79,20 +80,20 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
final RDD<T> nemoRdd = new RDD<>(sparkContext, builder.buildWithoutSourceSinkCheck(),
initializedSourceVertex, Option.empty(), ClassTag$.MODULE$.apply(Object.class));
- return new JavaRDD<>(nemoRdd);
+ return new SparkJavaRDD<>(nemoRdd);
}
/**
- * Static method to create a JavaRDD object from an text file.
+ * Static method to create a SparkJavaRDD object from a text file.
*
* @param sparkContext the spark context containing configurations.
* @param minPartitions the minimum number of partitions.
* @param inputPath the path of the input text file.
- * @return the new JavaRDD object
+ * @return the new SparkJavaRDD object
*/
- public static JavaRDD<String> of(final SparkContext sparkContext,
- final int minPartitions,
- final String inputPath) {
+ public static SparkJavaRDD<String> of(final SparkContext sparkContext,
+ final int minPartitions,
+ final String inputPath) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
final org.apache.spark.rdd.RDD<String> textRdd = sparkContext.textFile(inputPath, minPartitions);
@@ -101,19 +102,19 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
textSourceVertex.setProperty(ParallelismProperty.of(numPartitions));
builder.addVertex(textSourceVertex);
- return new JavaRDD<>(textRdd, sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
+ return new SparkJavaRDD<>(textRdd, sparkContext, builder.buildWithoutSourceSinkCheck(), textSourceVertex);
}
/**
- * Static method to create a JavaRDD object from a Dataset.
+ * Static method to create a SparkJavaRDD object from a Dataset.
*
* @param sparkSession spark session containing configurations.
* @param dataset dataset to read initial data from.
* @param <T> type of the resulting object.
- * @return the new JavaRDD object.
+ * @return the new SparkJavaRDD object.
*/
- public static <T> JavaRDD<T> of(final SparkSession sparkSession,
- final Dataset<T> dataset) {
+ public static <T> SparkJavaRDD<T> of(final SparkSession sparkSession,
+ final Dataset<T> dataset) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
final IRVertex sparkBoundedSourceVertex = new SparkDatasetBoundedSourceVertex<>(sparkSession, dataset);
@@ -121,23 +122,23 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
sparkBoundedSourceVertex.setProperty(ParallelismProperty.of(sparkRDD.getNumPartitions()));
builder.addVertex(sparkBoundedSourceVertex);
- return new JavaRDD<>(
+ return new SparkJavaRDD<>(
sparkRDD, sparkSession.sparkContext(), builder.buildWithoutSourceSinkCheck(), sparkBoundedSourceVertex);
}
/**
- * Static method to create a JavaRDD object from {@link RDD}.
+ * Static method to create a SparkJavaRDD object from {@link RDD}.
*
* @param rddFrom the RDD to parse.
* @param <T> type of the resulting object.
- * @return the parsed JavaRDD object.
+ * @return the parsed SparkJavaRDD object.
*/
- public static <T> JavaRDD<T> fromRDD(final RDD<T> rddFrom) {
- return new JavaRDD<>(rddFrom);
+ public static <T> SparkJavaRDD<T> fromRDD(final RDD<T> rddFrom) {
+ return new SparkJavaRDD<>(rddFrom);
}
@Override
- public JavaRDD<T> wrapRDD(final org.apache.spark.rdd.RDD<T> rddFrom) {
+ public SparkJavaRDD<T> wrapRDD(final org.apache.spark.rdd.RDD<T> rddFrom) {
if (!(rddFrom instanceof RDD)) {
throw new UnsupportedOperationException("Cannot wrap Spark RDD as Nemo RDD!");
}
@@ -154,7 +155,7 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
*
* @param rdd the Nemo rdd to wrap.
*/
- JavaRDD(final RDD<T> rdd) {
+ SparkJavaRDD(final RDD<T> rdd) {
super(rdd, ClassTag$.MODULE$.apply(Object.class));
this.rdd = rdd;
}
@@ -167,10 +168,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
* @param dag the IR DAG in construction.
* @param lastVertex the last vertex of the DAG in construction.
*/
- JavaRDD(final org.apache.spark.rdd.RDD<T> sparkRDD,
- final SparkContext sparkContext,
- final DAG<IRVertex, IREdge> dag,
- final IRVertex lastVertex) {
+ SparkJavaRDD(final org.apache.spark.rdd.RDD<T> sparkRDD,
+ final SparkContext sparkContext,
+ final DAG<IRVertex, IREdge> dag,
+ final IRVertex lastVertex) {
super(sparkRDD, ClassTag$.MODULE$.apply(Object.class));
this.rdd = new RDD<>(sparkContext, dag, lastVertex, Option.apply(sparkRDD), ClassTag$.MODULE$.apply(Object.class));
@@ -183,10 +184,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
*
* @param func function to apply.
* @param <O> output type.
- * @return the JavaRDD with the extended DAG.
+ * @return the SparkJavaRDD with the extended DAG.
*/
@Override
- public <O> JavaRDD<O> map(final Function<T, O> func) {
+ public <O> SparkJavaRDD<O> map(final Function<T, O> func) {
return rdd.map(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
}
@@ -195,10 +196,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
*
* @param func function to apply.
* @param <O> output type.
- * @return the JavaRDD with the extended DAG.
+ * @return the SparkJavaRDD with the extended DAG.
*/
@Override
- public <O> JavaRDD<O> flatMap(final FlatMapFunction<T, O> func) {
+ public <O> SparkJavaRDD<O> flatMap(final FlatMapFunction<T, O> func) {
return rdd.flatMap(func, ClassTag$.MODULE$.apply(Object.class)).toJavaRDD();
}
@@ -208,10 +209,10 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
* @see org.apache.spark.api.java.JavaRDD#mapToPair : PairFunction.
*/
@Override
- public <K2, V2> JavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2> f) {
+ public <K2, V2> SparkJavaPairRDD<K2, V2> mapToPair(final PairFunction<T, K2, V2> f) {
final RDD<Tuple2<K2, V2>> pairRdd =
rdd.map(SparkFrontendUtils.pairFunctionToPlainFunction(f), ClassTag$.MODULE$.apply(Object.class));
- return JavaPairRDD.fromRDD(pairRdd);
+ return SparkJavaPairRDD.fromRDD(pairRdd);
}
/////////////// ACTIONS ///////////////
@@ -251,386 +252,385 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
/////////////// CACHING ///////////////
@Override
- public JavaRDD<T> persist(final StorageLevel newLevel) {
+ public SparkJavaRDD<T> persist(final StorageLevel newLevel) {
return rdd.persist(newLevel).toJavaRDD();
}
@Override
- public JavaRDD<T> cache() {
+ public SparkJavaRDD<T> cache() {
return rdd.cache().toJavaRDD();
}
- /////////////// UNSUPPORTED TRANSFORMATIONS ///////////////
- //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+ /**
+ * TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+ */
@Override
- public JavaRDD<T> coalesce(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> coalesce(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> coalesce(final int numPartitions, final boolean shuffle) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> coalesce(final int numPartitions, final boolean shuffle) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> distinct() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> distinct() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> distinct(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> distinct(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> filter(final Function<T, Boolean> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> filter(final Function<T, Boolean> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<List<T>> glom() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<List<T>> glom() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <U> JavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <U> SparkJavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <U> JavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f, final boolean preservesPartitioning) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <U> SparkJavaRDD<U> mapPartitions(final FlatMapFunction<Iterator<T>, U> f,
+ final boolean preservesPartitioning) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <R> JavaRDD<R> mapPartitionsWithIndex(final Function2<Integer, Iterator<T>, Iterator<R>> f,
- final boolean preservesPartitioning) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <R> SparkJavaRDD<R> mapPartitionsWithIndex(final Function2<Integer, Iterator<T>, Iterator<R>> f,
+ final boolean preservesPartitioning) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T>[] randomSplit(final double[] weights) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T>[] randomSplit(final double[] weights) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T>[] randomSplit(final double[] weights, final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T>[] randomSplit(final double[] weights, final long seed) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> repartition(final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> repartition(final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> sample(final boolean withReplacement, final double fraction) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> sample(final boolean withReplacement, final double fraction) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> sample(final boolean withReplacement, final double fraction, final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> sample(final boolean withReplacement, final double fraction, final long seed) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> setName(final String name) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> setName(final String name) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <S> JavaRDD<T> sortBy(final Function<T, S> f, final boolean ascending, final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <S> SparkJavaRDD<T> sortBy(final Function<T, S> f, final boolean ascending, final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> unpersist() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> unpersist() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaRDD<T> unpersist(final boolean blocking) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaRDD<T> unpersist(final boolean blocking) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
- /////////////// UNSUPPORTED TRANSFORMATION TO PAIR RDD ///////////////
- //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
-
@Override
- public <K2, V2> JavaPairRDD<K2, V2> flatMapToPair(final PairFlatMapFunction<T, K2, V2> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <K2, V2> SparkJavaPairRDD<K2, V2> flatMapToPair(final PairFlatMapFunction<T, K2, V2> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <U> SparkJavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <U> JavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final int numPartitions) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <U> SparkJavaPairRDD<U, Iterable<T>> groupBy(final Function<T, U> f, final int numPartitions) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <U> JavaPairRDD<U, T> keyBy(final Function<T, U> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <U> SparkJavaPairRDD<U, T> keyBy(final Function<T, U> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <K2, V2> JavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<Iterator<T>, K2, V2> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <K2, V2> SparkJavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<Iterator<T>, K2, V2> f) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public <K2, V2> JavaPairRDD<K2, V2> mapPartitionsToPair(final PairFlatMapFunction<java.util.Iterator<T>, K2, V2> f,
- final boolean preservesPartitioning) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public <K2, V2> SparkJavaPairRDD<K2, V2> mapPartitionsToPair(
+ final PairFlatMapFunction<java.util.Iterator<T>, K2, V2> f,
+ final boolean preservesPartitioning) {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaPairRDD<T, Long> zipWithIndex() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaPairRDD<T, Long> zipWithIndex() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
- public JavaPairRDD<T, Long> zipWithUniqueId() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ public SparkJavaPairRDD<T, Long> zipWithUniqueId() {
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
- /////////////// UNSUPPORTED ACTIONS ///////////////
- //TODO#92: Implement the unimplemented transformations/actions & dataset initialization methods for Spark frontend.
+ /////////////// ACTIONS ///////////////
@Override
public <U> U aggregate(final U zeroValue, final Function2<U, T, U> seqOp, final Function2<U, U, U> combOp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void checkpoint() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public JavaFutureAction<List<T>> collectAsync() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T>[] collectPartitions(final int[] partitionIds) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public long count() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public PartialResult<BoundedDouble> countApprox(final long timeout, final double confidence) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public long countApproxDistinct(final double relativeSD) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public JavaFutureAction<Long> countAsync() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public Map<T, Long> countByValue() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public PartialResult<Map<T, BoundedDouble>> countByValueApprox(final long timeout, final double confidence) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T first() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T fold(final T zeroValue, final Function2<T, T, T> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void foreach(final VoidFunction<T> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public JavaFutureAction<Void> foreachAsync(final VoidFunction<T> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void foreachPartition(final VoidFunction<Iterator<T>> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public JavaFutureAction<Void> foreachPartitionAsync(final VoidFunction<Iterator<T>> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public Optional<String> getCheckpointFile() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public int getNumPartitions() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public StorageLevel getStorageLevel() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public int id() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public boolean isCheckpointed() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public boolean isEmpty() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public Iterator<T> iterator(final Partition split, final TaskContext taskContext) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T max(final Comparator<T> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T min(final Comparator<T> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public String name() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public org.apache.spark.api.java.Optional<Partitioner> partitioner() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<Partition> partitions() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void saveAsObjectFile(final String path) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public void saveAsTextFile(final String path,
final Class<? extends org.apache.hadoop.io.compress.CompressionCodec> codec) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> take(final int num) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public JavaFutureAction<List<T>> takeAsync(final int num) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> takeOrdered(final int num) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> takeOrdered(final int num, final Comparator<T> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> takeSample(final boolean withReplacement, final int num) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> takeSample(final boolean withReplacement, final int num, final long seed) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public String toDebugString() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public Iterator<T> toLocalIterator() {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> top(final int num) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public List<T> top(final int num, final Comparator<T> comp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public <U> U treeAggregate(final U zeroValue, final Function2<U, T, U> seqOp, final Function2<U, U, U> combOp) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public <U> U treeAggregate(final U zeroValue, final Function2<U, T, U> seqOp,
final Function2<U, U, U> combOp, final int depth) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T treeReduce(final Function2<T, T, T> f) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
@Override
public T treeReduce(final Function2<T, T, T> f, final int depth) {
- throw new UnsupportedOperationException("Operation not yet implemented.");
+ throw new UnsupportedOperationException(NOT_YET_SUPPORTED);
}
}
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
index 7eb999e..396fa3a 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/sql/Dataset.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.compiler.frontend.spark.sql;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.core.rdd.RDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -69,13 +69,13 @@ public final class Dataset<T> extends org.apache.spark.sql.Dataset<T> implements
* @return the new javaRDD component.
*/
@Override
- public JavaRDD<T> javaRDD() {
+ public SparkJavaRDD<T> javaRDD() {
return this.toJavaRDD();
}
@Override
- public JavaRDD<T> toJavaRDD() {
- return JavaRDD.of((SparkSession) super.sparkSession(), this);
+ public SparkJavaRDD<T> toJavaRDD() {
+ return SparkJavaRDD.of((SparkSession) super.sparkSession(), this);
}
/**
@@ -100,8 +100,8 @@ public final class Dataset<T> extends org.apache.spark.sql.Dataset<T> implements
*/
@Override
public RDD<T> rdd() {
- final JavaRDD<T> javaRDD = JavaRDD.of((SparkSession) super.sparkSession(), this);
- return javaRDD.rdd();
+ final SparkJavaRDD<T> sparkJavaRDD = SparkJavaRDD.of((SparkSession) super.sparkSession(), this);
+ return sparkJavaRDD.rdd();
}
@Override
diff --git a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index 758e962..957ff31 100644
--- a/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/org/apache/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -81,10 +81,10 @@ final class RDD[T: ClassTag] protected[rdd] (
}
/**
- * @return converted JavaRDD.
+ * @return converted SparkJavaRDD.
*/
- override def toJavaRDD() : JavaRDD[T] = {
- new JavaRDD[T](this)
+ override def toJavaRDD() : SparkJavaRDD[T] = {
+ new SparkJavaRDD[T](this)
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
index b6342e2..67353e8 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/LambdaPass.java
@@ -34,9 +34,8 @@ public final class LambdaPass extends AnnotatingPass {
@Override
public IRDAG apply(final IRDAG dag) {
- dag.getVertices().forEach(vertex -> {
- vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON));
- });
+ dag.getVertices().forEach(vertex ->
+ vertex.setPropertyPermanently(ResourceLambdaProperty.of(ResourceLambdaProperty.Value.ON)));
return dag;
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
index 373eb16..803b5ee 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -96,7 +96,7 @@ public final class DAGConverterTest {
final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
final Stage physicalStage1 = sortedPhysicalDAG.get(0);
final Stage physicalStage2 = sortedPhysicalDAG.get(1);
- assertEquals(physicalDAG.getVertices().size(), 2);
+ assertEquals(2, physicalDAG.getVertices().size());
assertEquals(0, physicalDAG.getIncomingEdgesOf(physicalStage1).size());
assertEquals(1, physicalDAG.getIncomingEdgesOf(physicalStage2).size());
assertEquals(1, physicalDAG.getOutgoingEdgesOf(physicalStage1).size());
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index d917310..5b878d9 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -329,7 +329,7 @@ public final class AlternatingLeastSquare {
* @throws Exception Exception on the way.
*/
@ProcessElement
- public void processElement(final ProcessContext c) throws Exception {
+ public void processElement(final ProcessContext c) {
final float[] result = new float[numFeatures];
final KV<Integer, KV<int[], float[]>> element = c.element();
@@ -356,7 +356,7 @@ public final class AlternatingLeastSquare {
* @param args arguments.
* @throws ClassNotFoundException exception.
*/
- public static void main(final String[] args) throws ClassNotFoundException {
+ public static void main(final String[] args) {
final Long start = System.currentTimeMillis();
LOG.info(Arrays.toString(args));
final String inputFilePath = args[0];
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 7aff8da..04bdefe 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -38,8 +38,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nemo.common.exception.DataSourceException;
import java.io.IOException;
import java.util.UUID;
@@ -125,12 +124,10 @@ final class GenericSourceSink {
* Write output to HDFS according to the parallelism.
*/
final class HDFSWrite extends DoFn<String, Void> {
- private static final Logger LOG = LoggerFactory.getLogger(HDFSWrite.class.getName());
-
private final String path;
- private Path fileName;
- private FileSystem fileSystem;
- private FSDataOutputStream outputStream;
+ private transient Path fileName;
+ private transient FileSystem fileSystem;
+ private transient FSDataOutputStream outputStream;
/**
* Constructor.
@@ -145,17 +142,17 @@ final class HDFSWrite extends DoFn<String, Void> {
* Writes to exactly one file.
* (The number of total output files are determined according to the parallelism.)
* i.e. if parallelism is 2, then there are total 2 output files.
+ * TODO #273: Our custom HDFSWrite should implement WriteOperation
*/
@Setup
public void setup() {
// Creating a side-effect in Setup is discouraged, but we do it anyways for now as we're extending DoFn.
- // TODO #273: Our custom HDFSWrite should implement WriteOperation
fileName = new Path(path + UUID.randomUUID().toString());
try {
fileSystem = fileName.getFileSystem(new JobConf());
outputStream = fileSystem.create(fileName, false);
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new DataSourceException(new Exception("File system setup failed " + e));
}
}
@@ -166,13 +163,13 @@ final class HDFSWrite extends DoFn<String, Void> {
* @throws Exception exception.
*/
@ProcessElement
- public void processElement(final ProcessContext c) throws Exception {
+ public void processElement(final ProcessContext c) throws DataSourceException, IOException {
try {
outputStream.writeBytes(c.element() + "\n");
} catch (Exception e) {
outputStream.close();
fileSystem.delete(fileName, true);
- throw new RuntimeException(e);
+ throw new DataSourceException(new Exception("Processing data from source failed " + e));
}
}
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
index 2e0ba47..0fd5b0e 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaMapReduce.java
@@ -19,8 +19,8 @@
package org.apache.nemo.examples.spark;
import org.apache.nemo.compiler.frontend.spark.core.JavaSparkContext;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import scala.Tuple2;
@@ -64,15 +64,15 @@ public final class JavaMapReduce {
// Run MR
final JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
- final JavaRDD<String> data = jsc.textFile(input, parallelism);
- final JavaPairRDD<String, Long> documentToCount = data
+ final SparkJavaRDD<String> data = jsc.textFile(input, parallelism);
+ final SparkJavaPairRDD<String, Long> documentToCount = data
.mapToPair(line -> {
final String[] words = line.split(" +");
final String documentId = words[0] + "#" + words[1];
final long count = Long.parseLong(words[2]);
return new Tuple2<>(documentId, count);
});
- final JavaRDD<String> documentToSum = documentToCount
+ final SparkJavaRDD<String> documentToSum = documentToCount
.reduceByKey((i1, i2) -> i1 + i2)
.map(t -> t._1() + ": " + t._2());
documentToSum.saveAsTextFile(output);
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
index 0e716ea..cfc7ad0 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaSparkPi.java
@@ -19,7 +19,7 @@
package org.apache.nemo.examples.spark;
import org.apache.nemo.compiler.frontend.spark.core.JavaSparkContext;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import java.util.ArrayList;
@@ -58,7 +58,7 @@ public final class JavaSparkPi {
l.add(i);
}
- JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
+ SparkJavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(integer -> {
double x = Math.random() * 2 - 1;
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
index 8263331..8d13bfe 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordAndLineCount.java
@@ -18,8 +18,8 @@
*/
package org.apache.nemo.examples.spark;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import scala.Tuple2;
@@ -60,19 +60,19 @@ public final class JavaWordAndLineCount {
.appName("JavaWordAndLineCount")
.getOrCreate();
- JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+ SparkJavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
- JavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
+ SparkJavaPairRDD<String, Integer> lineOnes = lines.mapToPair(s -> new Tuple2<>("line count", 1));
- JavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
+ SparkJavaPairRDD<String, Integer> lineCounts = lineOnes.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> lineOutput = lineCounts.collect();
- JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+ SparkJavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
- JavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
+ SparkJavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s, 1));
- JavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
+ SparkJavaPairRDD<String, Integer> wordCounts = wordOnes.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> wordOutput = wordCounts.collect();
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
index ce9cadd..552dae9 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/JavaWordCount.java
@@ -18,8 +18,8 @@
*/
package org.apache.nemo.examples.spark;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaPairRDD;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaPairRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import scala.Tuple2;
@@ -60,13 +60,13 @@ public final class JavaWordCount {
.appName("JavaWordCount")
.getOrCreate();
- JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
+ SparkJavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
- JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
+ SparkJavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
- JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
+ SparkJavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
- JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
+ SparkJavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
List<Tuple2<String, Integer>> output = counts.collect();
diff --git a/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java b/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
index 8a4004a..92832d9 100644
--- a/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
+++ b/examples/spark/src/main/java/org/apache/nemo/examples/spark/sql/JavaSparkSQLExample.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.examples.spark.sql;
-import org.apache.nemo.compiler.frontend.spark.core.rdd.JavaRDD;
+import org.apache.nemo.compiler.frontend.spark.core.rdd.SparkJavaRDD;
import org.apache.nemo.compiler.frontend.spark.sql.Dataset;
import org.apache.nemo.compiler.frontend.spark.sql.SparkSession;
import org.apache.spark.api.java.function.Function;
@@ -44,6 +44,8 @@ import static org.apache.spark.sql.functions.col;
* This code has been copied from the Apache Spark (https://github.com/apache/spark) to demonstrate a spark example.
*/
public final class JavaSparkSQLExample {
+ private static final String PEOPLE = "people";
+ private static final String NAME = "Name: ";
/**
* Private constructor.
@@ -182,7 +184,7 @@ public final class JavaSparkSQLExample {
// +----+-----+
// Register the DataFrame as a SQL temporary view
- df.createOrReplaceTempView("people");
+ df.createOrReplaceTempView(PEOPLE);
Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
@@ -195,7 +197,7 @@ public final class JavaSparkSQLExample {
// +----+-------+
// Register the DataFrame as a global temporary view
- df.createGlobalTempView("people");
+ df.createGlobalTempView(PEOPLE);
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
@@ -274,7 +276,7 @@ public final class JavaSparkSQLExample {
*/
private static void runInferSchemaExample(final SparkSession spark, final String peopleTxt) {
// Create an RDD of Person objects from a text file
- JavaRDD<Person> peopleRDD = spark.read()
+ SparkJavaRDD<Person> peopleRDD = spark.read()
.textFile(peopleTxt)
.javaRDD()
.map(line -> {
@@ -288,7 +290,7 @@ public final class JavaSparkSQLExample {
// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
- peopleDF.createOrReplaceTempView("people");
+ peopleDF.createOrReplaceTempView(PEOPLE);
// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
@@ -296,7 +298,7 @@ public final class JavaSparkSQLExample {
// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
- (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+ (MapFunction<Row, String>) row -> NAME + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
@@ -307,7 +309,7 @@ public final class JavaSparkSQLExample {
// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
- (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
+ (MapFunction<Row, String>) row -> NAME + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
@@ -325,7 +327,7 @@ public final class JavaSparkSQLExample {
*/
private static void runProgrammaticSchemaExample(final SparkSession spark, final String peopleTxt) {
// Create an RDD
- JavaRDD<String> peopleRDD = spark.read()
+ SparkJavaRDD<String> peopleRDD = spark.read()
.textFile(peopleTxt)
.toJavaRDD();
@@ -341,7 +343,7 @@ public final class JavaSparkSQLExample {
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
- JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
+ SparkJavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});
@@ -350,7 +352,7 @@ public final class JavaSparkSQLExample {
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);
// Creates a temporary view using the DataFrame
- peopleDataFrame.createOrReplaceTempView("people");
+ peopleDataFrame.createOrReplaceTempView(PEOPLE);
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");
@@ -358,7 +360,7 @@ public final class JavaSparkSQLExample {
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
- (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
+ (MapFunction<Row, String>) row -> NAME + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
index 05c310e..68d1784 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/FileBlock.java
@@ -29,11 +29,8 @@ import org.apache.nemo.runtime.executor.data.FileArea;
import org.apache.nemo.runtime.executor.data.metadata.FileMetadata;
import org.apache.nemo.runtime.executor.data.metadata.PartitionMetadata;
import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
-import org.apache.nemo.runtime.executor.data.partition.Partition;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
@@ -62,7 +59,6 @@ import java.util.Optional;
*/
@NotThreadSafe
public final class FileBlock<K extends Serializable> implements Block<K> {
- private static final Logger LOG = LoggerFactory.getLogger(FileBlock.class.getName());
private final String id;
private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
private final Serializer serializer;
@@ -70,6 +66,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
private final FileMetadata<K> metadata;
private final MemoryPoolAssigner memoryPoolAssigner;
private static final String ALREADY_COMMITED = "The partition is already committed!";
+ private static final String CANNOT_RETRIEVE_BEFORE_COMMITED = "Cannot retrieve elements before a block is committed!";
/**
* Constructor.
@@ -127,7 +124,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
*/
@Override
public void write(final K key,
- final Object element) throws BlockWriteException {
+ final Object element) {
if (metadata.isCommitted()) {
throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
} else {
@@ -152,8 +149,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions)
- throws BlockWriteException {
+ public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) {
if (metadata.isCommitted()) {
throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
} else {
@@ -175,8 +171,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions)
- throws BlockWriteException {
+ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) {
if (metadata.isCommitted()) {
throw new BlockWriteException(new Throwable(ALREADY_COMMITED));
} else {
@@ -196,9 +191,9 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
- public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+ public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) {
if (!metadata.isCommitted()) {
- throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+ throw new BlockFetchException(new Throwable(CANNOT_RETRIEVE_BEFORE_COMMITED));
} else {
// Deserialize the data
final List<NonSerializedPartition<K>> deserializedPartitions = new ArrayList<>();
@@ -242,9 +237,9 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
- public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
+ public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) {
if (!metadata.isCommitted()) {
- throw new BlockFetchException(new Throwable("Cannot retrieve elements before a block is committed"));
+ throw new BlockFetchException(new Throwable(CANNOT_RETRIEVE_BEFORE_COMMITED));
} else {
// Deserialize the data
final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
@@ -303,7 +298,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
*/
public List<FileArea> asFileAreas(final KeyRange keyRange) throws IOException {
if (!metadata.isCommitted()) {
- throw new IOException("Cannot retrieve elements before a block is committed");
+ throw new IOException(CANNOT_RETRIEVE_BEFORE_COMMITED);
} else {
final List<FileArea> fileAreas = new ArrayList<>();
for (final PartitionMetadata<K> partitionMetadata : metadata.getPartitionMetadataList()) {
@@ -335,7 +330,7 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
+ public synchronized Optional<Map<K, Long>> commit() {
try {
if (!metadata.isCommitted()) {
commitPartitions();
@@ -364,12 +359,12 @@ public final class FileBlock<K extends Serializable> implements Block<K> {
* The committed partitions will be flushed to the storage.
*/
@Override
- public synchronized void commitPartitions() throws BlockWriteException {
+ public synchronized void commitPartitions() {
final List<SerializedPartition<K>> partitions = new ArrayList<>();
try {
- for (final Partition<?, K> partition : nonCommittedPartitionsMap.values()) {
+ for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
partition.commit();
- partitions.add((SerializedPartition<K>) partition);
+ partitions.add(partition);
}
writeToFile(partitions);
nonCommittedPartitionsMap.clear();
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index b416445..6083461 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -42,7 +42,7 @@ public final class MinOccupancyFirstSchedulingPolicy implements SchedulingPolicy
public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) {
final OptionalInt minOccupancy =
executors.stream()
- .map(executor -> executor.getNumOfRunningTasks())
+ .map(ExecutorRepresenter::getNumOfRunningTasks)
.mapToInt(i -> i).min();
if (!minOccupancy.isPresent()) {