You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/05/18 04:50:52 UTC
[incubator-wayang] 04/08: [WAYANG-28] creation a base of
DataQuantaHackit and extension of test
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit 4cdd676f519952aba107e80532e1bba220ea1ddf
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Fri May 14 08:55:46 2021 -0400
[WAYANG-28] creation a base of DataQuantaHackit and extension of test
---
.../wayang-hackit/wayang-hackit-api/pom.xml | 30 +
.../plugin/hackit/api/DataQuantaHackit.scala | 395 ++++++++++++
.../java/org/apache/wayang/api/JavaApiTest.java | 711 ---------------------
.../test/scala/org/apache/wayang/api/ApiTest.scala | 575 -----------------
.../plugin/hackit/api/ApiExtensionTest.scala | 41 ++
.../wayang-hackit-api/src/test/wayang.properties | 18 +
6 files changed, 484 insertions(+), 1286 deletions(-)
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index 548f9e6..ae95199 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -30,6 +30,11 @@
<artifactId>wayang-hackit-api</artifactId>
+ <properties>
+ <java-module-name>org.apache.wayang.plugin.hackit.api</java-module-name>
+ <spark.version>2.4.0</spark.version>
+ </properties>
+
<dependencyManagement>
<dependencies>
<dependency>
@@ -50,6 +55,31 @@
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-api-scala-java_2.11</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-spark_${scala.mayor.version}</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.mayor.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-sqlite3</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>0.6.0-SNAPSHOT</version>
<scope>test</scope>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
new file mode 100644
index 0000000..3f76c7b
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaCreator, KeyedDataQuanta}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators.{CartesianOperator, CoGroupOperator, CountOperator, DistinctOperator, FilterOperator, FlatMapOperator, GlobalMaterializedGroupOperator, GlobalReduceOperator, IntersectOperator, JoinOperator, MapOperator, MapPartitionsOperator, MaterializedGroupByOperator, ReduceByOperator, SampleOperator, SortOperator, UnionAllOperator, ZipWithIdOperator}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescriptor, PredicateDescriptor, ReduceDescriptor, TransformationDescriptor}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan}
+
+import java.lang
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.IntUnaryOperator
+import scala.reflect.ClassTag
+
+/**
+ * Represents an intermediate result/data flow edge in a [[WayangPlan]].
+ *
+ * @tparam Out the data type of the elements in this instance
+ * @param operator the [[ElementaryOperator]] that produces this instance
+ * @param outputIndex index of the `operator`'s output that this instance represents (defaults to `0`)
+ * @param planBuilder keeps track of the [[WayangPlan]] being built
+ */
+class DataQuantaHackit[Out: ClassTag]
+ (override val operator: ElementaryOperator, outputIndex: Int = 0)
+ (implicit override val planBuilder: PlanBuilder)
+ extends DataQuanta[Out](operator, outputIndex) {
+
+  /**
+   * Routes this instance through a [[MapOperator]].
+   *
+   * The supplied `udf` is wrapped in a function that prints every incoming data quantum to
+   * standard output before delegating to `udf`.
+   *
+   * @tparam NewOut the element type produced by the `udf`
+   * @param udf     a Java 8 lambda expression as UDF for the [[MapOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
+                                         udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    // Decorate the user function: print the element first, then apply the original UDF.
+    val tracingUdf = new SerializableFunction[Out, NewOut] {
+      override def apply(element: Out): NewOut = {
+        println(element)
+        udf.apply(element)
+      }
+    }
+    val descriptor = new TransformationDescriptor(
+      tracingUdf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
+    )
+    val tracingMapOperator = new MapOperator(descriptor)
+    this.connectTo(tracingMapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](tracingMapOperator)
+  }
+
+  /**
+   * Routes this instance through a [[MapPartitionsOperator]].
+   *
+   * @tparam NewOut      the element type produced by the `udf`
+   * @param udf         a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapPartitionsOperator]]'s output
+   */
+  override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
+                                                   selectivity: ProbabilisticDoubleInterval = null,
+                                                   udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val partitionsDescriptor =
+      new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
+    val mapPartitionsOperator = new MapPartitionsOperator(partitionsDescriptor)
+    this.connectTo(mapPartitionsOperator, 0)
+    DataQuantaHackit.wrap[NewOut](mapPartitionsOperator)
+  }
+
+  /**
+   * Routes this instance through a [[MapOperator]] that is configured with a [[ProjectionDescriptor]].
+   *
+   * @tparam NewOut the element type of the projection result
+   * @param fieldNames names of the fields to be projected
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaHackit[NewOut] = {
+    val projection =
+      new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*)
+    val projectingMapOperator = new MapOperator(projection)
+    this.connectTo(projectingMapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](projectingMapOperator)
+  }
+
+  /**
+   * Routes this instance through a [[FilterOperator]].
+   *
+   * @param udf         predicate UDF for the [[FilterOperator]]
+   * @param sqlUdf      the same predicate expressed as a SQL `WHERE` clause
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FilterOperator]]'s output
+   */
+  override def filterJava(udf: SerializablePredicate[Out],
+                          sqlUdf: String = null,
+                          selectivity: ProbabilisticDoubleInterval = null,
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    // The predicate's input type is taken from this instance's output slot.
+    val predicate = new PredicateDescriptor(
+      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
+    ).withSqlImplementation(sqlUdf)
+    val filteringOperator = new FilterOperator(predicate)
+    this.connectTo(filteringOperator, 0)
+    DataQuantaHackit.wrap[Out](filteringOperator)
+  }
+
+  /**
+   * Routes this instance through a [[FlatMapOperator]].
+   *
+   * @tparam NewOut      the element type of the iterables produced by the `udf`
+   * @param udf         a Java 8 lambda expression as UDF for the [[FlatMapOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FlatMapOperator]]'s output
+   */
+  override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
+                                             selectivity: ProbabilisticDoubleInterval = null,
+                                             udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val flatMapDescriptor =
+      new FlatMapDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
+    val flatteningOperator = new FlatMapOperator(flatMapDescriptor)
+    this.connectTo(flatteningOperator, 0)
+    DataQuantaHackit.wrap[NewOut](flatteningOperator)
+  }
+
+
+  /**
+   * Feed this instance into a [[SampleOperator]].
+   *
+   * @param sampleSizeFunction absolute size of the sample as a function of the current iteration number
+   * @param datasetSize        optional size of the dataset to be sampled
+   * @param seed               optional seed handed to the [[SampleOperator]]; when absent, the operator is
+   *                           created without an explicit seed
+   * @param sampleMethod       the [[SampleOperator.Methods]] to use for sampling
+   * @return a new instance representing the [[SampleOperator]]'s output
+   */
+  override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
+                                 datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
+                                 seed: Option[Long] = None,
+                                 sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaHackit[Out] = {
+    // Only the constructor call depends on the presence of a seed; the remaining wiring is shared.
+    val sampleOperator = seed match {
+      case Some(seedValue) =>
+        new SampleOperator(sampleSizeFunction, dataSetType[Out], sampleMethod, seedValue)
+      case None =>
+        new SampleOperator(sampleSizeFunction, dataSetType[Out], sampleMethod)
+    }
+    sampleOperator.setDatasetSize(datasetSize)
+    this.connectTo(sampleOperator, 0)
+    DataQuantaHackit.wrap[Out](sampleOperator)
+  }
+
+  /**
+   * Attaches a key extractor to this instance, which enables some key-based operations.
+   *
+   * @see KeyedDataQuanta
+   * @tparam Key the type of the extracted keys
+   * @param keyExtractor extracts the key from each data quantum of this instance
+   * @return a [[KeyedDataQuanta]] built from this instance and the `keyExtractor`
+   */
+  //TODO validate this implementation
+  override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : KeyedDataQuanta[Out, Key] = {
+    val keyedView = new KeyedDataQuanta[Out, Key](this, keyExtractor)
+    keyedView
+  }
+
+  /**
+   * Routes this instance through a [[ReduceByOperator]].
+   *
+   * @tparam Key the type of the grouping key
+   * @param keyUdf  UDF to extract the grouping key from the data quanta
+   * @param udf     aggregation UDF for the [[ReduceByOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[ReduceByOperator]]'s output
+   */
+  override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                              udf: SerializableBinaryOperator[Out],
+                                              udfLoad: LoadProfileEstimator = null)
+  : DataQuantaHackit[Out] = {
+    val keyDescriptor = new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])
+    val aggregationDescriptor =
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    val keyedReduceOperator = new ReduceByOperator(keyDescriptor, aggregationDescriptor)
+    this.connectTo(keyedReduceOperator, 0)
+    DataQuantaHackit.wrap[Out](keyedReduceOperator)
+  }
+
+  /**
+   * Routes this instance through a [[MaterializedGroupByOperator]].
+   *
+   * @tparam Key the type of the grouping key
+   * @param keyUdf     UDF to extract the grouping key from the data quanta
+   * @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf`
+   * @return a new instance representing the [[MaterializedGroupByOperator]]'s output
+   */
+  override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                             keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = {
+    val keyDescriptor =
+      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad)
+    val groupingOperator = new MaterializedGroupByOperator(keyDescriptor, dataSetType[Out], groupedDataSetType[Out])
+    this.connectTo(groupingOperator, 0)
+    DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupingOperator)
+  }
+
+  /**
+   * Routes this instance through a [[GlobalReduceOperator]].
+   *
+   * @param udf     aggregation UDF for the [[GlobalReduceOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[GlobalReduceOperator]]'s output
+   */
+  override def reduceJava(udf: SerializableBinaryOperator[Out],
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    val aggregationDescriptor =
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    val reducingOperator = new GlobalReduceOperator(aggregationDescriptor)
+    this.connectTo(reducingOperator, 0)
+    DataQuantaHackit.wrap[Out](reducingOperator)
+  }
+
+  /**
+   * Routes this instance (input 0) and a further instance (input 1) through a [[JoinOperator]].
+   *
+   * Both instances must share the very same [[PlanBuilder]]; otherwise the `require` check fails.
+   *
+   * @tparam ThatOut the element type of `that`
+   * @tparam Key     the type of the join keys
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[JoinOperator]]'s output
+   */
+  override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val leftKeyDescriptor = new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key])
+    val rightKeyDescriptor = new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    val joiningOperator = new JoinOperator(leftKeyDescriptor, rightKeyDescriptor)
+    this.connectTo(joiningOperator, 0)
+    that.connectTo(joiningOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joiningOperator)
+  }
+
+  /**
+   * Routes this instance (input 0) and a further instance (input 1) through a [[CoGroupOperator]].
+   *
+   * Both instances must share the very same [[PlanBuilder]]; otherwise the `require` check fails.
+   *
+   * @tparam ThatOut the element type of `that`
+   * @tparam Key     the type of the grouping keys
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[CoGroupOperator]]'s output
+   */
+  override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val leftKeyDescriptor = new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key])
+    val rightKeyDescriptor = new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    val coGroupingOperator = new CoGroupOperator(leftKeyDescriptor, rightKeyDescriptor)
+    this.connectTo(coGroupingOperator, 0)
+    that.connectTo(coGroupingOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]](coGroupingOperator)
+  }
+
+
+  /**
+   * Routes this instance through a [[SortOperator]].
+   *
+   * @tparam Key the type of the sort keys
+   * @param keyUdf UDF to extract key from data quanta in this instance
+   * @return a new instance representing the [[SortOperator]]'s output
+   */
+  override def sortJava[Key: ClassTag]
+  (keyUdf: SerializableFunction[Out, Key])
+  : DataQuantaHackit[Out] = {
+    val sortKeyDescriptor = new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key])
+    val sortingOperator = new SortOperator(sortKeyDescriptor)
+    this.connectTo(sortingOperator, 0)
+    DataQuantaHackit.wrap[Out](sortingOperator)
+  }
+
+
+  /**
+   * Routes this instance through a [[GlobalMaterializedGroupOperator]].
+   *
+   * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
+   */
+  override def group(): DataQuantaHackit[JavaIterable[Out]] = {
+    val globalGroupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
+    this.connectTo(globalGroupOperator, 0)
+    DataQuantaHackit.wrap[JavaIterable[Out]](globalGroupOperator)
+  }
+
+  /**
+   * Routes this instance (input 0) and a further instance (input 1) through a [[UnionAllOperator]].
+   *
+   * Both instances must share the very same [[PlanBuilder]]; otherwise the `require` check fails.
+   *
+   * @param that the other instance to union with
+   * @return a new instance representing the [[UnionAllOperator]]'s output
+   */
+  override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val unionOperator = new UnionAllOperator(dataSetType[Out])
+    this.connectTo(unionOperator, 0)
+    that.connectTo(unionOperator, 1)
+    DataQuantaHackit.wrap[Out](unionOperator)
+  }
+
+  /**
+   * Routes this instance (input 0) and a further instance (input 1) through an [[IntersectOperator]].
+   *
+   * Both instances must share the very same [[PlanBuilder]]; otherwise the `require` check fails.
+   *
+   * @param that the other instance to intersect with
+   * @return a new instance representing the [[IntersectOperator]]'s output
+   */
+  override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val intersection = new IntersectOperator(dataSetType[Out])
+    this.connectTo(intersection, 0)
+    that.connectTo(intersection, 1)
+    DataQuantaHackit.wrap[Out](intersection)
+  }
+
+  /**
+   * Routes this instance (input 0) and a further instance (input 1) through a [[CartesianOperator]].
+   *
+   * Both instances must share the very same [[PlanBuilder]]; otherwise the `require` check fails.
+   *
+   * @tparam ThatOut the element type of `that`
+   * @param that the other instance
+   * @return a new instance representing the [[CartesianOperator]]'s output
+   */
+  override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val crossProductOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
+    this.connectTo(crossProductOperator, 0)
+    that.connectTo(crossProductOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](crossProductOperator)
+  }
+
+  /**
+   * Routes this instance through a [[ZipWithIdOperator]].
+   *
+   * @return a new instance representing the [[ZipWithIdOperator]]'s output
+   */
+  override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = {
+    val idAssigningOperator = new ZipWithIdOperator(dataSetType[Out])
+    this.connectTo(idAssigningOperator, 0)
+    DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](idAssigningOperator)
+  }
+
+  /**
+   * Routes this instance through a [[DistinctOperator]].
+   *
+   * @return a new instance representing the [[DistinctOperator]]'s output
+   */
+  override def distinct: DataQuantaHackit[Out] = {
+    val deduplicatingOperator = new DistinctOperator(dataSetType[Out])
+    this.connectTo(deduplicatingOperator, 0)
+    DataQuantaHackit.wrap[Out](deduplicatingOperator)
+  }
+
+  /**
+   * Routes this instance through a [[CountOperator]].
+   *
+   * @return a new instance representing the [[CountOperator]]'s output
+   */
+  override def count: DataQuantaHackit[lang.Long] = {
+    val countingOperator = new CountOperator(dataSetType[Out])
+    this.connectTo(countingOperator, 0)
+    DataQuantaHackit.wrap[lang.Long](countingOperator)
+  }
+}
+
+/**
+ * Companion of [[DataQuantaHackit]] providing factory methods for new instances.
+ */
+object DataQuantaHackit extends DataQuantaCreator{
+
+  /**
+   * Wraps the given operator's output in a [[DataQuantaHackit]].
+   *
+   * @tparam T the element type of the new instance
+   * @param operator    the [[ElementaryOperator]] producing the data quanta
+   * @param outputIndex index of the `operator`'s output to wrap (defaults to `0`)
+   * @param planBuilder the [[PlanBuilder]] the new instance belongs to
+   * @return the new [[DataQuantaHackit]]
+   */
+  def wrap[T:ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaHackit[T] =
+    new DataQuantaHackit[T](operator, outputIndex)
+
+  /**
+   * Creates a [[DataQuantaHackit]] for an existing [[OutputSlot]].
+   *
+   * The slot's owner is cast to [[ElementaryOperator]], and the [[ClassTag]] is derived from the
+   * type class of the slot's data unit type.
+   *
+   * @param output      the [[OutputSlot]] to represent
+   * @param planBuilder the [[PlanBuilder]] the new instance belongs to
+   * @return the new [[DataQuantaHackit]]
+   */
+  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] = {
+    val owner = output.getOwner.asInstanceOf[ElementaryOperator]
+    val slotIndex = output.getIndex
+    new DataQuantaHackit(owner, slotIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
+  }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
deleted file mode 100644
index aaf1f16..0000000
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api;
-
-import org.apache.wayang.core.api.WayangContext;
-import org.apache.wayang.core.util.WayangCollections;
-import org.apache.wayang.java.Java;
-//import org.apache.wayang.spark.Spark;
-//import org.apache.wayang.sqlite3.Sqlite3;
-//import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Test suite for the Java API.
- */
-public class JavaApiTest {
-
-// private Configuration sqlite3Configuration;
-//
-// @Before
-// public void setUp() throws SQLException, IOException {
-// // Generate test data.
-// this.sqlite3Configuration = new Configuration();
-// File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db");
-// sqlite3dbFile.deleteOnExit();
-// this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath());
-// try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) {
-// Statement statement = connection.createStatement();
-// statement.addBatch("DROP TABLE IF EXISTS customer;");
-// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
-// statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
-// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
-// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
-// statement.executeBatch();
-// }
-// }
-
- @Test
- public void testMapReduce() {
- WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
- JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-
- List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
- Collection<Integer> outputCollection = javaPlanBuilder
- .loadCollection(inputCollection).withName("load numbers")
- .map(i -> i * i).withName("square")
- .reduce((a, b) -> a + b).withName("sum")
- .collect();
-
- Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(outputCollection));
- }
-
-// @Test
-// public void testMapReduceBy() {
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-//
-// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
-// Collection<Integer> outputCollection = javaPlanBuilder
-// .loadCollection(inputCollection).withName("load numbers")
-// .map(i -> i * i).withName("square")
-// .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum")
-// .collect();
-//
-// Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
-// }
-//
-// @Test
-// public void testBroadcast2() {
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-//
-// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
-// List<Integer> offsetCollection = Collections.singletonList(-2);
-//
-// LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder
-// .loadCollection(offsetCollection)
-// .withName("load offset");
-//
-// Collection<Integer> outputCollection = javaPlanBuilder
-// .loadCollection(inputCollection).withName("load numbers")
-// .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset")
-// .collect();
-//
-// Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection));
-// }
-//
-// @Test
-// public void testCustomOperatorShortCut() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-// final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3);
-//
-// // Build and execute a Wayang plan.
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .<Integer>customOperator(new JavaMapOperator<>(
-// DataSetType.createDefault(Integer.class),
-// DataSetType.createDefault(Integer.class),
-// new TransformationDescriptor<>(
-// i -> i + 2,
-// Integer.class, Integer.class
-// )
-// )).withName("Add 2")
-// .collect();
-//
-// // Check the outcome.
-// final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5);
-// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testWordCount() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
-//
-// // Build and execute a Wayang plan.
-// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
-// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
-// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
-// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
-// .collect();
-//
-// // Check the outcome.
-// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
-// new Tuple2<>("big", 3),
-// new Tuple2<>("is", 2),
-// new Tuple2<>("data", 3)
-// );
-// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testWordCountOnSparkAndJava() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//
-// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
-//
-// // Build and execute a Wayang plan.
-// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
-// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
-// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
-// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
-// .collect();
-//
-// // Check the outcome.
-// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
-// new Tuple2<>("big", 3),
-// new Tuple2<>("is", 2),
-// new Tuple2<>("data", 3)
-// );
-// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testSample() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//
-// // Create some input values.
-// final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100));
-//
-// // Build and execute a Wayang plan.
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .sample(10).withName("Sample")
-// .collect();
-//
-// // Check the outcome.
-// Assert.assertEquals(10, outputValues.size());
-// Assert.assertEquals(10, WayangCollections.asSet(outputValues).size());
-//
-// }
-//
-// @Test
-// public void testDoWhile() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-// // Generate test data.
-// final List<Integer> inputValues = WayangArrays.asList(1, 2);
-//
-// // Build and execute a word count WayangPlan.
-//
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .doWhile(
-// values -> values.stream().mapToInt(i -> i).sum() > 100,
-// start -> {
-// final GlobalReduceDataQuantaBuilder<Integer> sum =
-// start.reduce((a, b) -> a + b).withName("sum");
-// return new Tuple<>(
-// start.union(sum).withName("Old+new"),
-// sum
-// );
-// }
-// ).withConditionClass(Integer.class).withName("While <= 100")
-// .collect();
-//
-// Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192);
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {
-//
-// private final String broadcastName;
-//
-// private int offset;
-//
-// public AddOffset(String broadcastName) {
-// this.broadcastName = broadcastName;
-// }
-//
-// @Override
-// public void open(ExecutionContext ctx) {
-// this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName));
-// }
-//
-// @Override
-// public Integer apply(Integer input) {
-// return input + this.offset;
-// }
-// }
-//
-// @Test
-// public void testRepeat() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-// // Generate test data.
-// final List<Integer> inputValues = WayangArrays.asList(1, 2);
-//
-// // Build and execute a word count WayangPlan.
-//
-// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-// .loadCollection(inputValues).withName("Load input values")
-// .repeat(3, start -> start
-// .reduce((a, b) -> a * b).withName("Multiply")
-// .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class)
-// ).withName("Repeat 3x")
-// .collect();
-//
-// Set<Integer> expectedValues = WayangCollections.asSet(42, 43);
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> {
-//
-// private final String broadcastName;
-//
-// private Collection<Character> selectors;
-//
-// public SelectWords(String broadcastName) {
-// this.broadcastName = broadcastName;
-// }
-//
-// @Override
-// public void open(ExecutionContext ctx) {
-// this.selectors = ctx.getBroadcast(this.broadcastName);
-// }
-//
-// @Override
-// public boolean test(String word) {
-// return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0);
-// }
-// }
-//
-// @Test
-// public void testBroadcast() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars");
-// final List<Character> selectors = Arrays.asList('o', 'l');
-//
-// // Execute the job.
-// final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors");
-// final Collection<String> outputValues = builder
-// .loadCollection(inputValues).withName("Load input values")
-// .filter(new SelectWords("selectors")).withName("Filter words")
-// .withBroadcast(selectorsDataSet, "selectors")
-// .collect();
-//
-// // Verify the outcome.
-// Set<String> expectedValues = WayangCollections.asSet("Hello", "World");
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testGroupBy() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
-//
-// // Execute the job.
-// final Collection<Double> outputValues = builder
-// .loadCollection(inputValues).withName("Load input values")
-// .groupByKey(i -> i % 2).withName("group odd and even")
-// .map(group -> {
-// List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false)
-// .sorted()
-// .collect(Collectors.toList());
-// int sizeDivTwo = sortedGroup.size() / 2;
-// return sortedGroup.size() % 2 == 0 ?
-// (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d :
-// (double) sortedGroup.get(sizeDivTwo);
-// })
-// .collect();
-//
-// // Verify the outcome.
-// Set<Double> expectedValues = WayangCollections.asSet(5d, 6d);
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testJoin() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-// new Tuple2<>("Water", 0),
-// new Tuple2<>("Tonic", 5),
-// new Tuple2<>("Juice", 10)
-// );
-// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-// new Tuple2<>("Apple juice", "Juice"),
-// new Tuple2<>("Tap water", "Water"),
-// new Tuple2<>("Orange juice", "Juice")
-// );
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1
-// .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
-// .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1()))
-// .collect();
-//
-// // Verify the outcome.
-// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
-// new Tuple2<>("Apple juice", 10),
-// new Tuple2<>("Orange juice", 10),
-// new Tuple2<>("Tap water", 0)
-// );
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testJoinAndAssemble() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-// new Tuple2<>("Water", 0),
-// new Tuple2<>("Tonic", 5),
-// new Tuple2<>("Juice", 10)
-// );
-// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-// new Tuple2<>("Apple juice", "Juice"),
-// new Tuple2<>("Tap water", "Water"),
-// new Tuple2<>("Orange juice", "Juice")
-// );
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0)
-// .join(dataQuanta2.keyBy(Tuple2::getField1))
-// .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1()))
-// .collect();
-//
-// // Verify the outcome.
-// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
-// new Tuple2<>("Apple juice", 10),
-// new Tuple2<>("Orange juice", 10),
-// new Tuple2<>("Tap water", 0)
-// );
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testCoGroup() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-// new Tuple2<>("Water", 0),
-// new Tuple2<>("Cola", 5),
-// new Tuple2<>("Juice", 10)
-// );
-// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-// new Tuple2<>("Apple juice", "Juice"),
-// new Tuple2<>("Tap water", "Water"),
-// new Tuple2<>("Orange juice", "Juice")
-// );
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1
-// .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
-// .map(joinTuple -> new Tuple2<>(
-// WayangCollections.asSet(joinTuple.getField0()),
-// WayangCollections.asSet(joinTuple.getField1())
-// ))
-// .collect();
-//
-// // Verify the outcome.
-// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
-// new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Water", 0)),
-// WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
-// ),
-// new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Cola", 5)),
-// WayangCollections.asSet()
-// ), new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Juice", 10)),
-// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
-// )
-// );
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testCoGroupViaKeyBy() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-// new Tuple2<>("Water", 0),
-// new Tuple2<>("Cola", 5),
-// new Tuple2<>("Juice", 10)
-// );
-// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-// new Tuple2<>("Apple juice", "Juice"),
-// new Tuple2<>("Tap water", "Water"),
-// new Tuple2<>("Orange juice", "Juice")
-// );
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues =
-// dataQuanta1.keyBy(Tuple2::getField0)
-// .coGroup(dataQuanta2.keyBy(Tuple2::getField1))
-// .map(joinTuple -> new Tuple2<>(
-// WayangCollections.asSet(joinTuple.getField0()),
-// WayangCollections.asSet(joinTuple.getField1())
-// ))
-// .collect();
-//
-// // Verify the outcome.
-// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
-// new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Water", 0)),
-// WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
-// ),
-// new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Cola", 5)),
-// WayangCollections.asSet()
-// ), new Tuple2<>(
-// WayangCollections.asSet(new Tuple2<>("Juice", 10)),
-// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
-// )
-// );
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testIntersect() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
-// final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11);
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
-// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2);
-// final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect();
-//
-// // Verify the outcome.
-// Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9);
-// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testSort() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1);
-//
-// // Execute the job.
-// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
-// final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect();
-//
-// // Verify the outcome.
-// List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5);
-// Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues));
-// }
-//
-//
-// @Test
-// public void testPageRank() {
-// // Set up WayangContext.
-// WayangContext wayangContext = new WayangContext()
-// .with(Java.basicPlugin())
-// .with(Java.graphPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Create a test graph.
-// Collection<Tuple2<Long, Long>> edges = Arrays.asList(
-// new Tuple2<>(0L, 1L),
-// new Tuple2<>(0L, 2L),
-// new Tuple2<>(0L, 3L),
-// new Tuple2<>(1L, 0L),
-// new Tuple2<>(2L, 1L),
-// new Tuple2<>(3L, 2L),
-// new Tuple2<>(3L, 1L)
-// );
-//
-// // Execute the job.
-// Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges()
-// .pageRank(20)
-// .collect();
-// List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks);
-// sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1));
-//
-// System.out.println(sortedPageRanks);
-// Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue());
-// Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue());
-// Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue());
-// Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue());
-// }
-//
-// @Test
-// public void testMapPartitions() {
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8);
-//
-// // Execute the job.
-// Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues)
-// .mapPartitions(partition -> {
-// int numEvens = 0, numOdds = 0;
-// for (Integer value : partition) {
-// if ((value & 1) == 0) numEvens++;
-// else numOdds++;
-// }
-// return Arrays.asList(
-// new Tuple2<>("odd", numOdds),
-// new Tuple2<>("even", numEvens)
-// );
-// })
-// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
-// .collect();
-//
-// // Check the output.
-// Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet(
-// new Tuple2<>("even", 5), new Tuple2<>("odd", 2)
-// );
-// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testZipWithId() {
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// List<Integer> inputValues = new ArrayList<>(42 * 100);
-// for (int i = 0; i < 100; i++) {
-// for (int j = 0; j < 42; j++) {
-// inputValues.add(i);
-// }
-// }
-//
-// // Execute the job.
-// Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues)
-// .zipWithId()
-// .groupByKey(Tuple2::getField1)
-// .map(group -> {
-// int distinctIds = (int) StreamSupport.stream(group.spliterator(), false)
-// .map(Tuple2::getField0)
-// .distinct()
-// .count();
-// return new Tuple2<>(distinctIds, 1);
-// })
-// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
-// .collect();
-//
-// // Check the output.
-// Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100));
-// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
-// }
-//
-// @Test
-// public void testWriteTextFile() throws IOException, URISyntaxException {
-// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-// // Generate test data.
-// List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d);
-//
-// // Execute the job.
-// File tempDir = LocalFileSystem.findTempDir();
-// String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"));
-//
-// builder
-// .loadCollection(inputValues)
-// .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()");
-//
-// // Check the output.
-// Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet());
-// Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet());
-// Assert.assertEquals(expectedLines, actualLines);
-// }
-//
-// @Test
-// public void testSqlOnJava() throws IOException, SQLException {
-// // Execute job.
-// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
-// .with(Java.basicPlugin())
-// .with(Sqlite3.plugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()");
-// final Collection<String> outputValues = builder
-// .readTable(new Sqlite3TableSource("customer", "name", "age"))
-// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform())
-// .asRecords().projectRecords(new String[]{"name"})
-// .map(record -> (String) record.getField(0))
-// .collect();
-//
-// // Test the outcome.
-// Assert.assertEquals(
-// WayangCollections.asSet("John", "Evelyn"),
-// WayangCollections.asSet(outputValues)
-// );
-// }
-//
-// @Test
-// public void testSqlOnSqlite3() throws IOException, SQLException {
-// // Execute job.
-// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
-// .with(Java.basicPlugin())
-// .with(Sqlite3.plugin());
-// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()");
-// final Collection<String> outputValues = builder
-// .readTable(new Sqlite3TableSource("customer", "name", "age"))
-// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18")
-// .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform())
-// .map(record -> (String) record.getField(0))
-// .collect();
-//
-// // Test the outcome.
-// Assert.assertEquals(
-// WayangCollections.asSet("John", "Evelyn"),
-// WayangCollections.asSet(outputValues)
-// );
-// }
-
-}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
deleted file mode 100644
index ed1a7ac..0000000
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api
-
-import org.apache.wayang.basic.WayangBasics
-import org.apache.wayang.core.api.{Configuration, WayangContext}
-import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializablePredicate
-import org.apache.wayang.core.function.{ExecutionContext, TransformationDescriptor}
-import org.apache.wayang.core.util.fs.LocalFileSystem
-import org.apache.wayang.java.Java
-import org.junit.{Assert, Test}
-
-import java.io.File
-import java.net.URI
-import java.nio.file.{Files, Paths}
-import java.sql.{Connection, Statement}
-import java.util.function.Consumer
-
-/**
- * Tests the Wayang API.
- */
-class ApiTest {
-
- @Test
- def testReadMapCollect(): Unit = {
- // Set up WayangContext.
- val wayangContext = new WayangContext()
- .withPlugin(Java.basicPlugin)
-// .withPlugin(Spark.basicPlugin)
- // Generate some test data.
- val inputValues = (for (i <- 1 to 10) yield i).toArray
-
- // Build and execute a Wayang plan.
- val outputValues = wayangContext
- .loadCollection(inputValues).withName("Load input values")
- .map(_ + 2).withName("Add 2")
- .collect()
-
- // Check the outcome.
- val expectedOutputValues = inputValues.map(_ + 2)
- Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
- }
-
-// @Test
-// def testCustomOperator(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = (for (i <- 1 to 10) yield i).toArray
-//
-// // Build and execute a Wayang plan.
-// val inputDataSet = wayang.loadCollection(inputValues).withName("Load input values")
-//
-// // Add the custom operator.
-// val IndexedSeq(addedValues) = wayang.customOperator(new JavaMapOperator(
-// dataSetType[Int],
-// dataSetType[Int],
-// new TransformationDescriptor(
-// toSerializableFunction[Int, Int](_ + 2),
-// basicDataUnitType[Int], basicDataUnitType[Int]
-// )
-// ), inputDataSet)
-// addedValues.withName("Add 2")
-//
-// // Collect the result.
-// val outputValues = addedValues.asInstanceOf[DataQuanta[Int]].collect()
-//
-// // Check the outcome.
-// val expectedOutputValues = inputValues.map(_ + 2)
-// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
-// }
-//
-// @Test
-// def testCustomOperatorShortCut(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = (for (i <- 1 to 10) yield i).toArray
-//
-// // Build and execute a Wayang plan.
-// val outputValues = wayang
-// .loadCollection(inputValues).withName("Load input values")
-// .customOperator[Int](new JavaMapOperator(
-// dataSetType[Int],
-// dataSetType[Int],
-// new TransformationDescriptor(
-// toSerializableFunction[Int, Int](_ + 2),
-// basicDataUnitType[Int], basicDataUnitType[Int]
-// )
-// )).withName("Add 2")
-// .collect()
-//
-// // Check the outcome.
-// val expectedOutputValues = inputValues.map(_ + 2)
-// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
-// }
-//
-// @Test
-// def testWordCount(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = Array("Big data is big.", "Is data big data?")
-//
-// // Build and execute a word count WayangPlan.
-// val wordCounts = wayang
-// .loadCollection(inputValues).withName("Load input values")
-// .flatMap(_.split("\\s+")).withName("Split words")
-// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
-// .map((_, 1)).withName("Attach counter")
-// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters")
-// .collect().toSet
-//
-// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
-//
-// Assert.assertEquals(expectedWordCounts, wordCounts)
-// }
-//
-// @Test
-// def testWordCountOnSparkAndJava(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = Array("Big data is big.", "Is data big data?")
-//
-// // Build and execute a word count WayangPlan.
-// val wordCounts = wayang
-// .loadCollection(inputValues).withName("Load input values").withTargetPlatforms(Java.platform)
-// .flatMap(_.split("\\s+")).withName("Split words").withTargetPlatforms(Java.platform)
-// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase").withTargetPlatforms(Spark.platform)
-// .map((_, 1)).withName("Attach counter").withTargetPlatforms(Spark.platform)
-// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters").withTargetPlatforms(Spark.platform)
-// .collect().toSet
-//
-// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
-//
-// Assert.assertEquals(expectedWordCounts, wordCounts)
-// }
-//
-// @Test
-// def testSample(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = for (i <- 0 until 100) yield i
-//
-// // Build and execute the WayangPlan.
-// val sample = wayang
-// .loadCollection(inputValues)
-// .sample(10)
-// .collect()
-//
-// // Check the result.
-// Assert.assertEquals(10, sample.size)
-// Assert.assertEquals(10, sample.toSet.size)
-// }
-//
-// @Test
-// def testDoWhile(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = Array(1, 2)
-//
-// // Build and execute a word count WayangPlan.
-//
-// val values = wayang
-// .loadCollection(inputValues).withName("Load input values")
-// .doWhile[Int](vals => vals.max > 100, {
-// start =>
-// val sum = start.reduce(_ + _).withName("Sum")
-// (start.union(sum).withName("Old+new"), sum)
-// }).withName("While <= 100")
-// .collect().toSet
-//
-// val expectedValues = Set(1, 2, 3, 6, 12, 24, 48, 96, 192)
-// Assert.assertEquals(expectedValues, values)
-// }
-//
-// @Test
-// def testRepeat(): Unit = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// // Generate some test data.
-// val inputValues = Array(1, 2)
-//
-// // Build and execute a word count WayangPlan.
-//
-// val values = wayang
-// .loadCollection(inputValues).withName("Load input values").withName(inputValues.mkString(","))
-// .repeat(3,
-// _.reduce(_ * _).withName("Multiply")
-// .flatMap(v => Seq(v, v + 1)).withName("Duplicate")
-// ).withName("Repeat 3x")
-// .collect().toSet
-//
-// // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
-// val expectedValues = Set(42, 43)
-// Assert.assertEquals(expectedValues, values)
-// }
-//
-// @Test
-// def testBroadcast() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-// val builder = new PlanBuilder(wayang)
-//
-// // Generate some test data.
-// val inputStrings = Array("Hello", "World", "Hi", "Mars")
-// val selectors = Array('o', 'l')
-//
-// val selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors")
-//
-// // Build and execute a word count WayangPlan.
-// val values = builder
-// .loadCollection(inputStrings).withName("Load input values")
-// .filterJava(new ExtendedSerializablePredicate[String] {
-//
-// var selectors: Iterable[Char] = _
-//
-// override def open(ctx: ExecutionContext): Unit = {
-// import scala.collection.JavaConversions._
-// selectors = collectionAsScalaIterable(ctx.getBroadcast[Char]("selectors"))
-// }
-//
-// override def test(t: String): Boolean = selectors.forall(selector => t.contains(selector))
-//
-// }).withName("Filter words")
-// .withBroadcast(selectorsDataSet, "selectors")
-// .collect().toSet
-//
-// val expectedValues = Set("Hello", "World")
-// Assert.assertEquals(expectedValues, values)
-// }
-//
-// @Test
-// def testGroupBy() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//
-// val result = wayang
-// .loadCollection(inputValues)
-// .groupByKey(_ % 2).withName("group odd and even")
-// .map {
-// group =>
-// import scala.collection.JavaConversions._
-// val buffer = group.toBuffer
-// buffer.sortBy(identity)
-// if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2
-// else buffer(buffer.size / 2)
-// }.withName("median")
-// .collect()
-//
-// val expectedValues = Set(5, 6)
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-// @Test
-// def testGroup() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//
-// val result = wayang
-// .loadCollection(inputValues)
-// .group()
-// .map {
-// group =>
-// import scala.collection.JavaConversions._
-// val buffer = group.toBuffer
-// buffer.sortBy(int => int)
-// if (buffer.size % 2 == 0) (buffer(buffer.size / 2) + buffer(buffer.size / 2 + 1)) / 2
-// else buffer(buffer.size / 2)
-// }
-// .collect()
-//
-// val expectedValues = Set(5)
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-// @Test
-// def testJoin() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-// val builder = new PlanBuilder(wayang)
-// val dataQuanta1 = builder.loadCollection(inputValues1)
-// val dataQuanta2 = builder.loadCollection(inputValues2)
-// val result = dataQuanta1
-// .join[(String, String), String](_._1, dataQuanta2, _._2)
-// .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
-// .collect()
-//
-// val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-// @Test
-// def testJoinAndAssemble() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-// val builder = new PlanBuilder(wayang)
-// val dataQuanta1 = builder.loadCollection(inputValues1)
-// val dataQuanta2 = builder.loadCollection(inputValues2)
-// val result = dataQuanta1.keyBy(_._1).join(dataQuanta2.keyBy(_._2))
-// .assemble((dq1, dq2) => (dq2._1, dq1._2))
-// .collect()
-//
-// val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-//
-// @Test
-// def testCoGroup() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues1 = Array(("Water", 0), ("Cola", 5), ("Juice", 10))
-// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-// val builder = new PlanBuilder(wayang)
-// val dataQuanta1 = builder.loadCollection(inputValues1)
-// val dataQuanta2 = builder.loadCollection(inputValues2)
-// val result = dataQuanta1
-// .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
-// .collect()
-//
-// import scala.collection.JavaConversions._
-// val actualValues = result.map(coGroup => (coGroup.field0.toSet, coGroup.field1.toSet)).toSet
-// val expectedValues = Set(
-// (Set(("Water", 0)), Set(("Tap water", "Water"))),
-// (Set(("Cola", 5)), Set()),
-// (Set(("Juice", 10)), Set(("Apple juice", "Juice"), ("Orange juice", "Juice")))
-// )
-// Assert.assertEquals(expectedValues, actualValues)
-// }
-//
-// @Test
-// def testIntersect() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-// val inputValues2 = Array(0, 2, 3, 3, 4, 5, 7, 8, 9, 11)
-//
-// val builder = new PlanBuilder(wayang)
-// val dataQuanta1 = builder.loadCollection(inputValues1)
-// val dataQuanta2 = builder.loadCollection(inputValues2)
-// val result = dataQuanta1
-// .intersect(dataQuanta2)
-// .collect()
-//
-// val expectedValues = Set(2, 3, 4, 5, 7, 8, 9)
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-//
-// @Test
-// def testSort() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues1 = Array(3, 4, 5, 2, 1)
-//
-// val builder = new PlanBuilder(wayang)
-// val dataQuanta1 = builder.loadCollection(inputValues1)
-// val result = dataQuanta1
-// .sort(r=>r)
-// .collect()
-//
-// val expectedValues = Array(1, 2, 3, 4, 5)
-// Assert.assertArrayEquals(expectedValues, result.toArray)
-// }
-//
-//
-// @Test
-// def testPageRank() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext()
-// .withPlugin(Java.graphPlugin)
-// .withPlugin(WayangBasics.graphPlugin)
-// .withPlugin(Java.basicPlugin)
-// import org.apache.wayang.api.graph._
-//
-// val edges = Seq((0, 1), (0, 2), (0, 3), (1, 0), (2, 1), (3, 2), (3, 1)).map(t => Edge(t._1, t._2))
-//
-// val pageRanks = wayang
-// .loadCollection(edges).withName("Load edges")
-// .pageRank(20).withName("PageRank")
-// .collect()
-// .map(t => t.field0.longValue -> t.field1)
-// .toMap
-//
-// print(pageRanks)
-// // Let's not check absolute numbers but only the relative ordering.
-// Assert.assertTrue(pageRanks(1) > pageRanks(0))
-// Assert.assertTrue(pageRanks(0) > pageRanks(2))
-// Assert.assertTrue(pageRanks(2) > pageRanks(3))
-// }
-//
-// @Test
-// def testMapPartitions() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext()
-// .withPlugin(Java.basicPlugin())
-// .withPlugin(Spark.basicPlugin)
-//
-// val typeCounts = wayang
-// .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
-// .mapPartitions { ints =>
-// var (numOdds, numEvens) = (0, 0)
-// ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
-// Seq(("odd", numOdds), ("even", numEvens))
-// }
-// .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
-// .collect()
-//
-// Assert.assertEquals(Set(("odd", 2), ("even", 5)), typeCounts.toSet)
-// }
-//
-// @Test
-// def testZipWithId() = {
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-// val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
-//
-// val result = wayang
-// .loadCollection(inputValues)
-// .zipWithId
-// .groupByKey(_.field1)
-// .map { group =>
-// import scala.collection.JavaConversions._
-// (group.map(_.field0).toSet.size, 1)
-// }
-// .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
-// .collect()
-//
-// val expectedValues = Set((42, 100))
-// Assert.assertEquals(expectedValues, result.toSet)
-// }
-//
-// @Test
-// def testWriteTextFile() = {
-// val tempDir = LocalFileSystem.findTempDir
-// val targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"))
-//
-// // Set up WayangContext.
-// val wayang = new WayangContext().withPlugin(Java.basicPlugin)
-//
-// val inputValues = for (i <- 0 to 5) yield i * 0.333333333333
-//
-// val result = wayang
-// .loadCollection(inputValues)
-// .writeTextFile(targetUrl, formatterUdf = d => f"${d % .2f}")
-//
-// val lines = scala.collection.mutable.Set[String]()
-// Files.lines(Paths.get(new URI(targetUrl))).forEach(new Consumer[String] {
-// override def accept(line: String): Unit = lines += line
-// })
-//
-// val expectedLines = inputValues.map(v => f"${v % .2f}").toSet
-// Assert.assertEquals(expectedLines, lines)
-// }
-//
-// @Test
-// def testSqlOnJava() = {
-// // Initialize some test data.
-// val configuration = new Configuration
-// val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-// sqlite3dbFile.deleteOnExit()
-// configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-// try {
-// val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-// try {
-// val statement: Statement = connection.createStatement
-// statement.addBatch("DROP TABLE IF EXISTS customer;")
-// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-// statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-// statement.executeBatch()
-// } finally {
-// if (connection != null) connection.close()
-// }
-// }
-//
-// // Set up WayangContext.
-// val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-// val result = wayang
-// .readTable(new Sqlite3TableSource("customer", "name", "age"))
-// .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
-// .projectRecords(Seq("name"))
-// .map(_.getField(0).asInstanceOf[String])
-// .collect()
-// .toSet
-//
-// val expectedValues = Set("John", "Evelyn")
-// Assert.assertEquals(expectedValues, result)
-// }
-//
-// @Test
-// def testSqlOnSqlite3() = {
-// // Initialize some test data.
-// val configuration = new Configuration
-// val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-// sqlite3dbFile.deleteOnExit()
-// configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-// try {
-// val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-// try {
-// val statement: Statement = connection.createStatement
-// statement.addBatch("DROP TABLE IF EXISTS customer;")
-// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-// statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-// statement.executeBatch
-// } finally {
-// if (connection != null) connection.close()
-// }
-// }
-//
-// // Set up WayangContext.
-// val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-// val result = wayang
-// .readTable(new Sqlite3TableSource("customer", "name", "age"))
-// .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18")
-// .projectRecords(Seq("name")).withTargetPlatforms(Sqlite3.platform)
-// .map(_.getField(0).asInstanceOf[String])
-// .collect()
-// .toSet
-//
-// val expectedValues = Set("John", "Evelyn")
-// Assert.assertEquals(expectedValues, result)
-// }
-}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
new file mode 100644
index 0000000..59a0fd9
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.ApiTest
+import org.apache.wayang.api.dataquanta.DataQuantaFactory
+import org.junit.{Before, BeforeClass, Test}
+import org.junit.jupiter.api.{BeforeAll, BeforeEach}
+
+/** Runs [[ApiTest]] checks with [[DataQuantaHackit]] passed to `DataQuantaFactory.setTemplate`. */
+class ApiExtensionTest extends ApiTest {
+
+  /** Sets the template before each test; JUnit 4 `@Before` pairs with the JUnit 4 `@Test` below. */
+  @Before
+  def setUp(): Unit = {
+    DataQuantaFactory.setTemplate(DataQuantaHackit)
+  }
+
+  /** Delegates to the inherited test; the template is already set by `setUp`. */
+  @Test
+  override def testReadMapCollect(): Unit = {
+    super.testReadMapCollect()
+  }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
new file mode 100644
index 0000000..b296279
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.driver.host = localhost