Posted to commits@wayang.apache.org by be...@apache.org on 2021/05/18 04:50:52 UTC

[incubator-wayang] 04/08: [WAYANG-28] creation of a base DataQuantaHackit and extension of tests

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch debugger
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit 4cdd676f519952aba107e80532e1bba220ea1ddf
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Fri May 14 08:55:46 2021 -0400

    [WAYANG-28] creation of a base DataQuantaHackit and extension of tests
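
    A minimal usage sketch of the new class (hypothetical: `source` stands in
    for any ElementaryOperator already in the plan, and an implicit PlanBuilder
    is assumed to be in scope; neither is part of this commit):

        import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
        import org.apache.wayang.plugin.hackit.api.DataQuantaHackit

        // Wrap the operator's output into a DataQuantaHackit.
        val quanta: DataQuantaHackit[Int] = DataQuantaHackit.wrap[Int](source)

        // mapJava prints every element before applying the UDF -- the
        // debugging hook that DataQuantaHackit adds on top of DataQuanta.
        val squared: DataQuantaHackit[Int] = quanta.mapJava(
          new SerializableFunction[Int, Int] {
            override def apply(i: Int): Int = i * i
          })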
---
 .../wayang-hackit/wayang-hackit-api/pom.xml        |  30 +
 .../plugin/hackit/api/DataQuantaHackit.scala       | 395 ++++++++++++
 .../java/org/apache/wayang/api/JavaApiTest.java    | 711 ---------------------
 .../test/scala/org/apache/wayang/api/ApiTest.scala | 575 -----------------
 .../plugin/hackit/api/ApiExtensionTest.scala       |  41 ++
 .../wayang-hackit-api/src/test/wayang.properties   |  18 +
 6 files changed, 484 insertions(+), 1286 deletions(-)

diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index 548f9e6..ae95199 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -30,6 +30,11 @@
 
     <artifactId>wayang-hackit-api</artifactId>
 
+    <properties>
+        <java-module-name>org.apache.wayang.plugin.hackit.api</java-module-name>
+        <spark.version>2.4.0</spark.version>
+    </properties>
+
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -50,6 +55,31 @@
         </dependency>
         <dependency>
             <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-api-scala-java_2.11</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-spark_${scala.mayor.version}</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.mayor.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-sqlite3</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-java</artifactId>
             <version>0.6.0-SNAPSHOT</version>
             <scope>test</scope>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
new file mode 100644
index 0000000..3f76c7b
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/main/scala/org/apache/wayang/plugin/hackit/api/DataQuantaHackit.scala
@@ -0,0 +1,395 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaCreator, KeyedDataQuanta}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+import org.apache.wayang.basic.function.ProjectionDescriptor
+import org.apache.wayang.basic.operators.{CartesianOperator, CoGroupOperator, CountOperator, DistinctOperator, FilterOperator, FlatMapOperator, GlobalMaterializedGroupOperator, GlobalReduceOperator, IntersectOperator, JoinOperator, MapOperator, MapPartitionsOperator, MaterializedGroupByOperator, ReduceByOperator, SampleOperator, SortOperator, UnionAllOperator, ZipWithIdOperator}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.function.{FlatMapDescriptor, MapPartitionsDescriptor, PredicateDescriptor, ReduceDescriptor, TransformationDescriptor}
+import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan.{ElementaryOperator, Operator, OutputSlot, WayangPlan}
+
+import java.lang
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.IntUnaryOperator
+import scala.reflect.ClassTag
+
+/**
+ * Represents an intermediate result/data flow edge in a [[WayangPlan]].
+ *
+ * @param operator    a unary [[Operator]] that produces this instance
+ * @param ev$1        the data type of the elements in this instance
+ * @param planBuilder keeps track of the [[WayangPlan]] being built
+ */
+class DataQuantaHackit[Out: ClassTag]
+                            (override val operator: ElementaryOperator, outputIndex: Int = 0)
+                            (implicit override val planBuilder: PlanBuilder)
+                    extends DataQuanta[Out](operator, outputIndex) {
+
+  /**
+   * Feed this instance into a [[MapOperator]].
+   *
+   * @param udf     a Java 8 lambda expression as UDF for the [[MapOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
+                                         udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    // Wrap the UDF so that every element is printed before it is processed:
+    // the debugging hook that this Hackit variant adds on top of DataQuanta.
+    val wrappedUdf = new SerializableFunction[Out, NewOut] {
+      override def apply(t: Out): NewOut = {
+        println(t)
+        udf.apply(t)
+      }
+    }
+    val mapOperator = new MapOperator(new TransformationDescriptor(
+      wrappedUdf, basicDataUnitType[Out], basicDataUnitType[NewOut], udfLoad
+    ))
+    this.connectTo(mapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](mapOperator)
+  }
+
+  /**
+   * Feed this instance into a [[MapPartitionsOperator]].
+   *
+   * @param udf         a Java 8 lambda expression as UDF for the [[MapPartitionsOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[MapPartitionsOperator]]'s output
+   */
+  override def mapPartitionsJava[NewOut: ClassTag](udf: SerializableFunction[JavaIterable[Out], JavaIterable[NewOut]],
+                                                   selectivity: ProbabilisticDoubleInterval = null,
+                                                   udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val mapOperator = new MapPartitionsOperator(
+      new MapPartitionsDescriptor(udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad)
+    )
+    this.connectTo(mapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](mapOperator)
+  }
+
+  /**
+   * Feed this instance into a [[MapOperator]] with a [[ProjectionDescriptor]].
+   *
+   * @param fieldNames names of the fields to be projected
+   * @return a new instance representing the [[MapOperator]]'s output
+   */
+  override def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuantaHackit[NewOut] = {
+    val projectionOperator = new MapOperator(
+      new ProjectionDescriptor(basicDataUnitType[Out], basicDataUnitType[NewOut], fieldNames: _*)
+    )
+    this.connectTo(projectionOperator, 0)
+    DataQuantaHackit.wrap[NewOut](projectionOperator)
+  }
+
+  /**
+   * Feed this instance into a [[FilterOperator]].
+   *
+   * @param udf         UDF for the [[FilterOperator]]
+   * @param sqlUdf      UDF as SQL `WHERE` clause
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FilterOperator]]'s output
+   */
+  override def filterJava(udf: SerializablePredicate[Out],
+                          sqlUdf: String = null,
+                          selectivity: ProbabilisticDoubleInterval = null,
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    val filterOperator = new FilterOperator(new PredicateDescriptor(
+      udf, this.output.getType.getDataUnitType.toBasicDataUnitType, selectivity, udfLoad
+    ).withSqlImplementation(sqlUdf))
+    this.connectTo(filterOperator, 0)
+    DataQuantaHackit.wrap[Out](filterOperator)
+  }
+
+  /**
+   * Feed this instance into a [[FlatMapOperator]].
+   *
+   * @param udf         a Java 8 lambda expression as UDF for the [[FlatMapOperator]]
+   * @param selectivity selectivity of the UDF
+   * @param udfLoad     optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[FlatMapOperator]]'s output
+   */
+  override def flatMapJava[NewOut: ClassTag](udf: SerializableFunction[Out, JavaIterable[NewOut]],
+                                             selectivity: ProbabilisticDoubleInterval = null,
+                                             udfLoad: LoadProfileEstimator = null): DataQuantaHackit[NewOut] = {
+    val flatMapOperator = new FlatMapOperator(new FlatMapDescriptor(
+      udf, basicDataUnitType[Out], basicDataUnitType[NewOut], selectivity, udfLoad
+    ))
+    this.connectTo(flatMapOperator, 0)
+    DataQuantaHackit.wrap[NewOut](flatMapOperator)
+  }
+
+
+  /**
+   * Feed this instance into a [[SampleOperator]].
+   *
+   * @param sampleSizeFunction absolute size of the sample as a function of the current iteration number
+   * @param datasetSize        optional size of the dataset to be sampled
+   * @param seed               optional seed for the randomized sampling
+   * @param sampleMethod       the [[SampleOperator.Methods]] to use for sampling
+   * @return a new instance representing the [[SampleOperator]]'s output
+   */
+  override def sampleDynamicJava(sampleSizeFunction: IntUnaryOperator,
+                                 datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
+                                 seed: Option[Long] = None,
+                                 sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuantaHackit[Out] = {
+    if (seed.isEmpty) {
+      val sampleOperator = new SampleOperator(
+        sampleSizeFunction,
+        dataSetType[Out],
+        sampleMethod
+      )
+      sampleOperator.setDatasetSize(datasetSize)
+      this.connectTo(sampleOperator, 0)
+      DataQuantaHackit.wrap[Out](sampleOperator)
+    }
+    else {
+      val sampleOperator = new SampleOperator(
+        sampleSizeFunction,
+        dataSetType[Out],
+        sampleMethod,
+        seed.get
+      )
+      sampleOperator.setDatasetSize(datasetSize)
+      this.connectTo(sampleOperator, 0)
+      DataQuantaHackit.wrap[Out](sampleOperator)
+    }
+  }
+
+  /**
+   * Assigns this instance a key extractor, which enables some key-based operations.
+   *
+   * @see KeyedDataQuanta
+   * @param keyExtractor extracts the key from the [[DataQuantaHackit]]
+   * @return the [[KeyedDataQuanta]]
+   */
+  //TODO validate this implementation
+  override def keyByJava[Key: ClassTag](keyExtractor: SerializableFunction[Out, Key]) : KeyedDataQuanta[Out, Key] = {
+    new KeyedDataQuanta[Out, Key](this, keyExtractor)
+  }
+
+  /**
+   * Feed this instance into a [[ReduceByOperator]].
+   *
+   * @param keyUdf  UDF to extract the grouping key from the data quanta
+   * @param udf     aggregation UDF for the [[ReduceByOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[ReduceByOperator]]'s output
+   */
+  override def reduceByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                              udf: SerializableBinaryOperator[Out],
+                                              udfLoad: LoadProfileEstimator = null)
+  : DataQuantaHackit[Out] = {
+    val reduceByOperator = new ReduceByOperator(
+      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    )
+    this.connectTo(reduceByOperator, 0)
+    DataQuantaHackit.wrap[Out](reduceByOperator)
+  }
+
+  /**
+   * Feed this instance into a [[MaterializedGroupByOperator]].
+   *
+   * @param keyUdf     UDF to extract the grouping key from the data quanta
+   * @param keyUdfLoad optional [[LoadProfileEstimator]] for the `keyUdf`
+   * @return a new instance representing the [[MaterializedGroupByOperator]]'s output
+   */
+  override def groupByKeyJava[Key: ClassTag](keyUdf: SerializableFunction[Out, Key],
+                                             keyUdfLoad: LoadProfileEstimator = null): DataQuantaHackit[java.lang.Iterable[Out]] = {
+    val groupByOperator = new MaterializedGroupByOperator(
+      new TransformationDescriptor(keyUdf, basicDataUnitType[Out], basicDataUnitType[Key], keyUdfLoad),
+      dataSetType[Out],
+      groupedDataSetType[Out]
+    )
+    this.connectTo(groupByOperator, 0)
+    DataQuantaHackit.wrap[java.lang.Iterable[Out]](groupByOperator)
+  }
+
+  /**
+   * Feed this instance into a [[GlobalReduceOperator]].
+   *
+   * @param udf     aggregation UDF for the [[GlobalReduceOperator]]
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return a new instance representing the [[GlobalReduceOperator]]'s output
+   */
+  override def reduceJava(udf: SerializableBinaryOperator[Out],
+                          udfLoad: LoadProfileEstimator = null): DataQuantaHackit[Out] = {
+    val globalReduceOperator = new GlobalReduceOperator(
+      new ReduceDescriptor(udf, groupedDataUnitType[Out], basicDataUnitType[Out], udfLoad)
+    )
+    this.connectTo(globalReduceOperator, 0)
+    DataQuantaHackit.wrap[Out](globalReduceOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[JoinOperator]].
+   *
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[JoinOperator]]'s output
+   */
+  override def joinJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val joinOperator = new JoinOperator(
+      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    )
+    this.connectTo(joinOperator, 0)
+    that.connectTo(joinOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](joinOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[CoGroupOperator]].
+   *
+   * @param thisKeyUdf UDF to extract keys from data quanta in this instance
+   * @param that       the other instance
+   * @param thatKeyUdf UDF to extract keys from data quanta from `that` instance
+   * @return a new instance representing the [[CoGroupOperator]]'s output
+   */
+  override def coGroupJava[ThatOut: ClassTag, Key: ClassTag](thisKeyUdf: SerializableFunction[Out, Key], that: DataQuanta[ThatOut], thatKeyUdf: SerializableFunction[ThatOut, Key]): DataQuantaHackit[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val coGroupOperator = new CoGroupOperator(
+      new TransformationDescriptor(thisKeyUdf, basicDataUnitType[Out], basicDataUnitType[Key]),
+      new TransformationDescriptor(thatKeyUdf, basicDataUnitType[ThatOut], basicDataUnitType[Key])
+    )
+    this.connectTo(coGroupOperator, 0)
+    that.connectTo(coGroupOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[JavaIterable[Out], JavaIterable[ThatOut]]](coGroupOperator)
+  }
+
+
+  /**
+   * Feeds this instance into a [[SortOperator]].
+   *
+   * @param keyUdf UDF to extract key from data quanta in this instance
+   * @return a new instance representing the [[SortOperator]]'s output
+   */
+  override def sortJava[Key: ClassTag]
+  (keyUdf: SerializableFunction[Out, Key])
+  : DataQuantaHackit[Out] = {
+    val sortOperator = new SortOperator(new TransformationDescriptor(
+      keyUdf, basicDataUnitType[Out], basicDataUnitType[Key]))
+    this.connectTo(sortOperator, 0)
+    DataQuantaHackit.wrap[Out](sortOperator)
+  }
+
+
+  /**
+   * Feed this instance into a [[GlobalMaterializedGroupOperator]].
+   *
+   * @return a new instance representing the [[GlobalMaterializedGroupOperator]]'s output
+   */
+  override def group(): DataQuantaHackit[JavaIterable[Out]] = {
+    val groupOperator = new GlobalMaterializedGroupOperator(dataSetType[Out], groupedDataSetType[Out])
+    this.connectTo(groupOperator, 0)
+    DataQuantaHackit.wrap[JavaIterable[Out]](groupOperator)
+  }
+
+  /**
+   * Feed this instance and a further instance into a [[UnionAllOperator]].
+   *
+   * @param that the other instance to union with
+   * @return a new instance representing the [[UnionAllOperator]]'s output
+   */
+  override def union(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val unionAllOperator = new UnionAllOperator(dataSetType[Out])
+    this.connectTo(unionAllOperator, 0)
+    that.connectTo(unionAllOperator, 1)
+    DataQuantaHackit.wrap[Out](unionAllOperator)
+  }
+
+  /**
+   * Feed this instance and a further instance into an [[IntersectOperator]].
+   *
+   * @param that the other instance to intersect with
+   * @return a new instance representing the [[IntersectOperator]]'s output
+   */
+  override def intersect(that: DataQuanta[Out]): DataQuantaHackit[Out] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val intersectOperator = new IntersectOperator(dataSetType[Out])
+    this.connectTo(intersectOperator, 0)
+    that.connectTo(intersectOperator, 1)
+    DataQuantaHackit.wrap[Out](intersectOperator)
+  }
+
+  /**
+   * Feeds this and a further instance into a [[CartesianOperator]].
+   *
+   * @param that the other instance
+   * @return a new instance representing the [[CartesianOperator]]'s output
+   */
+  override def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuantaHackit[WayangTuple2[Out, ThatOut]] = {
+    require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
+    val cartesianOperator = new CartesianOperator(dataSetType[Out], dataSetType[ThatOut])
+    this.connectTo(cartesianOperator, 0)
+    that.connectTo(cartesianOperator, 1)
+    DataQuantaHackit.wrap[WayangTuple2[Out, ThatOut]](cartesianOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[ZipWithIdOperator]].
+   *
+   * @return a new instance representing the [[ZipWithIdOperator]]'s output
+   */
+  override def zipWithId: DataQuantaHackit[WayangTuple2[lang.Long, Out]] = {
+    val zipWithIdOperator = new ZipWithIdOperator(dataSetType[Out])
+    this.connectTo(zipWithIdOperator, 0)
+    DataQuantaHackit.wrap[WayangTuple2[lang.Long, Out]](zipWithIdOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[DistinctOperator]].
+   *
+   * @return a new instance representing the [[DistinctOperator]]'s output
+   */
+  override def distinct: DataQuantaHackit[Out] = {
+    val distinctOperator = new DistinctOperator(dataSetType[Out])
+    this.connectTo(distinctOperator, 0)
+    DataQuantaHackit.wrap[Out](distinctOperator)
+  }
+
+  /**
+   * Feeds this instance into a [[CountOperator]].
+   *
+   * @return a new instance representing the [[CountOperator]]'s output
+   */
+  override def count: DataQuantaHackit[lang.Long] = {
+    val countOperator = new CountOperator(dataSetType[Out])
+    this.connectTo(countOperator, 0)
+    DataQuantaHackit.wrap[lang.Long](countOperator)
+  }
+}
+
+object DataQuantaHackit extends DataQuantaCreator {
+
+  def wrap[T: ClassTag](operator: ElementaryOperator, outputIndex: Int = 0)(implicit planBuilder: PlanBuilder): DataQuantaHackit[T] = {
+    new DataQuantaHackit[T](operator, outputIndex)
+  }
+
+  def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuantaHackit[_] =
+    new DataQuantaHackit(output.getOwner.asInstanceOf[ElementaryOperator], output.getIndex)(ClassTag(output.getType.getDataUnitType.getTypeClass), planBuilder)
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
deleted file mode 100644
index aaf1f16..0000000
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ /dev/null
@@ -1,711 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api;
-
-import org.apache.wayang.core.api.WayangContext;
-import org.apache.wayang.core.util.WayangCollections;
-import org.apache.wayang.java.Java;
-//import org.apache.wayang.spark.Spark;
-//import org.apache.wayang.sqlite3.Sqlite3;
-//import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Test suite for the Java API.
- */
-public class JavaApiTest {
-
-//    private Configuration sqlite3Configuration;
-//
-//    @Before
-//    public void setUp() throws SQLException, IOException {
-//        // Generate test data.
-//        this.sqlite3Configuration = new Configuration();
-//        File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db");
-//        sqlite3dbFile.deleteOnExit();
-//        this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath());
-//        try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) {
-//            Statement statement = connection.createStatement();
-//            statement.addBatch("DROP TABLE IF EXISTS customer;");
-//            statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
-//            statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
-//            statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
-//            statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
-//            statement.executeBatch();
-//        }
-//    }
-
-    @Test
-    public void testMapReduce() {
-        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-
-        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
-        Collection<Integer> outputCollection = javaPlanBuilder
-                .loadCollection(inputCollection).withName("load numbers")
-                .map(i -> i * i).withName("square")
-                .reduce((a, b) -> a + b).withName("sum")
-                .collect();
-
-        Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(outputCollection));
-    }
-
-//    @Test
-//    public void testMapReduceBy() {
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-//
-//        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
-//        Collection<Integer> outputCollection = javaPlanBuilder
-//                .loadCollection(inputCollection).withName("load numbers")
-//                .map(i -> i * i).withName("square")
-//                .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum")
-//                .collect();
-//
-//        Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
-//    }
-//
-//    @Test
-//    public void testBroadcast2() {
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
-//
-//        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
-//        List<Integer> offsetCollection = Collections.singletonList(-2);
-//
-//        LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder
-//                .loadCollection(offsetCollection)
-//                .withName("load offset");
-//
-//        Collection<Integer> outputCollection = javaPlanBuilder
-//                .loadCollection(inputCollection).withName("load numbers")
-//                .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset")
-//                .collect();
-//
-//        Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection));
-//    }
-//
-//    @Test
-//    public void testCustomOperatorShortCut() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-//        final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3);
-//
-//        // Build and execute a Wayang plan.
-//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .<Integer>customOperator(new JavaMapOperator<>(
-//                        DataSetType.createDefault(Integer.class),
-//                        DataSetType.createDefault(Integer.class),
-//                        new TransformationDescriptor<>(
-//                                i -> i + 2,
-//                                Integer.class, Integer.class
-//                        )
-//                )).withName("Add 2")
-//                .collect();
-//
-//        // Check the outcome.
-//        final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5);
-//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testWordCount() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-//        final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
-//
-//        // Build and execute a Wayang plan.
-//        final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
-//                .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
-//                .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
-//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
-//                .collect();
-//
-//        // Check the outcome.
-//        final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
-//                new Tuple2<>("big", 3),
-//                new Tuple2<>("is", 2),
-//                new Tuple2<>("data", 3)
-//        );
-//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testWordCountOnSparkAndJava() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//
-//        final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
-//
-//        // Build and execute a Wayang plan.
-//        final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
-//                .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
-//                .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
-//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
-//                .collect();
-//
-//        // Check the outcome.
-//        final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
-//                new Tuple2<>("big", 3),
-//                new Tuple2<>("is", 2),
-//                new Tuple2<>("data", 3)
-//        );
-//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testSample() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//
-//        // Create some input values.
-//        final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100));
-//
-//        // Build and execute a Wayang plan.
-//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .sample(10).withName("Sample")
-//                .collect();
-//
-//        // Check the outcome.
-//        Assert.assertEquals(10, outputValues.size());
-//        Assert.assertEquals(10, WayangCollections.asSet(outputValues).size());
-//
-//    }
-//
-//    @Test
-//    public void testDoWhile() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-//        // Generate test data.
-//        final List<Integer> inputValues = WayangArrays.asList(1, 2);
-//
-//        // Build and execute a word count WayangPlan.
-//
-//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .doWhile(
-//                        values -> values.stream().mapToInt(i -> i).sum() > 100,
-//                        start -> {
-//                            final GlobalReduceDataQuantaBuilder<Integer> sum =
-//                                    start.reduce((a, b) -> a + b).withName("sum");
-//                            return new Tuple<>(
-//                                    start.union(sum).withName("Old+new"),
-//                                    sum
-//                            );
-//                        }
-//                ).withConditionClass(Integer.class).withName("While <= 100")
-//                .collect();
-//
-//        Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192);
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {
-//
-//        private final String broadcastName;
-//
-//        private int offset;
-//
-//        public AddOffset(String broadcastName) {
-//            this.broadcastName = broadcastName;
-//        }
-//
-//        @Override
-//        public void open(ExecutionContext ctx) {
-//            this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName));
-//        }
-//
-//        @Override
-//        public Integer apply(Integer input) {
-//            return input + this.offset;
-//        }
-//    }
-//
-//    @Test
-//    public void testRepeat() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//
-//        // Generate test data.
-//        final List<Integer> inputValues = WayangArrays.asList(1, 2);
-//
-//        // Build and execute a word count WayangPlan.
-//
-//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
-//                .loadCollection(inputValues).withName("Load input values")
-//                .repeat(3, start -> start
-//                        .reduce((a, b) -> a * b).withName("Multiply")
-//                        .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class)
-//                ).withName("Repeat 3x")
-//                .collect();
-//
-//        Set<Integer> expectedValues = WayangCollections.asSet(42, 43);
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> {
-//
-//        private final String broadcastName;
-//
-//        private Collection<Character> selectors;
-//
-//        public SelectWords(String broadcastName) {
-//            this.broadcastName = broadcastName;
-//        }
-//
-//        @Override
-//        public void open(ExecutionContext ctx) {
-//            this.selectors = ctx.getBroadcast(this.broadcastName);
-//        }
-//
-//        @Override
-//        public boolean test(String word) {
-//            return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0);
-//        }
-//    }
-//
-//    @Test
-//    public void testBroadcast() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars");
-//        final List<Character> selectors = Arrays.asList('o', 'l');
-//
-//        // Execute the job.
-//        final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors");
-//        final Collection<String> outputValues = builder
-//                .loadCollection(inputValues).withName("Load input values")
-//                .filter(new SelectWords("selectors")).withName("Filter words")
-//                .withBroadcast(selectorsDataSet, "selectors")
-//                .collect();
-//
-//        // Verify the outcome.
-//        Set<String> expectedValues = WayangCollections.asSet("Hello", "World");
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testGroupBy() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
-//
-//        // Execute the job.
-//        final Collection<Double> outputValues = builder
-//                .loadCollection(inputValues).withName("Load input values")
-//                .groupByKey(i -> i % 2).withName("group odd and even")
-//                .map(group -> {
-//                    List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false)
-//                            .sorted()
-//                            .collect(Collectors.toList());
-//                    int sizeDivTwo = sortedGroup.size() / 2;
-//                    return sortedGroup.size() % 2 == 0 ?
-//                            (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d :
-//                            (double) sortedGroup.get(sizeDivTwo);
-//                })
-//                .collect();
-//
-//        // Verify the outcome.
-//        Set<Double> expectedValues = WayangCollections.asSet(5d, 6d);
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testJoin() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-//                new Tuple2<>("Water", 0),
-//                new Tuple2<>("Tonic", 5),
-//                new Tuple2<>("Juice", 10)
-//        );
-//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-//                new Tuple2<>("Apple juice", "Juice"),
-//                new Tuple2<>("Tap water", "Water"),
-//                new Tuple2<>("Orange juice", "Juice")
-//        );
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-//        final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1
-//                .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
-//                .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1()))
-//                .collect();
-//
-//        // Verify the outcome.
-//        Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
-//                new Tuple2<>("Apple juice", 10),
-//                new Tuple2<>("Orange juice", 10),
-//                new Tuple2<>("Tap water", 0)
-//        );
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testJoinAndAssemble() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-//                new Tuple2<>("Water", 0),
-//                new Tuple2<>("Tonic", 5),
-//                new Tuple2<>("Juice", 10)
-//        );
-//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-//                new Tuple2<>("Apple juice", "Juice"),
-//                new Tuple2<>("Tap water", "Water"),
-//                new Tuple2<>("Orange juice", "Juice")
-//        );
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-//        final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0)
-//                .join(dataQuanta2.keyBy(Tuple2::getField1))
-//                .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1()))
-//                .collect();
-//
-//        // Verify the outcome.
-//        Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
-//                new Tuple2<>("Apple juice", 10),
-//                new Tuple2<>("Orange juice", 10),
-//                new Tuple2<>("Tap water", 0)
-//        );
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testCoGroup() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-//                new Tuple2<>("Water", 0),
-//                new Tuple2<>("Cola", 5),
-//                new Tuple2<>("Juice", 10)
-//        );
-//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-//                new Tuple2<>("Apple juice", "Juice"),
-//                new Tuple2<>("Tap water", "Water"),
-//                new Tuple2<>("Orange juice", "Juice")
-//        );
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-//        final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1
-//                .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
-//                .map(joinTuple -> new Tuple2<>(
-//                        WayangCollections.asSet(joinTuple.getField0()),
-//                        WayangCollections.asSet(joinTuple.getField1())
-//                ))
-//                .collect();
-//
-//        // Verify the outcome.
-//        Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
-//                new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Water", 0)),
-//                        WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
-//                ),
-//                new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Cola", 5)),
-//                        WayangCollections.asSet()
-//                ), new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Juice", 10)),
-//                        WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
-//                )
-//        );
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testCoGroupViaKeyBy() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
-//                new Tuple2<>("Water", 0),
-//                new Tuple2<>("Cola", 5),
-//                new Tuple2<>("Juice", 10)
-//        );
-//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
-//                new Tuple2<>("Apple juice", "Juice"),
-//                new Tuple2<>("Tap water", "Water"),
-//                new Tuple2<>("Orange juice", "Juice")
-//        );
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
-//        final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues =
-//                dataQuanta1.keyBy(Tuple2::getField0)
-//                        .coGroup(dataQuanta2.keyBy(Tuple2::getField1))
-//                        .map(joinTuple -> new Tuple2<>(
-//                                WayangCollections.asSet(joinTuple.getField0()),
-//                                WayangCollections.asSet(joinTuple.getField1())
-//                        ))
-//                        .collect();
-//
-//        // Verify the outcome.
-//        Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
-//                new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Water", 0)),
-//                        WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
-//                ),
-//                new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Cola", 5)),
-//                        WayangCollections.asSet()
-//                ), new Tuple2<>(
-//                        WayangCollections.asSet(new Tuple2<>("Juice", 10)),
-//                        WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
-//                )
-//        );
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testIntersect() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
-//        final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11);
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2);
-//        final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect();
-//
-//        // Verify the outcome.
-//        Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9);
-//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testSort() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1);
-//
-//        // Execute the job.
-//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
-//        final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect();
-//
-//        // Verify the outcome.
-//        List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5);
-//        Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues));
-//    }
-//
-//
-//    @Test
-//    public void testPageRank() {
-//        // Set up WayangContext.
-//        WayangContext wayangContext = new WayangContext()
-//                .with(Java.basicPlugin())
-//                .with(Java.graphPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Create a test graph.
-//        Collection<Tuple2<Long, Long>> edges = Arrays.asList(
-//                new Tuple2<>(0L, 1L),
-//                new Tuple2<>(0L, 2L),
-//                new Tuple2<>(0L, 3L),
-//                new Tuple2<>(1L, 0L),
-//                new Tuple2<>(2L, 1L),
-//                new Tuple2<>(3L, 2L),
-//                new Tuple2<>(3L, 1L)
-//        );
-//
-//        // Execute the job.
-//        Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges()
-//                .pageRank(20)
-//                .collect();
-//        List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks);
-//        sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1));
-//
-//        System.out.println(sortedPageRanks);
-//        Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue());
-//        Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue());
-//        Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue());
-//        Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue());
-//    }
-//
-//    @Test
-//    public void testMapPartitions() {
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8);
-//
-//        // Execute the job.
-//        Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues)
-//                .mapPartitions(partition -> {
-//                    int numEvens = 0, numOdds = 0;
-//                    for (Integer value : partition) {
-//                        if ((value & 1) == 0) numEvens++;
-//                        else numOdds++;
-//                    }
-//                    return Arrays.asList(
-//                            new Tuple2<>("odd", numOdds),
-//                            new Tuple2<>("even", numEvens)
-//                    );
-//                })
-//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
-//                .collect();
-//
-//        // Check the output.
-//        Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet(
-//                new Tuple2<>("even", 5), new Tuple2<>("odd", 2)
-//        );
-//        Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testZipWithId() {
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        List<Integer> inputValues = new ArrayList<>(42 * 100);
-//        for (int i = 0; i < 100; i++) {
-//            for (int j = 0; j < 42; j++) {
-//                inputValues.add(i);
-//            }
-//        }
-//
-//        // Execute the job.
-//        Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues)
-//                .zipWithId()
-//                .groupByKey(Tuple2::getField1)
-//                .map(group -> {
-//                    int distinctIds = (int) StreamSupport.stream(group.spliterator(), false)
-//                            .map(Tuple2::getField0)
-//                            .distinct()
-//                            .count();
-//                    return new Tuple2<>(distinctIds, 1);
-//                })
-//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
-//                .collect();
-//
-//        // Check the output.
-//        Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100));
-//        Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
-//    }
-//
-//    @Test
-//    public void testWriteTextFile() throws IOException, URISyntaxException {
-//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
-//
-//        // Generate test data.
-//        List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d);
-//
-//        // Execute the job.
-//        File tempDir = LocalFileSystem.findTempDir();
-//        String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"));
-//
-//        builder
-//                .loadCollection(inputValues)
-//                .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()");
-//
-//        // Check the output.
-//        Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet());
-//        Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet());
-//        Assert.assertEquals(expectedLines, actualLines);
-//    }
-//
-//    @Test
-//    public void testSqlOnJava() throws IOException, SQLException {
-//        // Execute job.
-//        final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
-//                .with(Java.basicPlugin())
-//                .with(Sqlite3.plugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()");
-//        final Collection<String> outputValues = builder
-//                .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//                .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform())
-//                .asRecords().projectRecords(new String[]{"name"})
-//                .map(record -> (String) record.getField(0))
-//                .collect();
-//
-//        // Test the outcome.
-//        Assert.assertEquals(
-//                WayangCollections.asSet("John", "Evelyn"),
-//                WayangCollections.asSet(outputValues)
-//        );
-//    }
-//
-//    @Test
-//    public void testSqlOnSqlite3() throws IOException, SQLException {
-//        // Execute job.
-//        final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
-//                .with(Java.basicPlugin())
-//                .with(Sqlite3.plugin());
-//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()");
-//        final Collection<String> outputValues = builder
-//                .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//                .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18")
-//                .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform())
-//                .map(record -> (String) record.getField(0))
-//                .collect();
-//
-//        // Test the outcome.
-//        Assert.assertEquals(
-//                WayangCollections.asSet("John", "Evelyn"),
-//                WayangCollections.asSet(outputValues)
-//        );
-//    }
-
-}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
deleted file mode 100644
index ed1a7ac..0000000
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api
-
-import org.apache.wayang.basic.WayangBasics
-import org.apache.wayang.core.api.{Configuration, WayangContext}
-import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializablePredicate
-import org.apache.wayang.core.function.{ExecutionContext, TransformationDescriptor}
-import org.apache.wayang.core.util.fs.LocalFileSystem
-import org.apache.wayang.java.Java
-import org.junit.{Assert, Test}
-
-import java.io.File
-import java.net.URI
-import java.nio.file.{Files, Paths}
-import java.sql.{Connection, Statement}
-import java.util.function.Consumer
-
-/**
-  * Tests the Wayang API.
-  */
-class ApiTest {
-
-  @Test
-  def testReadMapCollect(): Unit = {
-    // Set up WayangContext.
-    val wayangContext = new WayangContext()
-                         .withPlugin(Java.basicPlugin)
-//                         .withPlugin(Spark.basicPlugin)
-    // Generate some test data.
-    val inputValues = (for (i <- 1 to 10) yield i).toArray
-
-    // Build and execute a Wayang plan.
-    val outputValues = wayangContext
-      .loadCollection(inputValues).withName("Load input values")
-      .map(_ + 2).withName("Add 2")
-      .collect()
-
-    // Check the outcome.
-    val expectedOutputValues = inputValues.map(_ + 2)
-    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
-  }
-
-//  @Test
-//  def testCustomOperator(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = (for (i <- 1 to 10) yield i).toArray
-//
-//    // Build and execute a Wayang plan.
-//    val inputDataSet = wayang.loadCollection(inputValues).withName("Load input values")
-//
-//    // Add the custom operator.
-//    val IndexedSeq(addedValues) = wayang.customOperator(new JavaMapOperator(
-//      dataSetType[Int],
-//      dataSetType[Int],
-//      new TransformationDescriptor(
-//        toSerializableFunction[Int, Int](_ + 2),
-//        basicDataUnitType[Int], basicDataUnitType[Int]
-//      )
-//    ), inputDataSet)
-//    addedValues.withName("Add 2")
-//
-//    // Collect the result.
-//    val outputValues = addedValues.asInstanceOf[DataQuanta[Int]].collect()
-//
-//    // Check the outcome.
-//    val expectedOutputValues = inputValues.map(_ + 2)
-//    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
-//  }
-//
-//  @Test
-//  def testCustomOperatorShortCut(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = (for (i <- 1 to 10) yield i).toArray
-//
-//    // Build and execute a Wayang plan.
-//    val outputValues = wayang
-//      .loadCollection(inputValues).withName("Load input values")
-//      .customOperator[Int](new JavaMapOperator(
-//      dataSetType[Int],
-//      dataSetType[Int],
-//      new TransformationDescriptor(
-//        toSerializableFunction[Int, Int](_ + 2),
-//        basicDataUnitType[Int], basicDataUnitType[Int]
-//      )
-//    )).withName("Add 2")
-//      .collect()
-//
-//    // Check the outcome.
-//    val expectedOutputValues = inputValues.map(_ + 2)
-//    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
-//  }
-//
-//  @Test
-//  def testWordCount(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = Array("Big data is big.", "Is data big data?")
-//
-//    // Build and execute a word count WayangPlan.
-//    val wordCounts = wayang
-//      .loadCollection(inputValues).withName("Load input values")
-//      .flatMap(_.split("\\s+")).withName("Split words")
-//      .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
-//      .map((_, 1)).withName("Attach counter")
-//      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters")
-//      .collect().toSet
-//
-//    val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
-//
-//    Assert.assertEquals(expectedWordCounts, wordCounts)
-//  }
-//
-//  @Test
-//  def testWordCountOnSparkAndJava(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = Array("Big data is big.", "Is data big data?")
-//
-//    // Build and execute a word count WayangPlan.
-//    val wordCounts = wayang
-//      .loadCollection(inputValues).withName("Load input values").withTargetPlatforms(Java.platform)
-//      .flatMap(_.split("\\s+")).withName("Split words").withTargetPlatforms(Java.platform)
-//      .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase").withTargetPlatforms(Spark.platform)
-//      .map((_, 1)).withName("Attach counter").withTargetPlatforms(Spark.platform)
-//      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters").withTargetPlatforms(Spark.platform)
-//      .collect().toSet
-//
-//    val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
-//
-//    Assert.assertEquals(expectedWordCounts, wordCounts)
-//  }
-//
-//  @Test
-//  def testSample(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = for (i <- 0 until 100) yield i
-//
-//    // Build and execute the WayangPlan.
-//    val sample = wayang
-//      .loadCollection(inputValues)
-//      .sample(10)
-//      .collect()
-//
-//    // Check the result.
-//    Assert.assertEquals(10, sample.size)
-//    Assert.assertEquals(10, sample.toSet.size)
-//  }
-//
-//  @Test
-//  def testDoWhile(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = Array(1, 2)
-//
-//    // Build and execute a word count WayangPlan.
-//
-//    val values = wayang
-//      .loadCollection(inputValues).withName("Load input values")
-//      .doWhile[Int](vals => vals.max > 100, {
-//      start =>
-//        val sum = start.reduce(_ + _).withName("Sum")
-//        (start.union(sum).withName("Old+new"), sum)
-//    }).withName("While <= 100")
-//      .collect().toSet
-//
-//    val expectedValues = Set(1, 2, 3, 6, 12, 24, 48, 96, 192)
-//    Assert.assertEquals(expectedValues, values)
-//  }
-//
-//  @Test
-//  def testRepeat(): Unit = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    // Generate some test data.
-//    val inputValues = Array(1, 2)
-//
-//    // Build and execute a word count WayangPlan.
-//
-//    val values = wayang
-//      .loadCollection(inputValues).withName("Load input values").withName(inputValues.mkString(","))
-//      .repeat(3,
-//        _.reduce(_ * _).withName("Multiply")
-//          .flatMap(v => Seq(v, v + 1)).withName("Duplicate")
-//      ).withName("Repeat 3x")
-//      .collect().toSet
-//
-//    // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
-//    val expectedValues = Set(42, 43)
-//    Assert.assertEquals(expectedValues, values)
-//  }
-//
-//  @Test
-//  def testBroadcast() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//    val builder = new PlanBuilder(wayang)
-//
-//    // Generate some test data.
-//    val inputStrings = Array("Hello", "World", "Hi", "Mars")
-//    val selectors = Array('o', 'l')
-//
-//    val selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors")
-//
-//    // Build and execute a word count WayangPlan.
-//    val values = builder
-//      .loadCollection(inputStrings).withName("Load input values")
-//      .filterJava(new ExtendedSerializablePredicate[String] {
-//
-//        var selectors: Iterable[Char] = _
-//
-//        override def open(ctx: ExecutionContext): Unit = {
-//          import scala.collection.JavaConversions._
-//          selectors = collectionAsScalaIterable(ctx.getBroadcast[Char]("selectors"))
-//        }
-//
-//        override def test(t: String): Boolean = selectors.forall(selector => t.contains(selector))
-//
-//      }).withName("Filter words")
-//      .withBroadcast(selectorsDataSet, "selectors")
-//      .collect().toSet
-//
-//    val expectedValues = Set("Hello", "World")
-//    Assert.assertEquals(expectedValues, values)
-//  }
-//
-//  @Test
-//  def testGroupBy() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .groupByKey(_ % 2).withName("group odd and even")
-//      .map {
-//        group =>
-//          import scala.collection.JavaConversions._
-//          val buffer = group.toBuffer
-//          buffer.sortBy(identity)
-//          if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2
-//          else buffer(buffer.size / 2)
-//      }.withName("median")
-//      .collect()
-//
-//    val expectedValues = Set(5, 6)
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testGroup() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .group()
-//      .map {
-//        group =>
-//          import scala.collection.JavaConversions._
-//          val buffer = group.toBuffer
-//          buffer.sortBy(int => int)
-//          if (buffer.size % 2 == 0) (buffer(buffer.size / 2) + buffer(buffer.size / 2 + 1)) / 2
-//          else buffer(buffer.size / 2)
-//      }
-//      .collect()
-//
-//    val expectedValues = Set(5)
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testJoin() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .join[(String, String), String](_._1, dataQuanta2, _._2)
-//      .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
-//      .collect()
-//
-//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testJoinAndAssemble() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1.keyBy(_._1).join(dataQuanta2.keyBy(_._2))
-//      .assemble((dq1, dq2) => (dq2._1, dq1._2))
-//      .collect()
-//
-//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//
-//  @Test
-//  def testCoGroup() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(("Water", 0), ("Cola", 5), ("Juice", 10))
-//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
-//      .collect()
-//
-//    import scala.collection.JavaConversions._
-//    val actualValues = result.map(coGroup => (coGroup.field0.toSet, coGroup.field1.toSet)).toSet
-//    val expectedValues = Set(
-//      (Set(("Water", 0)), Set(("Tap water", "Water"))),
-//      (Set(("Cola", 5)), Set()),
-//      (Set(("Juice", 10)), Set(("Apple juice", "Juice"), ("Orange juice", "Juice")))
-//    )
-//    Assert.assertEquals(expectedValues, actualValues)
-//  }
-//
-//  @Test
-//  def testIntersect() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
-//    val inputValues2 = Array(0, 2, 3, 3, 4, 5, 7, 8, 9, 11)
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val dataQuanta2 = builder.loadCollection(inputValues2)
-//    val result = dataQuanta1
-//      .intersect(dataQuanta2)
-//      .collect()
-//
-//    val expectedValues = Set(2, 3, 4, 5, 7, 8, 9)
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//
-//  @Test
-//  def testSort() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues1 = Array(3, 4, 5, 2, 1)
-//
-//    val builder = new PlanBuilder(wayang)
-//    val dataQuanta1 = builder.loadCollection(inputValues1)
-//    val result = dataQuanta1
-//      .sort(r=>r)
-//      .collect()
-//
-//    val expectedValues = Array(1, 2, 3, 4, 5)
-//    Assert.assertArrayEquals(expectedValues, result.toArray)
-//  }
-//
-//
-//  @Test
-//  def testPageRank() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext()
-//      .withPlugin(Java.graphPlugin)
-//      .withPlugin(WayangBasics.graphPlugin)
-//      .withPlugin(Java.basicPlugin)
-//    import org.apache.wayang.api.graph._
-//
-//    val edges = Seq((0, 1), (0, 2), (0, 3), (1, 0), (2, 1), (3, 2), (3, 1)).map(t => Edge(t._1, t._2))
-//
-//    val pageRanks = wayang
-//      .loadCollection(edges).withName("Load edges")
-//      .pageRank(20).withName("PageRank")
-//      .collect()
-//      .map(t => t.field0.longValue -> t.field1)
-//      .toMap
-//
-//    print(pageRanks)
-//    // Let's not check absolute numbers but only the relative ordering.
-//    Assert.assertTrue(pageRanks(1) > pageRanks(0))
-//    Assert.assertTrue(pageRanks(0) > pageRanks(2))
-//    Assert.assertTrue(pageRanks(2) > pageRanks(3))
-//  }
-//
-//  @Test
-//  def testMapPartitions() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext()
-//      .withPlugin(Java.basicPlugin())
-//      .withPlugin(Spark.basicPlugin)
-//
-//    val typeCounts = wayang
-//      .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
-//        .mapPartitions { ints =>
-//          var (numOdds, numEvens) = (0, 0)
-//          ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
-//          Seq(("odd", numOdds), ("even", numEvens))
-//        }
-//      .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
-//      .collect()
-//
-//    Assert.assertEquals(Set(("odd", 2), ("even", 5)), typeCounts.toSet)
-//  }
-//
-//  @Test
-//  def testZipWithId() = {
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
-//
-//    val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .zipWithId
-//      .groupByKey(_.field1)
-//      .map { group =>
-//        import scala.collection.JavaConversions._
-//        (group.map(_.field0).toSet.size, 1)
-//      }
-//      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
-//      .collect()
-//
-//    val expectedValues = Set((42, 100))
-//    Assert.assertEquals(expectedValues, result.toSet)
-//  }
-//
-//  @Test
-//  def testWriteTextFile() = {
-//    val tempDir = LocalFileSystem.findTempDir
-//    val targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"))
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
-//
-//    val inputValues = for (i <- 0 to 5) yield i * 0.333333333333
-//
-//    val result = wayang
-//      .loadCollection(inputValues)
-//      .writeTextFile(targetUrl, formatterUdf = d => f"${d % .2f}")
-//
-//    val lines = scala.collection.mutable.Set[String]()
-//    Files.lines(Paths.get(new URI(targetUrl))).forEach(new Consumer[String] {
-//      override def accept(line: String): Unit = lines += line
-//    })
-//
-//    val expectedLines = inputValues.map(v => f"${v % .2f}").toSet
-//    Assert.assertEquals(expectedLines, lines)
-//  }
-//
-//  @Test
-//  def testSqlOnJava() = {
-//    // Initialize some test data.
-//    val configuration = new Configuration
-//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-//    sqlite3dbFile.deleteOnExit()
-//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-//    try {
-//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-//      try {
-//        val statement: Statement = connection.createStatement
-//        statement.addBatch("DROP TABLE IF EXISTS customer;")
-//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-//        statement.executeBatch()
-//      } finally {
-//        if (connection != null) connection.close()
-//      }
-//    }
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-//    val result = wayang
-//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
-//      .projectRecords(Seq("name"))
-//      .map(_.getField(0).asInstanceOf[String])
-//      .collect()
-//      .toSet
-//
-//    val expectedValues = Set("John", "Evelyn")
-//    Assert.assertEquals(expectedValues, result)
-//  }
-//
-//  @Test
-//  def testSqlOnSqlite3() = {
-//    // Initialize some test data.
-//    val configuration = new Configuration
-//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
-//    sqlite3dbFile.deleteOnExit()
-//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
-//
-//    try {
-//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
-//      try {
-//        val statement: Statement = connection.createStatement
-//        statement.addBatch("DROP TABLE IF EXISTS customer;")
-//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
-//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
-//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
-//        statement.executeBatch
-//      } finally {
-//        if (connection != null) connection.close()
-//      }
-//    }
-//
-//    // Set up WayangContext.
-//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
-//
-//    val result = wayang
-//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
-//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18")
-//      .projectRecords(Seq("name")).withTargetPlatforms(Sqlite3.platform)
-//      .map(_.getField(0).asInstanceOf[String])
-//      .collect()
-//      .toSet
-//
-//    val expectedValues = Set("John", "Evelyn")
-//    Assert.assertEquals(expectedValues, result)
-//  }
-}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
new file mode 100644
index 0000000..59a0fd9
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/plugin/hackit/api/ApiExtensionTest.scala
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.plugin.hackit.api
+
+import org.apache.wayang.api.ApiTest
+import org.apache.wayang.api.dataquanta.DataQuantaFactory
+import org.junit.{Before, Test}
+
+class ApiExtensionTest extends ApiTest {
+
+  // Runs before each test (JUnit 4, matching ApiTest): install the Hackit template
+  // so the inherited API tests build DataQuantaHackit instances instead of plain DataQuanta.
+  @Before
+  def setUp(): Unit = {
+    DataQuantaFactory.setTemplate(DataQuantaHackit)
+  }
+
+  @Test
+  override def testReadMapCollect(): Unit = {
+    DataQuantaFactory.setTemplate(DataQuantaHackit)
+
+    super.testReadMapCollect()
+  }
+
+}
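+
+// A minimal usage sketch (illustrative only, assuming the template mechanism
+// routes plan construction through DataQuantaHackit):
+//
+//   DataQuantaFactory.setTemplate(DataQuantaHackit)
+//   val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+//   wayang.loadCollection(Seq(1, 2, 3)).map(_ + 2).collect()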
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
new file mode 100644
index 0000000..b296279
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/wayang.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
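+# Bind the Spark driver to localhost so the Spark-based tests run locally without extra network setup.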
+spark.driver.host = localhost