You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:48 UTC

[incubator-wayang] 07/15: [WAYANG-31] structure the code on different way

This is an automated email from the ASF dual-hosted git repository.

bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git

commit d39ab4f63c1967fc856f64c6aca07eec3fb07807
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Thu May 13 12:10:46 2021 -0400

    [WAYANG-31] structure the code on different way
---
 lolo                                               | 121 ++++
 .../org/apache/wayang/api/DataQuantaBuilder.scala  | 440 -------------
 .../org/apache/wayang/api/JavaPlanBuilder.scala    |   3 +-
 .../scala/org/apache/wayang/api/PlanBuilder.scala  |   1 +
 .../org/apache/wayang/api/RecordDataQuanta.scala   |   1 +
 .../wayang/api/RecordDataQuantaBuilder.scala       |   3 +-
 .../wayang/api/{ => dataquanta}/DataQuanta.scala   |  98 +--
 .../wayang/api/dataquanta/DataQuantaBuilder.scala  | 441 +++++++++++++
 .../wayang/api/dataquanta/JoinedDataQuanta.scala   |  56 ++
 .../wayang/api/dataquanta/KeyedDataQuanta.scala    |  53 ++
 .../builder}/BasicDataQuantaBuilder.scala          |   5 +-
 .../builder}/CartesianDataQuantaBuilder.scala      |   5 +-
 .../builder}/CoGroupDataQuantaBuilder.scala        |   5 +-
 .../builder}/CountDataQuantaBuilder.scala          |   5 +-
 .../builder}/CustomOperatorDataQuantaBuilder.scala |   5 +-
 .../builder}/DistinctDataQuantaBuilder.scala       |   5 +-
 .../builder}/DoWhileDataQuantaBuilder.scala        |   5 +-
 .../builder}/FakeDataQuantaBuilder.scala           |   5 +-
 .../builder}/FilterDataQuantaBuilder.scala         |   5 +-
 .../builder}/FlatMapDataQuantaBuilder.scala        |   5 +-
 .../builder}/GlobalGroupDataQuantaBuilder.scala    |   5 +-
 .../builder}/GlobalReduceDataQuantaBuilder.scala   |   5 +-
 .../builder}/GroupByDataQuantaBuilder.scala        |   5 +-
 .../builder}/IntersectDataQuantaBuilder.scala      |   5 +-
 .../builder}/JoinDataQuantaBuilder.scala           |   5 +-
 .../builder}/KeyedDataQuantaBuilder.scala          |   5 +-
 .../builder}/LoadCollectionDataQuantaBuilder.scala |   5 +-
 .../builder}/MapDataQuantaBuilder.scala            |   5 +-
 .../builder}/MapPartitionsDataQuantaBuilder.scala  |   5 +-
 .../builder}/ProjectionDataQuantaBuilder.scala     |   5 +-
 .../builder}/ReduceByDataQuantaBuilder.scala       |   5 +-
 .../builder}/RepeatDataQuantaBuilder.scala         |   5 +-
 .../builder}/SampleDataQuantaBuilder.scala         |   5 +-
 .../builder}/SortDataQuantaBuilder.scala           |   5 +-
 .../builder}/UnarySourceDataQuantaBuilder.scala    |   5 +-
 .../builder}/UnionDataQuantaBuilder.scala          |   5 +-
 .../builder}/ZipWithIdDataQuantaBuilder.scala      |   5 +-
 .../apache/wayang/api/graph/EdgeDataQuanta.scala   |   1 +
 .../wayang/api/graph/EdgeDataQuantaBuilder.scala   |   5 +-
 .../org/apache/wayang/api/graph/package.scala      |   1 +
 .../main/scala/org/apache/wayang/api/package.scala |   3 +-
 .../wayang/api/util/DataQuantaBuilderCache.scala   |   4 +-
 .../api/util/DataQuantaBuilderDecorator.scala      |   3 +-
 .../java/org/apache/wayang/api/JavaApiTest.java    |   5 +-
 .../test/scala/org/apache/wayang/api/ApiTest.scala |   3 +-
 .../wayang-hackit/wayang-hackit-api/pom.xml        |  33 +
 .../java/org/apache/wayang/api/JavaApiTest.java    | 711 +++++++++++++++++++++
 .../test/scala/org/apache/wayang/api/ApiTest.scala | 575 +++++++++++++++++
 .../plugin/hackit/core/tagger/HackitTagger.java    |   1 +
 .../java/org/apache/wayang/tests/RegressionIT.java |   4 +-
 .../java/org/apache/wayang/tests/WayangPlans.java  |   2 +-
 51 files changed, 2117 insertions(+), 586 deletions(-)

diff --git a/lolo b/lolo
new file mode 100644
index 0000000..11a7c99
--- /dev/null
+++ b/lolo
@@ -0,0 +1,121 @@
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=61718
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/lib/idea_rt.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-launcher/1.6.1/junit-platform-launcher-1.6.1.jar
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit5-rt.jar
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit-rt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/charsets.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/cldrdata.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/dnsns.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/jaccess.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/jfxrt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/localedata.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/nashorn.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunec.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunjce_provider.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunpkcs11.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/zipfs.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jce.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jfxswt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jsse.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/management-agent.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/resources.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/rt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/ant-javafx.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/dt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/javafx-mx.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/jconsole.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/packager.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/sa-jdi.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/tools.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-plugins/wayang-hackit/wayang-hackit-api/target/test-classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-plugins/wayang-hackit/wayang-hackit-api/target/classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-api/wayang-api-scala-java/target/classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-commons/wayang-core/target/classes
+/Users/bertty/.m2/repository/org/json/json/20160212/json-20160212.jar
+/Users/bertty/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar
+/Users/bertty/.m2/repository/de/odysseus/juel/juel-api/2.2.7/juel-api-2.2.7.jar
+/Users/bertty/.m2/repository/de/odysseus/juel/juel-impl/2.2.7/juel-impl-2.2.7.jar
+/Users/bertty/.m2/repository/de/hpi/isg/profiledb-instrumentation/0.1.1/profiledb-instrumentation-0.1.1.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar
+/Users/bertty/.m2/repository/org/apache/logging/log4j/log4j-api/2.14.0/log4j-api-2.14.0.jar
+/Users/bertty/.m2/repository/org/apache/logging/log4j/log4j-core/2.14.0/log4j-core-2.14.0.jar
+/Users/bertty/.m2/repository/net/sf/trove4j/trove4j/3.0.3/trove4j-3.0.3.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-commons/wayang-basic/target/classes
+/Users/bertty/.m2/repository/de/hpi/isg/profiledb-store/0.1.1/profiledb-store-0.1.1.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-platforms/wayang-java/target/classes
+/Users/bertty/.m2/repository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-common/2.7.7/hadoop-common-2.7.7.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-annotations/2.7.7/hadoop-annotations-2.7.7.jar
+/Users/bertty/.m2/repository/com/google/guava/guava/11.0.2/guava-11.0.2.jar
+/Users/bertty/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-math3/3.1.1/commons-math3-3.1.1.jar
+/Users/bertty/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar
+/Users/bertty/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar
+/Users/bertty/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar
+/Users/bertty/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar
+/Users/bertty/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar
+/Users/bertty/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar
+/Users/bertty/.m2/repository/org/mortbay/jetty/jetty-sslengine/6.1.26/jetty-sslengine-6.1.26.jar
+/Users/bertty/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar
+/Users/bertty/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar
+/Users/bertty/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar
+/Users/bertty/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar
+/Users/bertty/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar
+/Users/bertty/.m2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar
+/Users/bertty/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar
+/Users/bertty/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar
+/Users/bertty/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar
+/Users/bertty/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
+/Users/bertty/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
+/Users/bertty/.m2/repository/org/apache/avro/avro/1.7.4/avro-1.7.4.jar
+/Users/bertty/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar
+/Users/bertty/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar
+/Users/bertty/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar
+/Users/bertty/.m2/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-auth/2.7.7/hadoop-auth-2.7.7.jar
+/Users/bertty/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar
+/Users/bertty/.m2/repository/org/apache/httpcomponents/httpcore/4.2.4/httpcore-4.2.4.jar
+/Users/bertty/.m2/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2.0.0-M15.jar
+/Users/bertty/.m2/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar
+/Users/bertty/.m2/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar
+/Users/bertty/.m2/repository/org/apache/directory/api/api-util/1.0.0-M20/api-util-1.0.0-M20.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-framework/2.7.1/curator-framework-2.7.1.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-client/2.7.1/curator-client-2.7.1.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-recipes/2.7.1/curator-recipes-2.7.1.jar
+/Users/bertty/.m2/repository/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar
+/Users/bertty/.m2/repository/org/apache/htrace/htrace-core/3.1.0-incubating/htrace-core-3.1.0-incubating.jar
+/Users/bertty/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar
+/Users/bertty/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.7.7/hadoop-hdfs-2.7.7.jar
+/Users/bertty/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar
+/Users/bertty/.m2/repository/io/netty/netty/3.6.2.Final/netty-3.6.2.Final.jar
+/Users/bertty/.m2/repository/io/netty/netty-all/4.0.23.Final/netty-all-4.0.23.Final.jar
+/Users/bertty/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
+/Users/bertty/.m2/repository/xml-apis/xml-apis/1.3.04/xml-apis-1.3.04.jar
+/Users/bertty/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter/5.6.1/junit-jupiter-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-api/5.6.1/junit-jupiter-api-5.6.1.jar
+/Users/bertty/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-commons/1.6.1/junit-platform-commons-1.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-params/5.6.1/junit-jupiter-params-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-engine/5.6.1/junit-jupiter-engine-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/vintage/junit-vintage-engine/5.6.1/junit-vintage-engine-5.6.1.jar
+/Users/bertty/.m2/repository/org/apiguardian/apiguardian-api/1.1.0/apiguardian-api-1.1.0.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-engine/1.6.1/junit-platform-engine-1.6.1.jar
+/Users/bertty/.m2/repository/junit/junit/4.12/junit-4.12.jar
+/Users/bertty/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar
+/Users/bertty/.m2/repository/org/mockito/mockito-core/3.5.10/mockito-core-3.5.10.jar
+/Users/bertty/.m2/repository/net/bytebuddy/byte-buddy/1.10.13/byte-buddy-1.10.13.jar
+/Users/bertty/.m2/repository/net/bytebuddy/byte-buddy-agent/1.10.13/byte-buddy-agent-1.10.13.jar
+/Users/bertty/.m2/repository/org/objenesis/objenesis/3.1/objenesis-3.1.jar
+/Users/bertty/.m2/repository/org/mockito/mockito-junit-jupiter/3.5.10/mockito-junit-jupiter-3.5.10.jar
+/Users/bertty/.m2/repository/org/assertj/assertj-core/3.17.2/assertj-core-3.17.2.jar
+/Users/bertty/.m2/repository/org/antlr/antlr4/4.9.1/antlr4-4.9.1.jar
+/Users/bertty/.m2/repository/org/antlr/antlr4-runtime/4.9.1/antlr4-runtime-4.9.1.jar
+/Users/bertty/.m2/repository/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
+/Users/bertty/.m2/repository/org/antlr/ST4/4.3/ST4-4.3.jar
+/Users/bertty/.m2/repository/org/abego/treelayout/org.abego.treelayout.core/1.0.3/org.abego.treelayout.core-1.0.3.jar
+/Users/bertty/.m2/repository/org/glassfish/javax.json/1.0.4/javax.json-1.0.4.jar
+/Users/bertty/.m2/repository/com/ibm/icu/icu4j/61.1/icu4j-61.1.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 org.apache.wayang.api.ApiTest,testReadMapCollect
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
deleted file mode 100644
index c6e5dcb..0000000
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api
-
-
-import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
-import java.util.{Collection => JavaCollection}
-import de.hpi.isg.profiledb.store.model.Experiment
-import org.apache.wayang.api.dataquantabuilder.{BasicDataQuantaBuilder, CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuan [...]
-import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
-import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
-import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
-import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator}
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
-import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
-import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
-import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
-import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan}
-import org.apache.wayang.core.platform.Platform
-import org.apache.wayang.core.types.DataSetType
-import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
-
-import scala.collection.mutable.ListBuffer
-import scala.reflect.ClassTag
-
-/**
-  * Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
-  * Java API for Wayang that compensates for lacking default and named arguments.
-  */
-trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {
-
-  /**
-    * The type of the [[DataQuanta]] to be built.
-    */
-  protected[api] def outputTypeTrap: TypeTrap
-
-  /**
-    * Provide a [[JavaPlanBuilder]] to which this instance is associated.
-    */
-  protected[api] implicit def javaPlanBuilder: JavaPlanBuilder
-
-  /**
-    * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
-    *
-    * @param name the name
-    * @return this instance
-    */
-  def withName(name: String): This
-
-  /**
-    * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
-    *
-    * @param experiment the [[Experiment]]
-    * @return this instance
-    */
-  def withExperiment(experiment: Experiment): This
-
-  /**
-    * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
-    * always necessary to set it and that it can be inferred in some situations.
-    *
-    * @param outputType the output [[DataSetType]]
-    * @return this instance
-    */
-  def withOutputType(outputType: DataSetType[Out]): This
-
-  /**
-    * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
-    * always necessary to set it and that it can be inferred in some situations.
-    *
-    * @param cls the output [[Class]]
-    * @return this instance
-    */
-  def withOutputClass(cls: Class[Out]): This
-
-  /**
-    * Register a broadcast with the [[DataQuanta]] to be built
-    *
-    * @param sender        a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
-    * @param broadcastName the name of the broadcast
-    * @return this instance
-    */
-  def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This
-
-  /**
-    * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
-    *
-    * @param cardinalityEstimator the [[CardinalityEstimator]]
-    * @return this instance
-    */
-  def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This
-
-  /**
-    * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
-    * multiple times to set multiple possilbe target [[Platform]]s or not at all to impose no restrictions.
-    *
-    * @param platform the [[CardinalityEstimator]]
-    * @return this instance
-    */
-  def withTargetPlatform(platform: Platform): This
-
-  /**
-    * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
-    *
-    * @param cls the [[Class]]
-    * @return this instance
-    */
-  def withUdfJarOf(cls: Class[_]): This
-
-  /**
-    * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
-    *
-    * @param path the path of the JAR file
-    * @return this instance
-    */
-  def withUdfJar(path: String): This
-
-  /**
-    * Provide a [[ClassTag]] for the constructed [[DataQuanta]].
-    *
-    * @return the [[ClassTag]]
-    */
-  protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[MapOperator]].
-    *
-    * @param udf the UDF for the [[MapOperator]]
-    * @return a [[MapDataQuantaBuilder]]
-    */
-  def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
-    *
-    * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
-    * @return a [[MapDataQuantaBuilder]]
-    */
-  def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
-    *
-    * @param udf filter UDF
-    * @return a [[FilterDataQuantaBuilder]]
-    */
-  def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
-    *
-    * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
-    * @return a [[FlatMapDataQuantaBuilder]]
-    */
-  def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
-    *
-    * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
-    * @return a [[MapPartitionsDataQuantaBuilder]]
-    */
-  def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
-    new MapPartitionsDataQuantaBuilder(this, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
-    *
-    * @param sampleSize the absolute size of the sample
-    * @return a [[SampleDataQuantaBuilder]]
-    */
-  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
-    override def applyAsInt(operand: Int): Int = sampleSize
-  })
-
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
-    *
-    * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
-    * @return a [[SampleDataQuantaBuilder]]
-    */
-  def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
-
-  /**
-    * Annotates a key to this instance.
-    * @param keyExtractor extracts the key from the data quanta
-    * @return a [[KeyedDataQuantaBuilder]]
-    */
-  def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
-    *
-    * @param udf the UDF for the [[GlobalReduceOperator]]
-    * @return a [[GlobalReduceDataQuantaBuilder]]
-    */
-  def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
-    *
-    * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
-    * @param udf    the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
-    * @return a [[ReduceByDataQuantaBuilder]]
-    */
-  def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
-    new ReduceByDataQuantaBuilder(this, keyUdf, udf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
-    *
-    * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
-    * @return a [[GroupByDataQuantaBuilder]]
-    */
-  def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
-    new GroupByDataQuantaBuilder(this, keyUdf)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
-    *
-    * @return a [[GlobalGroupDataQuantaBuilder]]
-    */
-  def group() = new GlobalGroupDataQuantaBuilder(this)
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.UnionAllOperator]].
-    *
-    * @param that the other [[DataQuantaBuilder]] to union with
-    * @return a [[UnionDataQuantaBuilder]]
-    */
-  def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.IntersectOperator]].
-    *
-    * @param that the other [[DataQuantaBuilder]] to intersect with
-    * @return an [[IntersectDataQuantaBuilder]]
-    */
-  def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.JoinOperator]].
-    *
-    * @param thisKeyUdf the key extraction UDF for this instance
-    * @param that       the other [[DataQuantaBuilder]] to join with
-    * @param thatKeyUdf the key extraction UDF for `that` instance
-    * @return a [[JoinDataQuantaBuilder]]
-    */
-  def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
-                         that: DataQuantaBuilder[_, ThatOut],
-                         thatKeyUdf: SerializableFunction[ThatOut, Key]) =
-    new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.CoGroupOperator]].
-    *
-    * @param thisKeyUdf the key extraction UDF for this instance
-    * @param that       the other [[DataQuantaBuilder]] to join with
-    * @param thatKeyUdf the key extraction UDF for `that` instance
-    * @return a [[CoGroupDataQuantaBuilder]]
-    */
-  def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
-                         that: DataQuantaBuilder[_, ThatOut],
-                         thatKeyUdf: SerializableFunction[ThatOut, Key]) =
-    new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
-
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.SortOperator]].
-    *
-    * @param keyUdf the key extraction UDF for this instance
-    * @return a [[SortDataQuantaBuilder]]
-    */
-  def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
-    new SortDataQuantaBuilder(this, keyUdf)
-
-  /**
-    * Feed the built [[DataQuanta]] of this and the given instance into a
-    * [[org.apache.wayang.basic.operators.CartesianOperator]].
-    *
-    * @return a [[CartesianDataQuantaBuilder]]
-    */
-  def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
-    *
-    * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
-    */
-  def zipWithId = new ZipWithIdDataQuantaBuilder(this)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
-    *
-    * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
-    */
-  def distinct = new DistinctDataQuantaBuilder(this)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
-    *
-    * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
-    */
-  def count = new CountDataQuantaBuilder(this)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]].
-    *
-    * @return a [[DoWhileDataQuantaBuilder]]
-    */
-  def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
-                    bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
-    new DoWhileDataQuantaBuilder(this, conditionUdf.asInstanceOf[SerializablePredicate[JavaCollection[Conv]]], bodyBuilder)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
-    *
-    * @return a [[DoWhileDataQuantaBuilder]]
-    */
-  def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
-    new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)
-
-  /**
-    * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
-    * and a single [[OutputSlot]].
-    *
-    * @param operator the custom [[Operator]]
-    * @tparam T the type of the output [[DataQuanta]]
-    * @return a [[CustomOperatorDataQuantaBuilder]]
-    */
-  def customOperator[T](operator: Operator) = {
-    assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
-    assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
-    new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
-  }
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
-    * execution of the constructed [[WayangPlan]].
-    *
-    * @return the collected data quanta
-    */
-  def collect(): JavaCollection[Out] = {
-    import scala.collection.JavaConversions._
-    this.dataQuanta().collect()
-  }
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[JavaFunction]] that runs locally. This triggers
-    * execution of the constructed [[WayangPlan]].
-    *
-    * @param f the [[JavaFunction]]
-    * @return the collected data quanta
-    */
-  def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
-    * execution of the constructed [[WayangPlan]].
-    *
-    * @param url     the URL of the file to be written
-    * @param jobName optional name for the [[WayangPlan]]
-    * @return the collected data quanta
-    */
-  def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
-    this.writeTextFile(url, formatterUdf, jobName, null)
-
-  /**
-    * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
-    * execution of the constructed [[WayangPlan]].
-    *
-    * @param url the URL of the file to be written
-    * @return the collected data quanta
-    */
-  def writeTextFile(url: String,
-                    formatterUdf: SerializableFunction[Out, String],
-                    jobName: String,
-                    udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
-    this.javaPlanBuilder.withJobName(jobName)
-    this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
-  }
-
-  /**
-    * Enriches the set of operations to [[Record]]-based ones. This instances must deal with data quanta of
-    * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
-    * operation is applicable.
-    *
-    * @return a [[RecordDataQuantaBuilder]]
-    */
-  def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
-    this match {
-      case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
-      case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
-    }
-  }
-
-  /**
-    * Enriches the set of operations to [[Edge]]-based ones. This instances must deal with data quanta of
-    * type [[Edge]], though. Because of Java's type erasure, we need to leave it up to you whether this
-    * operation is applicable.
-    *
-    * @return a [[EdgeDataQuantaBuilder]]
-    */
-  def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
-    this match {
-      case edges: RecordDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
-      case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
-    }
-  }
-
-  /**
-    * Get or create the [[DataQuanta]] built by this instance.
-    *
-    * @return the [[DataQuanta]]
-    */
-  protected[api] def dataQuanta(): DataQuanta[Out]
-
-}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index 2c88f58..bcb916f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -21,7 +21,8 @@ package org.apache.wayang.api
 import java.util.{Collection => JavaCollection}
 import de.hpi.isg.profiledb.store.model.Experiment
 import org.apache.commons.lang3.Validate
-import org.apache.wayang.api.dataquantabuilder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
+import org.apache.wayang.api.dataquanta.builder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.DataQuantaBuilderCache
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.operators.{TableSource, TextFileSource}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 8600e23..1c59ca9 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -21,6 +21,7 @@ package org.apache.wayang.api
 import de.hpi.isg.profiledb.store.model.Experiment
 import org.apache.commons.lang3.Validate
 import org.apache.wayang.api
+import org.apache.wayang.api.dataquanta.DataQuanta
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.operators.{CollectionSource, TableSource, TextFileSource}
 import org.apache.wayang.core.api.WayangContext
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
index cc95e39..61c6909 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
@@ -18,6 +18,7 @@
 
 package org.apache.wayang.api
 
+import org.apache.wayang.api.dataquanta.DataQuanta
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.function.ProjectionDescriptor
 import org.apache.wayang.basic.operators.MapOperator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
index 5dc045b..0f1c1e2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
@@ -18,7 +18,8 @@
 
 package org.apache.wayang.api
 
-import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.builder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.DataQuantaBuilderDecorator
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.function.ProjectionDescriptor
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 89722cc..a614467 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -1,29 +1,28 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *       http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
  */
 
-package org.apache.wayang.api
-
-import _root_.java.lang.{Iterable => JavaIterable}
-import _root_.java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction}
-import _root_.java.util.{Collection => JavaCollection}
+package org.apache.wayang.api.dataquanta
 
 import de.hpi.isg.profiledb.store.model.Experiment
 import org.apache.commons.lang3.Validate
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
 import org.apache.wayang.basic.function.ProjectionDescriptor
 import org.apache.wayang.basic.operators._
 import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
@@ -34,8 +33,10 @@ import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
 import org.apache.wayang.core.plan.wayangplan._
 import org.apache.wayang.core.platform.Platform
 import org.apache.wayang.core.util.{Tuple => WayangTuple}
-import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
 
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
 import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 import scala.reflect._
@@ -866,65 +867,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
 
 }
 
-/**
-  * This class provides operations on [[DataQuanta]] with additional operations.
-  */
-class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[Out],
-                                                    val keyExtractor: SerializableFunction[Out, Key]) {
-
-  /**
-    * Performs a join. The join fields are governed by the [[KeyedDataQuanta]]'s keys.
-    *
-    * @param that the other [[KeyedDataQuanta]] to join with
-    * @return the join product [[DataQuanta]]
-    */
-  def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
-  DataQuanta[WayangTuple2[Out, ThatOut]] =
-    dataQuanta.joinJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
-
-  /**
-    * Performs a co-group. The grouping fields are governed by the [[KeyedDataQuanta]]'s keys.
-    *
-    * @param that the other [[KeyedDataQuanta]] to co-group with
-    * @return the co-grouped [[DataQuanta]]
-    */
-  def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
-  DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] =
-    dataQuanta.coGroupJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
-
-}
-
-/**
-  * This class amends joined [[DataQuanta]] with additional operations.
-  */
-class JoinedDataQuanta[Out0: ClassTag, Out1: ClassTag]
-(val dataQuanta: DataQuanta[WayangTuple2[Out0, Out1]]) {
-
-  /**
-    * Assembles a new element from a join product tuple.
-    *
-    * @param udf     creates the output data quantum from two joinable data quanta
-    * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
-    * @return the join product [[DataQuanta]]
-    */
-  def assemble[NewOut: ClassTag](udf: (Out0, Out1) => NewOut,
-                                 udfLoad: LoadProfileEstimator = null):
-  DataQuanta[NewOut] =
-    dataQuanta.map(joinTuple => udf.apply(joinTuple.field0, joinTuple.field1), udfLoad)
-
-  /**
-    * Assembles a new element from a join product tuple.
-    *
-    * @param assembler creates the output data quantum from two joinable data quanta
-    * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
-    * @return the join product [[DataQuanta]]
-    */
-  def assembleJava[NewOut: ClassTag](assembler: JavaBiFunction[Out0, Out1, NewOut],
-                                     udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] =
-    dataQuanta.map(join => assembler.apply(join.field0, join.field1), udfLoad)
-
-}
-
 object DataQuanta {
 
   def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] =
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala
new file mode 100644
index 0000000..c0a8e8e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala
@@ -0,0 +1,441 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.dataquanta.builder.{CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuantaBuilder, ProjectionDa [...]
+import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
+import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
+import org.apache.wayang.api.{JavaPlanBuilder, RecordDataQuantaBuilder, RecordDataQuantaBuilderDecorator}
+import org.apache.wayang.basic.data.Record
+import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.Logging
+import org.apache.wayang.core.util.{Tuple => WayangTuple}
+
+import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
+import scala.reflect.ClassTag
+
+/**
+ * Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
+ * Java API for Wayang that compensates for lacking default and named arguments.
+ */
+trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {
+
+  /**
+   * The type of the [[DataQuanta]] to be built.
+   */
+  protected[api] def outputTypeTrap: TypeTrap
+
+  /**
+   * Provide a [[JavaPlanBuilder]] to which this instance is associated.
+   */
+  protected[api] implicit def javaPlanBuilder: JavaPlanBuilder
+
+  /**
+   * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
+   *
+   * @param name the name
+   * @return this instance
+   */
+  def withName(name: String): This
+
+  /**
+   * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
+   *
+   * @param experiment the [[Experiment]]
+   * @return this instance
+   */
+  def withExperiment(experiment: Experiment): This
+
+  /**
+   * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
+   * always necessary to set it and that it can be inferred in some situations.
+   *
+   * @param outputType the output [[DataSetType]]
+   * @return this instance
+   */
+  def withOutputType(outputType: DataSetType[Out]): This
+
+  /**
+   * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
+   * always necessary to set it and that it can be inferred in some situations.
+   *
+   * @param cls the output [[Class]]
+   * @return this instance
+   */
+  def withOutputClass(cls: Class[Out]): This
+
+  /**
+   * Register a broadcast with the [[DataQuanta]] to be built
+   *
+   * @param sender        a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
+   * @param broadcastName the name of the broadcast
+   * @return this instance
+   */
+  def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This
+
+  /**
+   * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
+   *
+   * @param cardinalityEstimator the [[CardinalityEstimator]]
+   * @return this instance
+   */
+  def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This
+
+  /**
+   * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
+   * multiple times to set multiple possilbe target [[Platform]]s or not at all to impose no restrictions.
+   *
+   * @param platform the [[CardinalityEstimator]]
+   * @return this instance
+   */
+  def withTargetPlatform(platform: Platform): This
+
+  /**
+   * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
+   *
+   * @param cls the [[Class]]
+   * @return this instance
+   */
+  def withUdfJarOf(cls: Class[_]): This
+
+  /**
+   * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
+   *
+   * @param path the path of the JAR file
+   * @return this instance
+   */
+  def withUdfJar(path: String): This
+
+  /**
+   * Provide a [[ClassTag]] for the constructed [[DataQuanta]].
+   *
+   * @return the [[ClassTag]]
+   */
+  protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[MapOperator]].
+   *
+   * @param udf the UDF for the [[MapOperator]]
+   * @return a [[MapDataQuantaBuilder]]
+   */
+  def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
+   *
+   * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
+   * @return a [[MapDataQuantaBuilder]]
+   */
+  def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
+   *
+   * @param udf filter UDF
+   * @return a [[FilterDataQuantaBuilder]]
+   */
+  def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
+   *
+   * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
+   * @return a [[FlatMapDataQuantaBuilder]]
+   */
+  def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
+   *
+   * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
+   * @return a [[MapPartitionsDataQuantaBuilder]]
+   */
+  def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
+    new MapPartitionsDataQuantaBuilder(this, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
+   *
+   * @param sampleSize the absolute size of the sample
+   * @return a [[SampleDataQuantaBuilder]]
+   */
+  def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
+    override def applyAsInt(operand: Int): Int = sampleSize
+  })
+
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
+   *
+   * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
+   * @return a [[SampleDataQuantaBuilder]]
+   */
+  def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
+
+  /**
+   * Annotates a key to this instance.
+   *
+   * @param keyExtractor extracts the key from the data quanta
+   * @return a [[KeyedDataQuantaBuilder]]
+   */
+  def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
+   *
+   * @param udf the UDF for the [[GlobalReduceOperator]]
+   * @return a [[GlobalReduceDataQuantaBuilder]]
+   */
+  def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
+   *
+   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+   * @param udf    the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+   * @return a [[ReduceByDataQuantaBuilder]]
+   */
+  def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
+    new ReduceByDataQuantaBuilder(this, keyUdf, udf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
+   *
+   * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
+   * @return a [[GroupByDataQuantaBuilder]]
+   */
+  def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
+    new GroupByDataQuantaBuilder(this, keyUdf)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
+   *
+   * @return a [[GlobalGroupDataQuantaBuilder]]
+   */
+  def group() = new GlobalGroupDataQuantaBuilder(this)
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.UnionAllOperator]].
+   *
+   * @param that the other [[DataQuantaBuilder]] to union with
+   * @return a [[UnionDataQuantaBuilder]]
+   */
+  def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.IntersectOperator]].
+   *
+   * @param that the other [[DataQuantaBuilder]] to intersect with
+   * @return an [[IntersectDataQuantaBuilder]]
+   */
+  def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.JoinOperator]].
+   *
+   * @param thisKeyUdf the key extraction UDF for this instance
+   * @param that       the other [[DataQuantaBuilder]] to join with
+   * @param thatKeyUdf the key extraction UDF for `that` instance
+   * @return a [[JoinDataQuantaBuilder]]
+   */
+  def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
+                         that: DataQuantaBuilder[_, ThatOut],
+                         thatKeyUdf: SerializableFunction[ThatOut, Key]) =
+    new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.CoGroupOperator]].
+   *
+   * @param thisKeyUdf the key extraction UDF for this instance
+   * @param that       the other [[DataQuantaBuilder]] to join with
+   * @param thatKeyUdf the key extraction UDF for `that` instance
+   * @return a [[CoGroupDataQuantaBuilder]]
+   */
+  def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
+                            that: DataQuantaBuilder[_, ThatOut],
+                            thatKeyUdf: SerializableFunction[ThatOut, Key]) =
+    new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
+
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.SortOperator]].
+   *
+   * @param keyUdf the key extraction UDF for this instance
+   * @return a [[SortDataQuantaBuilder]]
+   */
+  def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
+    new SortDataQuantaBuilder(this, keyUdf)
+
+  /**
+   * Feed the built [[DataQuanta]] of this and the given instance into a
+   * [[org.apache.wayang.basic.operators.CartesianOperator]].
+   *
+   * @return a [[CartesianDataQuantaBuilder]]
+   */
+  def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
+   *
+   * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
+   */
+  def zipWithId = new ZipWithIdDataQuantaBuilder(this)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
+   *
+   * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
+   */
+  def distinct = new DistinctDataQuantaBuilder(this)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
+   *
+   * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
+   */
+  def count = new CountDataQuantaBuilder(this)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]].
+   *
+   * @return a [[DoWhileDataQuantaBuilder]]
+   */
+  def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
+                    bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
+    new DoWhileDataQuantaBuilder(this, conditionUdf.asInstanceOf[SerializablePredicate[JavaCollection[Conv]]], bodyBuilder)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
+   *
+   * @return a [[DoWhileDataQuantaBuilder]]
+   */
+  def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
+    new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)
+
+  /**
+   * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
+   * and a single [[OutputSlot]].
+   *
+   * @param operator the custom [[Operator]]
+   * @tparam T the type of the output [[DataQuanta]]
+   * @return a [[CustomOperatorDataQuantaBuilder]]
+   */
+  def customOperator[T](operator: Operator) = {
+    assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
+    assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
+    new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
+  }
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
+   * execution of the constructed [[WayangPlan]].
+   *
+   * @return the collected data quanta
+   */
+  def collect(): JavaCollection[Out] = {
+    import scala.collection.JavaConversions._
+    this.dataQuanta().collect()
+  }
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[JavaFunction]] that runs locally. This triggers
+   * execution of the constructed [[WayangPlan]].
+   *
+   * @param f the [[JavaFunction]]
+   * @return the collected data quanta
+   */
+  def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
+   * execution of the constructed [[WayangPlan]].
+   *
+   * @param url     the URL of the file to be written
+   * @param jobName optional name for the [[WayangPlan]]
+   * @return the collected data quanta
+   */
+  def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
+    this.writeTextFile(url, formatterUdf, jobName, null)
+
+  /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
+   * execution of the constructed [[WayangPlan]].
+   *
+   * @param url the URL of the file to be written
+   * @return the collected data quanta
+   */
+  def writeTextFile(url: String,
+                    formatterUdf: SerializableFunction[Out, String],
+                    jobName: String,
+                    udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
+    this.javaPlanBuilder.withJobName(jobName)
+    this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
+  }
+
+  /**
+   * Enriches the set of operations to [[Record]]-based ones. This instances must deal with data quanta of
+   * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
+   * operation is applicable.
+   *
+   * @return a [[RecordDataQuantaBuilder]]
+   */
+  def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
+    this match {
+      case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
+      case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
+    }
+  }
+
+  /**
+   * Enriches the set of operations to [[Edge]]-based ones. This instances must deal with data quanta of
+   * type [[Edge]], though. Because of Java's type erasure, we need to leave it up to you whether this
+   * operation is applicable.
+   *
+   * @return a [[EdgeDataQuantaBuilder]]
+   */
+  def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
+    this match {
+      case edges: RecordDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
+      case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
+    }
+  }
+
+  /**
+   * Get or create the [[DataQuanta]] built by this instance.
+   *
+   * @return the [[DataQuanta]]
+   */
+  protected[api] def dataQuanta(): DataQuanta[Out]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
new file mode 100644
index 0000000..99768cc
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
@@ -0,0 +1,56 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+
+import scala.reflect.ClassTag
+
+/**
+ * This class amends joined [[DataQuanta]] with additional operations.
+ */
+class JoinedDataQuanta[Out0: ClassTag, Out1: ClassTag]
+(val dataQuanta: DataQuanta[WayangTuple2[Out0, Out1]]) {
+
+  /**
+   * Assembles a new element from a join product tuple.
+   *
+   * @param udf     creates the output data quantum from two joinable data quanta
+   * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+   * @return the join product [[DataQuanta]]
+   */
+  def assemble[NewOut: ClassTag](udf: (Out0, Out1) => NewOut,
+                                 udfLoad: LoadProfileEstimator = null):
+  DataQuanta[NewOut] =
+    dataQuanta.map(joinTuple => udf.apply(joinTuple.field0, joinTuple.field1), udfLoad)
+
+  /**
+   * Assembles a new element from a join product tuple.
+   *
+   * @param assembler creates the output data quantum from two joinable data quanta
+   * @param udfLoad   optional [[LoadProfileEstimator]] for the `udf`
+   * @return the join product [[DataQuanta]]
+   */
+  def assembleJava[NewOut: ClassTag](assembler: JavaBiFunction[Out0, Out1, NewOut],
+                                     udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] =
+    dataQuanta.map(join => assembler.apply(join.field0, join.field1), udfLoad)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
new file mode 100644
index 0000000..fb6e3f1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
@@ -0,0 +1,53 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+
+import scala.reflect.ClassTag
+
+/**
+ * This class provides operations on [[DataQuanta]] with additional operations.
+ */
+class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[Out],
+                                                    val keyExtractor: SerializableFunction[Out, Key]) {
+
+  /**
+   * Performs a join. The join fields are governed by the [[KeyedDataQuanta]]'s keys.
+   *
+   * @param that the other [[KeyedDataQuanta]] to join with
+   * @return the join product [[DataQuanta]]
+   */
+  def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
+  DataQuanta[WayangTuple2[Out, ThatOut]] =
+    dataQuanta.joinJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
+
+  /**
+   * Performs a co-group. The grouping fields are governed by the [[KeyedDataQuanta]]'s keys.
+   *
+   * @param that the other [[KeyedDataQuanta]] to co-group with
+   * @return the co-grouped [[DataQuanta]]
+   */
+  def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
+  DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] =
+    dataQuanta.coGroupJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
index 9aad1a1..78cb3bb 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
@@ -17,11 +17,12 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
 import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
 import org.apache.wayang.core.platform.Platform
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
similarity index 91%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
index 40d4fae..fad5ff8 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
 import org.apache.wayang.basic.data.{Tuple2 => WT2}
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
index aa1d88a..79f2b5f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
 import org.apache.wayang.basic.data.{Tuple2 => WT2}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.costs.LoadEstimator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
similarity index 88%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
index d8f7d18..9d51e7c 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
similarity index 93%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
index 5fca840..09c88f2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.DataQuantaBuilderCache
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
 
 /**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
index 9e8e044..f112388 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
index 34de7be..aabbb3a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
index 6b52c40..a8982d8 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 import scala.reflect.ClassTag
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
index 3d33c80..ce546e3 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.basic.operators.MapOperator
 import org.apache.wayang.core.function.FunctionDescriptor.{SerializableFunction, SerializablePredicate}
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
index 83b4383..19aadc2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
similarity index 88%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
index abc1d9a..794d8c4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
index c3efc95..dac4e4e 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableBinaryOperator
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
index b61c71e..3531271 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
index 6db8ffe..71fcf76 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
similarity index 97%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
index ab49ffb..4cfb938 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
 import org.apache.wayang.basic.data.{Tuple2 => WT2}
 import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableFunction}
 import org.apache.wayang.core.optimizer.costs.LoadEstimator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
similarity index 92%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
index 9ddc101..7e34472 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 
 /**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
similarity index 91%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
index 748416e..d5ee73f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.core.types.DataSetType
 import org.apache.wayang.core.util.WayangCollections
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
index deb15a9..f0927f9 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.basic.operators.MapOperator
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
index e686565..708cfc6 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
index 28ede1e..9e3f7ce 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
index ca14576..9dbd9bf 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction}
 import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
similarity index 92%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
index f7170f7..5246823 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 
 import java.util.function.{Function => JavaFunction}
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
index 0642fe6..19d0fea 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.basic.operators.SampleOperator
 
 import java.util.function.IntUnaryOperator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
index d1af25a..ab6511d 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
 import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
 import org.apache.wayang.core.optimizer.costs.LoadEstimator
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
similarity index 89%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
index d97b08b..9180e94 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.core.plan.wayangplan.UnarySource
 
 /**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
index 04fc6ac..aae2f5e 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 /**
  * [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
similarity index 89%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
index ded640c..e5d246a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
  *   under the License.
  */
 
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
 
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
 import org.apache.wayang.basic.data.{Tuple2 => WT2}
 
 /**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
index 6ac73ac..99a1df6 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
@@ -19,6 +19,7 @@
 package org.apache.wayang.api.graph
 
 import org.apache.wayang.api._
+import org.apache.wayang.api.dataquanta.DataQuanta
 import org.apache.wayang.basic.data.Record
 import org.apache.wayang.basic.operators.{MapOperator, PageRankOperator}
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
index 1eaec53..526fb6a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
@@ -18,9 +18,10 @@
 
 package org.apache.wayang.api.graph
 
-import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 import org.apache.wayang.api.util.DataQuantaBuilderDecorator
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
+import org.apache.wayang.api.{JavaPlanBuilder,_ }
+import org.apache.wayang.api.dataquanta.builder.BasicDataQuantaBuilder
 import org.apache.wayang.basic.operators.PageRankOperator
 import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
 
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
index e43fe81..a761e6a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
@@ -18,6 +18,7 @@
 
 package org.apache.wayang.api
 
+import org.apache.wayang.api.dataquanta.DataQuanta
 import org.apache.wayang.basic.data.{Tuple2 => T2}
 
 /**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
index f17f5a5..7dfefd0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
@@ -18,9 +18,10 @@
 
 package org.apache.wayang
 
+import org.apache.wayang.api.dataquanta.{DataQuanta, JoinedDataQuanta}
+
 import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable}
 import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction}
-
 import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
 import org.apache.wayang.core.api.WayangContext
 import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
index 21decc2..456efb4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
@@ -18,10 +18,10 @@
 
 package org.apache.wayang.api.util
 
-import org.apache.wayang.api.DataQuanta
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
 
 /**
-  * Caches products of [[org.apache.wayang.api.DataQuantaBuilder]]s that need to be executed at once, e.g., because they
+  * Caches products of [[DataQuantaBuilder]]s that need to be executed at once, e.g., because they
   * belong to different [[org.apache.wayang.core.plan.wayangplan.OutputSlot]]s of the same custom [[org.apache.wayang.core.plan.wayangplan.Operator]].
   */
 class DataQuantaBuilderCache {
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
index 8ea3dd2..a3b6b58 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
@@ -19,7 +19,8 @@
 package org.apache.wayang.api.util
 
 import de.hpi.isg.profiledb.store.model.Experiment
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
 import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
 import org.apache.wayang.core.platform.Platform
 import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
index 71996a0..b21ea79 100644
--- a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.wayang.api;
 
-import org.apache.wayang.api.dataquantabuilder.GlobalReduceDataQuantaBuilder;
-import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.GlobalReduceDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.LoadCollectionDataQuantaBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
index cc05bd2..0e703c6 100644
--- a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
@@ -18,12 +18,13 @@
 
 package org.apache.wayang.api
 
+import org.apache.wayang.api.dataquanta.DataQuanta
+
 import java.io.File
 import java.net.URI
 import java.nio.file.{Files, Paths}
 import java.sql.{Connection, Statement}
 import java.util.function.Consumer
-
 import org.junit.{Assert, Test}
 import org.apache.wayang.basic.WayangBasics
 import org.apache.wayang.core.api.{Configuration, WayangContext}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index 0839a6b..548f9e6 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -30,12 +30,45 @@
 
     <artifactId>wayang-hackit-api</artifactId>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.wayang</groupId>
+                <artifactId>wayang-commons</artifactId>
+                <version>0.6.0-SNAPSHOT</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.wayang</groupId>
             <artifactId>wayang-api-scala-java_2.11</artifactId>
             <version>0.6.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.wayang</groupId>
+            <artifactId>wayang-java</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
new file mode 100644
index 0000000..aaf1f16
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api;
+
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.util.WayangCollections;
+import org.apache.wayang.java.Java;
+//import org.apache.wayang.spark.Spark;
+//import org.apache.wayang.sqlite3.Sqlite3;
+//import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test suite for the Java API.
+ */
+public class JavaApiTest {
+
+//    private Configuration sqlite3Configuration;
+//
+//    @Before
+//    public void setUp() throws SQLException, IOException {
+//        // Generate test data.
+//        this.sqlite3Configuration = new Configuration();
+//        File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db");
+//        sqlite3dbFile.deleteOnExit();
+//        this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath());
+//        try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) {
+//            Statement statement = connection.createStatement();
+//            statement.addBatch("DROP TABLE IF EXISTS customer;");
+//            statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
+//            statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
+//            statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
+//            statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
+//            statement.executeBatch();
+//        }
+//    }
+
+    @Test
+    public void testMapReduce() {
+        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
+
+        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
+        Collection<Integer> outputCollection = javaPlanBuilder
+                .loadCollection(inputCollection).withName("load numbers")
+                .map(i -> i * i).withName("square")
+                .reduce((a, b) -> a + b).withName("sum")
+                .collect();
+
+        Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(outputCollection));
+    }
+
+//    @Test
+//    public void testMapReduceBy() {
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
+//
+//        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
+//        Collection<Integer> outputCollection = javaPlanBuilder
+//                .loadCollection(inputCollection).withName("load numbers")
+//                .map(i -> i * i).withName("square")
+//                .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum")
+//                .collect();
+//
+//        Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
+//    }
+//
+//    @Test
+//    public void testBroadcast2() {
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
+//
+//        List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
+//        List<Integer> offsetCollection = Collections.singletonList(-2);
+//
+//        LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder
+//                .loadCollection(offsetCollection)
+//                .withName("load offset");
+//
+//        Collection<Integer> outputCollection = javaPlanBuilder
+//                .loadCollection(inputCollection).withName("load numbers")
+//                .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset")
+//                .collect();
+//
+//        Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection));
+//    }
+//
+//    @Test
+//    public void testCustomOperatorShortCut() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+//        final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3);
+//
+//        // Build and execute a Wayang plan.
+//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .<Integer>customOperator(new JavaMapOperator<>(
+//                        DataSetType.createDefault(Integer.class),
+//                        DataSetType.createDefault(Integer.class),
+//                        new TransformationDescriptor<>(
+//                                i -> i + 2,
+//                                Integer.class, Integer.class
+//                        )
+//                )).withName("Add 2")
+//                .collect();
+//
+//        // Check the outcome.
+//        final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5);
+//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testWordCount() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+//        final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
+//
+//        // Build and execute a Wayang plan.
+//        final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
+//                .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
+//                .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
+//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
+//                .collect();
+//
+//        // Check the outcome.
+//        final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
+//                new Tuple2<>("big", 3),
+//                new Tuple2<>("is", 2),
+//                new Tuple2<>("data", 3)
+//        );
+//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testWordCountOnSparkAndJava() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//
+//        final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
+//
+//        // Build and execute a Wayang plan.
+//        final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
+//                .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
+//                .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
+//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
+//                .collect();
+//
+//        // Check the outcome.
+//        final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
+//                new Tuple2<>("big", 3),
+//                new Tuple2<>("is", 2),
+//                new Tuple2<>("data", 3)
+//        );
+//        Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testSample() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//
+//        // Create some input values.
+//        final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100));
+//
+//        // Build and execute a Wayang plan.
+//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .sample(10).withName("Sample")
+//                .collect();
+//
+//        // Check the outcome.
+//        Assert.assertEquals(10, outputValues.size());
+//        Assert.assertEquals(10, WayangCollections.asSet(outputValues).size());
+//
+//    }
+//
+//    @Test
+//    public void testDoWhile() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+//        // Generate test data.
+//        final List<Integer> inputValues = WayangArrays.asList(1, 2);
+//
+//        // Build and execute a word count WayangPlan.
+//
+//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .doWhile(
+//                        values -> values.stream().mapToInt(i -> i).sum() > 100,
+//                        start -> {
+//                            final GlobalReduceDataQuantaBuilder<Integer> sum =
+//                                    start.reduce((a, b) -> a + b).withName("sum");
+//                            return new Tuple<>(
+//                                    start.union(sum).withName("Old+new"),
+//                                    sum
+//                            );
+//                        }
+//                ).withConditionClass(Integer.class).withName("While <= 100")
+//                .collect();
+//
+//        Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192);
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {
+//
+//        private final String broadcastName;
+//
+//        private int offset;
+//
+//        public AddOffset(String broadcastName) {
+//            this.broadcastName = broadcastName;
+//        }
+//
+//        @Override
+//        public void open(ExecutionContext ctx) {
+//            this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName));
+//        }
+//
+//        @Override
+//        public Integer apply(Integer input) {
+//            return input + this.offset;
+//        }
+//    }
+//
+//    @Test
+//    public void testRepeat() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+//        // Generate test data.
+//        final List<Integer> inputValues = WayangArrays.asList(1, 2);
+//
+//        // Build and execute a word count WayangPlan.
+//
+//        final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+//                .loadCollection(inputValues).withName("Load input values")
+//                .repeat(3, start -> start
+//                        .reduce((a, b) -> a * b).withName("Multiply")
+//                        .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class)
+//                ).withName("Repeat 3x")
+//                .collect();
+//
+//        Set<Integer> expectedValues = WayangCollections.asSet(42, 43);
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> {
+//
+//        private final String broadcastName;
+//
+//        private Collection<Character> selectors;
+//
+//        public SelectWords(String broadcastName) {
+//            this.broadcastName = broadcastName;
+//        }
+//
+//        @Override
+//        public void open(ExecutionContext ctx) {
+//            this.selectors = ctx.getBroadcast(this.broadcastName);
+//        }
+//
+//        @Override
+//        public boolean test(String word) {
+//            return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0);
+//        }
+//    }
+//
+//    @Test
+//    public void testBroadcast() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars");
+//        final List<Character> selectors = Arrays.asList('o', 'l');
+//
+//        // Execute the job.
+//        final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors");
+//        final Collection<String> outputValues = builder
+//                .loadCollection(inputValues).withName("Load input values")
+//                .filter(new SelectWords("selectors")).withName("Filter words")
+//                .withBroadcast(selectorsDataSet, "selectors")
+//                .collect();
+//
+//        // Verify the outcome.
+//        Set<String> expectedValues = WayangCollections.asSet("Hello", "World");
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testGroupBy() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
+//
+//        // Execute the job.
+//        final Collection<Double> outputValues = builder
+//                .loadCollection(inputValues).withName("Load input values")
+//                .groupByKey(i -> i % 2).withName("group odd and even")
+//                .map(group -> {
+//                    List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false)
+//                            .sorted()
+//                            .collect(Collectors.toList());
+//                    int sizeDivTwo = sortedGroup.size() / 2;
+//                    return sortedGroup.size() % 2 == 0 ?
+//                            (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d :
+//                            (double) sortedGroup.get(sizeDivTwo);
+//                })
+//                .collect();
+//
+//        // Verify the outcome.
+//        Set<Double> expectedValues = WayangCollections.asSet(5d, 6d);
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testJoin() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+//                new Tuple2<>("Water", 0),
+//                new Tuple2<>("Tonic", 5),
+//                new Tuple2<>("Juice", 10)
+//        );
+//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+//                new Tuple2<>("Apple juice", "Juice"),
+//                new Tuple2<>("Tap water", "Water"),
+//                new Tuple2<>("Orange juice", "Juice")
+//        );
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+//        final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1
+//                .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
+//                .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1()))
+//                .collect();
+//
+//        // Verify the outcome.
+//        Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
+//                new Tuple2<>("Apple juice", 10),
+//                new Tuple2<>("Orange juice", 10),
+//                new Tuple2<>("Tap water", 0)
+//        );
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testJoinAndAssemble() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+//                new Tuple2<>("Water", 0),
+//                new Tuple2<>("Tonic", 5),
+//                new Tuple2<>("Juice", 10)
+//        );
+//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+//                new Tuple2<>("Apple juice", "Juice"),
+//                new Tuple2<>("Tap water", "Water"),
+//                new Tuple2<>("Orange juice", "Juice")
+//        );
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+//        final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0)
+//                .join(dataQuanta2.keyBy(Tuple2::getField1))
+//                .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1()))
+//                .collect();
+//
+//        // Verify the outcome.
+//        Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
+//                new Tuple2<>("Apple juice", 10),
+//                new Tuple2<>("Orange juice", 10),
+//                new Tuple2<>("Tap water", 0)
+//        );
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testCoGroup() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+//                new Tuple2<>("Water", 0),
+//                new Tuple2<>("Cola", 5),
+//                new Tuple2<>("Juice", 10)
+//        );
+//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+//                new Tuple2<>("Apple juice", "Juice"),
+//                new Tuple2<>("Tap water", "Water"),
+//                new Tuple2<>("Orange juice", "Juice")
+//        );
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+//        final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1
+//                .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
+//                .map(joinTuple -> new Tuple2<>(
+//                        WayangCollections.asSet(joinTuple.getField0()),
+//                        WayangCollections.asSet(joinTuple.getField1())
+//                ))
+//                .collect();
+//
+//        // Verify the outcome.
+//        Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
+//                new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Water", 0)),
+//                        WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
+//                ),
+//                new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Cola", 5)),
+//                        WayangCollections.asSet()
+//                ), new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Juice", 10)),
+//                        WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
+//                )
+//        );
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testCoGroupViaKeyBy() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+//                new Tuple2<>("Water", 0),
+//                new Tuple2<>("Cola", 5),
+//                new Tuple2<>("Juice", 10)
+//        );
+//        final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+//                new Tuple2<>("Apple juice", "Juice"),
+//                new Tuple2<>("Tap water", "Water"),
+//                new Tuple2<>("Orange juice", "Juice")
+//        );
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+//        final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues =
+//                dataQuanta1.keyBy(Tuple2::getField0)
+//                        .coGroup(dataQuanta2.keyBy(Tuple2::getField1))
+//                        .map(joinTuple -> new Tuple2<>(
+//                                WayangCollections.asSet(joinTuple.getField0()),
+//                                WayangCollections.asSet(joinTuple.getField1())
+//                        ))
+//                        .collect();
+//
+//        // Verify the outcome.
+//        Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
+//                new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Water", 0)),
+//                        WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
+//                ),
+//                new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Cola", 5)),
+//                        WayangCollections.asSet()
+//                ), new Tuple2<>(
+//                        WayangCollections.asSet(new Tuple2<>("Juice", 10)),
+//                        WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
+//                )
+//        );
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testIntersect() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
+//        final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11);
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2);
+//        final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect();
+//
+//        // Verify the outcome.
+//        Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9);
+//        Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testSort() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1);
+//
+//        // Execute the job.
+//        final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
+//        final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect();
+//
+//        // Verify the outcome.
+//        List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5);
+//        Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues));
+//    }
+//
+//
+//    @Test
+//    public void testPageRank() {
+//        // Set up WayangContext.
+//        WayangContext wayangContext = new WayangContext()
+//                .with(Java.basicPlugin())
+//                .with(Java.graphPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Create a test graph.
+//        Collection<Tuple2<Long, Long>> edges = Arrays.asList(
+//                new Tuple2<>(0L, 1L),
+//                new Tuple2<>(0L, 2L),
+//                new Tuple2<>(0L, 3L),
+//                new Tuple2<>(1L, 0L),
+//                new Tuple2<>(2L, 1L),
+//                new Tuple2<>(3L, 2L),
+//                new Tuple2<>(3L, 1L)
+//        );
+//
+//        // Execute the job.
+//        Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges()
+//                .pageRank(20)
+//                .collect();
+//        List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks);
+//        sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1));
+//
+//        System.out.println(sortedPageRanks);
+//        Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue());
+//        Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue());
+//        Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue());
+//        Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue());
+//    }
+//
+//    @Test
+//    public void testMapPartitions() {
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8);
+//
+//        // Execute the job.
+//        Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues)
+//                .mapPartitions(partition -> {
+//                    int numEvens = 0, numOdds = 0;
+//                    for (Integer value : partition) {
+//                        if ((value & 1) == 0) numEvens++;
+//                        else numOdds++;
+//                    }
+//                    return Arrays.asList(
+//                            new Tuple2<>("odd", numOdds),
+//                            new Tuple2<>("even", numEvens)
+//                    );
+//                })
+//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
+//                .collect();
+//
+//        // Check the output.
+//        Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet(
+//                new Tuple2<>("even", 5), new Tuple2<>("odd", 2)
+//        );
+//        Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testZipWithId() {
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        List<Integer> inputValues = new ArrayList<>(42 * 100);
+//        for (int i = 0; i < 100; i++) {
+//            for (int j = 0; j < 42; j++) {
+//                inputValues.add(i);
+//            }
+//        }
+//
+//        // Execute the job.
+//        Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues)
+//                .zipWithId()
+//                .groupByKey(Tuple2::getField1)
+//                .map(group -> {
+//                    int distinctIds = (int) StreamSupport.stream(group.spliterator(), false)
+//                            .map(Tuple2::getField0)
+//                            .distinct()
+//                            .count();
+//                    return new Tuple2<>(distinctIds, 1);
+//                })
+//                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
+//                .collect();
+//
+//        // Check the output.
+//        Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100));
+//        Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
+//    }
+//
+//    @Test
+//    public void testWriteTextFile() throws IOException, URISyntaxException {
+//        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+//        // Generate test data.
+//        List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d);
+//
+//        // Execute the job.
+//        File tempDir = LocalFileSystem.findTempDir();
+//        String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"));
+//
+//        builder
+//                .loadCollection(inputValues)
+//                .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()");
+//
+//        // Check the output.
+//        Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet());
+//        Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet());
+//        Assert.assertEquals(expectedLines, actualLines);
+//    }
+//
+//    @Test
+//    public void testSqlOnJava() throws IOException, SQLException {
+//        // Execute job.
+//        final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
+//                .with(Java.basicPlugin())
+//                .with(Sqlite3.plugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()");
+//        final Collection<String> outputValues = builder
+//                .readTable(new Sqlite3TableSource("customer", "name", "age"))
+//                .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform())
+//                .asRecords().projectRecords(new String[]{"name"})
+//                .map(record -> (String) record.getField(0))
+//                .collect();
+//
+//        // Test the outcome.
+//        Assert.assertEquals(
+//                WayangCollections.asSet("John", "Evelyn"),
+//                WayangCollections.asSet(outputValues)
+//        );
+//    }
+//
+//    @Test
+//    public void testSqlOnSqlite3() throws IOException, SQLException {
+//        // Execute job.
+//        final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
+//                .with(Java.basicPlugin())
+//                .with(Sqlite3.plugin());
+//        JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()");
+//        final Collection<String> outputValues = builder
+//                .readTable(new Sqlite3TableSource("customer", "name", "age"))
+//                .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18")
+//                .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform())
+//                .map(record -> (String) record.getField(0))
+//                .collect();
+//
+//        // Test the outcome.
+//        Assert.assertEquals(
+//                WayangCollections.asSet("John", "Evelyn"),
+//                WayangCollections.asSet(outputValues)
+//        );
+//    }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
new file mode 100644
index 0000000..ed1a7ac
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.basic.WayangBasics
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializablePredicate
+import org.apache.wayang.core.function.{ExecutionContext, TransformationDescriptor}
+import org.apache.wayang.core.util.fs.LocalFileSystem
+import org.apache.wayang.java.Java
+import org.junit.{Assert, Test}
+
+import java.io.File
+import java.net.URI
+import java.nio.file.{Files, Paths}
+import java.sql.{Connection, Statement}
+import java.util.function.Consumer
+
+/**
+  * Tests the Wayang API.
+  */
+class ApiTest {
+
+  @Test
+  def testReadMapCollect(): Unit = {
+    // Set up WayangContext.
+    val wayangContext = new WayangContext()
+                         .withPlugin(Java.basicPlugin)
+//                         .withPlugin(Spark.basicPlugin)
+    // Generate some test data.
+    val inputValues = (for (i <- 1 to 10) yield i).toArray
+
+    // Build and execute a Wayang plan.
+    val outputValues = wayangContext
+      .loadCollection(inputValues).withName("Load input values")
+      .map(_ + 2).withName("Add 2")
+      .collect()
+
+    // Check the outcome.
+    val expectedOutputValues = inputValues.map(_ + 2)
+    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+  }
+
+//  @Test
+//  def testCustomOperator(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = (for (i <- 1 to 10) yield i).toArray
+//
+//    // Build and execute a Wayang plan.
+//    val inputDataSet = wayang.loadCollection(inputValues).withName("Load input values")
+//
+//    // Add the custom operator.
+//    val IndexedSeq(addedValues) = wayang.customOperator(new JavaMapOperator(
+//      dataSetType[Int],
+//      dataSetType[Int],
+//      new TransformationDescriptor(
+//        toSerializableFunction[Int, Int](_ + 2),
+//        basicDataUnitType[Int], basicDataUnitType[Int]
+//      )
+//    ), inputDataSet)
+//    addedValues.withName("Add 2")
+//
+//    // Collect the result.
+//    val outputValues = addedValues.asInstanceOf[DataQuanta[Int]].collect()
+//
+//    // Check the outcome.
+//    val expectedOutputValues = inputValues.map(_ + 2)
+//    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+//  }
+//
+//  @Test
+//  def testCustomOperatorShortCut(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = (for (i <- 1 to 10) yield i).toArray
+//
+//    // Build and execute a Wayang plan.
+//    val outputValues = wayang
+//      .loadCollection(inputValues).withName("Load input values")
+//      .customOperator[Int](new JavaMapOperator(
+//      dataSetType[Int],
+//      dataSetType[Int],
+//      new TransformationDescriptor(
+//        toSerializableFunction[Int, Int](_ + 2),
+//        basicDataUnitType[Int], basicDataUnitType[Int]
+//      )
+//    )).withName("Add 2")
+//      .collect()
+//
+//    // Check the outcome.
+//    val expectedOutputValues = inputValues.map(_ + 2)
+//    Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+//  }
+//
+//  @Test
+//  def testWordCount(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = Array("Big data is big.", "Is data big data?")
+//
+//    // Build and execute a word count WayangPlan.
+//    val wordCounts = wayang
+//      .loadCollection(inputValues).withName("Load input values")
+//      .flatMap(_.split("\\s+")).withName("Split words")
+//      .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
+//      .map((_, 1)).withName("Attach counter")
+//      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters")
+//      .collect().toSet
+//
+//    val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
+//
+//    Assert.assertEquals(expectedWordCounts, wordCounts)
+//  }
+//
+//  @Test
+//  def testWordCountOnSparkAndJava(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = Array("Big data is big.", "Is data big data?")
+//
+//    // Build and execute a word count WayangPlan.
+//    val wordCounts = wayang
+//      .loadCollection(inputValues).withName("Load input values").withTargetPlatforms(Java.platform)
+//      .flatMap(_.split("\\s+")).withName("Split words").withTargetPlatforms(Java.platform)
+//      .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase").withTargetPlatforms(Spark.platform)
+//      .map((_, 1)).withName("Attach counter").withTargetPlatforms(Spark.platform)
+//      .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters").withTargetPlatforms(Spark.platform)
+//      .collect().toSet
+//
+//    val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
+//
+//    Assert.assertEquals(expectedWordCounts, wordCounts)
+//  }
+//
+//  @Test
+//  def testSample(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = for (i <- 0 until 100) yield i
+//
+//    // Build and execute the WayangPlan.
+//    val sample = wayang
+//      .loadCollection(inputValues)
+//      .sample(10)
+//      .collect()
+//
+//    // Check the result.
+//    Assert.assertEquals(10, sample.size)
+//    Assert.assertEquals(10, sample.toSet.size)
+//  }
+//
+//  @Test
+//  def testDoWhile(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = Array(1, 2)
+//
+//    // Build and execute a word count WayangPlan.
+//
+//    val values = wayang
+//      .loadCollection(inputValues).withName("Load input values")
+//      .doWhile[Int](vals => vals.max > 100, {
+//      start =>
+//        val sum = start.reduce(_ + _).withName("Sum")
+//        (start.union(sum).withName("Old+new"), sum)
+//    }).withName("While <= 100")
+//      .collect().toSet
+//
+//    val expectedValues = Set(1, 2, 3, 6, 12, 24, 48, 96, 192)
+//    Assert.assertEquals(expectedValues, values)
+//  }
+//
+//  @Test
+//  def testRepeat(): Unit = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    // Generate some test data.
+//    val inputValues = Array(1, 2)
+//
+//    // Build and execute a word count WayangPlan.
+//
+//    val values = wayang
+//      .loadCollection(inputValues).withName("Load input values").withName(inputValues.mkString(","))
+//      .repeat(3,
+//        _.reduce(_ * _).withName("Multiply")
+//          .flatMap(v => Seq(v, v + 1)).withName("Duplicate")
+//      ).withName("Repeat 3x")
+//      .collect().toSet
+//
+//    // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
+//    val expectedValues = Set(42, 43)
+//    Assert.assertEquals(expectedValues, values)
+//  }
+//
+//  @Test
+//  def testBroadcast() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//    val builder = new PlanBuilder(wayang)
+//
+//    // Generate some test data.
+//    val inputStrings = Array("Hello", "World", "Hi", "Mars")
+//    val selectors = Array('o', 'l')
+//
+//    val selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors")
+//
+//    // Build and execute a word count WayangPlan.
+//    val values = builder
+//      .loadCollection(inputStrings).withName("Load input values")
+//      .filterJava(new ExtendedSerializablePredicate[String] {
+//
+//        var selectors: Iterable[Char] = _
+//
+//        override def open(ctx: ExecutionContext): Unit = {
+//          import scala.collection.JavaConversions._
+//          selectors = collectionAsScalaIterable(ctx.getBroadcast[Char]("selectors"))
+//        }
+//
+//        override def test(t: String): Boolean = selectors.forall(selector => t.contains(selector))
+//
+//      }).withName("Filter words")
+//      .withBroadcast(selectorsDataSet, "selectors")
+//      .collect().toSet
+//
+//    val expectedValues = Set("Hello", "World")
+//    Assert.assertEquals(expectedValues, values)
+//  }
+//
+//  @Test
+//  def testGroupBy() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+//
+//    val result = wayang
+//      .loadCollection(inputValues)
+//      .groupByKey(_ % 2).withName("group odd and even")
+//      .map {
+//        group =>
+//          import scala.collection.JavaConversions._
+//          val buffer = group.toBuffer
+//          buffer.sortBy(identity)
+//          if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2
+//          else buffer(buffer.size / 2)
+//      }.withName("median")
+//      .collect()
+//
+//    val expectedValues = Set(5, 6)
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//  @Test
+//  def testGroup() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+//
+//    val result = wayang
+//      .loadCollection(inputValues)
+//      .group()
+//      .map {
+//        group =>
+//          import scala.collection.JavaConversions._
+//          val buffer = group.toBuffer
+//          buffer.sortBy(int => int)
+//          if (buffer.size % 2 == 0) (buffer(buffer.size / 2) + buffer(buffer.size / 2 + 1)) / 2
+//          else buffer(buffer.size / 2)
+//      }
+//      .collect()
+//
+//    val expectedValues = Set(5)
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//  @Test
+//  def testJoin() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
+//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+//    val builder = new PlanBuilder(wayang)
+//    val dataQuanta1 = builder.loadCollection(inputValues1)
+//    val dataQuanta2 = builder.loadCollection(inputValues2)
+//    val result = dataQuanta1
+//      .join[(String, String), String](_._1, dataQuanta2, _._2)
+//      .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
+//      .collect()
+//
+//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//  @Test
+//  def testJoinAndAssemble() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
+//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+//    val builder = new PlanBuilder(wayang)
+//    val dataQuanta1 = builder.loadCollection(inputValues1)
+//    val dataQuanta2 = builder.loadCollection(inputValues2)
+//    val result = dataQuanta1.keyBy(_._1).join(dataQuanta2.keyBy(_._2))
+//      .assemble((dq1, dq2) => (dq2._1, dq1._2))
+//      .collect()
+//
+//    val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//
+//  @Test
+//  def testCoGroup() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues1 = Array(("Water", 0), ("Cola", 5), ("Juice", 10))
+//    val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+//    val builder = new PlanBuilder(wayang)
+//    val dataQuanta1 = builder.loadCollection(inputValues1)
+//    val dataQuanta2 = builder.loadCollection(inputValues2)
+//    val result = dataQuanta1
+//      .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
+//      .collect()
+//
+//    import scala.collection.JavaConversions._
+//    val actualValues = result.map(coGroup => (coGroup.field0.toSet, coGroup.field1.toSet)).toSet
+//    val expectedValues = Set(
+//      (Set(("Water", 0)), Set(("Tap water", "Water"))),
+//      (Set(("Cola", 5)), Set()),
+//      (Set(("Juice", 10)), Set(("Apple juice", "Juice"), ("Orange juice", "Juice")))
+//    )
+//    Assert.assertEquals(expectedValues, actualValues)
+//  }
+//
+//  @Test
+//  def testIntersect() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+//    val inputValues2 = Array(0, 2, 3, 3, 4, 5, 7, 8, 9, 11)
+//
+//    val builder = new PlanBuilder(wayang)
+//    val dataQuanta1 = builder.loadCollection(inputValues1)
+//    val dataQuanta2 = builder.loadCollection(inputValues2)
+//    val result = dataQuanta1
+//      .intersect(dataQuanta2)
+//      .collect()
+//
+//    val expectedValues = Set(2, 3, 4, 5, 7, 8, 9)
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//
+//  @Test
+//  def testSort() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues1 = Array(3, 4, 5, 2, 1)
+//
+//    val builder = new PlanBuilder(wayang)
+//    val dataQuanta1 = builder.loadCollection(inputValues1)
+//    val result = dataQuanta1
+//      .sort(r=>r)
+//      .collect()
+//
+//    val expectedValues = Array(1, 2, 3, 4, 5)
+//    Assert.assertArrayEquals(expectedValues, result.toArray)
+//  }
+//
+//
+//  @Test
+//  def testPageRank() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext()
+//      .withPlugin(Java.graphPlugin)
+//      .withPlugin(WayangBasics.graphPlugin)
+//      .withPlugin(Java.basicPlugin)
+//    import org.apache.wayang.api.graph._
+//
+//    val edges = Seq((0, 1), (0, 2), (0, 3), (1, 0), (2, 1), (3, 2), (3, 1)).map(t => Edge(t._1, t._2))
+//
+//    val pageRanks = wayang
+//      .loadCollection(edges).withName("Load edges")
+//      .pageRank(20).withName("PageRank")
+//      .collect()
+//      .map(t => t.field0.longValue -> t.field1)
+//      .toMap
+//
+//    print(pageRanks)
+//    // Let's not check absolute numbers but only the relative ordering.
+//    Assert.assertTrue(pageRanks(1) > pageRanks(0))
+//    Assert.assertTrue(pageRanks(0) > pageRanks(2))
+//    Assert.assertTrue(pageRanks(2) > pageRanks(3))
+//  }
+//
+//  @Test
+//  def testMapPartitions() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext()
+//      .withPlugin(Java.basicPlugin())
+//      .withPlugin(Spark.basicPlugin)
+//
+//    val typeCounts = wayang
+//      .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
+//        .mapPartitions { ints =>
+//          var (numOdds, numEvens) = (0, 0)
+//          ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
+//          Seq(("odd", numOdds), ("even", numEvens))
+//        }
+//      .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
+//      .collect()
+//
+//    Assert.assertEquals(Set(("odd", 2), ("even", 5)), typeCounts.toSet)
+//  }
+//
+//  @Test
+//  def testZipWithId() = {
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+//    val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
+//
+//    val result = wayang
+//      .loadCollection(inputValues)
+//      .zipWithId
+//      .groupByKey(_.field1)
+//      .map { group =>
+//        import scala.collection.JavaConversions._
+//        (group.map(_.field0).toSet.size, 1)
+//      }
+//      .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+//      .collect()
+//
+//    val expectedValues = Set((42, 100))
+//    Assert.assertEquals(expectedValues, result.toSet)
+//  }
+//
+//  @Test
+//  def testWriteTextFile() = {
+//    val tempDir = LocalFileSystem.findTempDir
+//    val targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"))
+//
+//    // Set up WayangContext.
+//    val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+//
+//    val inputValues = for (i <- 0 to 5) yield i * 0.333333333333
+//
+//    val result = wayang
+//      .loadCollection(inputValues)
+//      .writeTextFile(targetUrl, formatterUdf = d => f"${d % .2f}")
+//
+//    val lines = scala.collection.mutable.Set[String]()
+//    Files.lines(Paths.get(new URI(targetUrl))).forEach(new Consumer[String] {
+//      override def accept(line: String): Unit = lines += line
+//    })
+//
+//    val expectedLines = inputValues.map(v => f"${v % .2f}").toSet
+//    Assert.assertEquals(expectedLines, lines)
+//  }
+//
+//  @Test
+//  def testSqlOnJava() = {
+//    // Initialize some test data.
+//    val configuration = new Configuration
+//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+//    sqlite3dbFile.deleteOnExit()
+//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+//
+//    try {
+//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+//      try {
+//        val statement: Statement = connection.createStatement
+//        statement.addBatch("DROP TABLE IF EXISTS customer;")
+//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+//        statement.executeBatch()
+//      } finally {
+//        if (connection != null) connection.close()
+//      }
+//    }
+//
+//    // Set up WayangContext.
+//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+//
+//    val result = wayang
+//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
+//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
+//      .projectRecords(Seq("name"))
+//      .map(_.getField(0).asInstanceOf[String])
+//      .collect()
+//      .toSet
+//
+//    val expectedValues = Set("John", "Evelyn")
+//    Assert.assertEquals(expectedValues, result)
+//  }
+//
+//  @Test
+//  def testSqlOnSqlite3() = {
+//    // Initialize some test data.
+//    val configuration = new Configuration
+//    val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+//    sqlite3dbFile.deleteOnExit()
+//    configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+//
+//    try {
+//      val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+//      try {
+//        val statement: Statement = connection.createStatement
+//        statement.addBatch("DROP TABLE IF EXISTS customer;")
+//        statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+//        statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+//        statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+//        statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+//        statement.executeBatch
+//      } finally {
+//        if (connection != null) connection.close()
+//      }
+//    }
+//
+//    // Set up WayangContext.
+//    val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+//
+//    val result = wayang
+//      .readTable(new Sqlite3TableSource("customer", "name", "age"))
+//      .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18")
+//      .projectRecords(Seq("name")).withTargetPlatforms(Sqlite3.platform)
+//      .map(_.getField(0).asInstanceOf[String])
+//      .collect()
+//      .toSet
+//
+//    val expectedValues = Set("John", "Evelyn")
+//    Assert.assertEquals(expectedValues, result)
+//  }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
index 48002de..bb171ef 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
@@ -32,6 +32,7 @@ import java.util.List;
  * tagging step in Hackit, this logic have and pre and post processing and they are acting like
  * template that follow same behaivor in every tagger
  */
+//TODO add the option of add a custom function
 public class HackitTagger implements Serializable {
 
     /**
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
index 3b55a51..92384f2 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
@@ -21,8 +21,8 @@ package org.apache.wayang.tests;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.wayang.api.JavaPlanBuilder;
-import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
-import org.apache.wayang.api.dataquantabuilder.MapDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.MapDataQuantaBuilder;
 import org.apache.wayang.core.api.WayangContext;
 import org.apache.wayang.core.util.WayangArrays;
 import org.apache.wayang.java.Java;
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
index 3062afb..88fa42f 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
@@ -18,7 +18,7 @@
 
 package org.apache.wayang.tests;
 
-import org.apache.wayang.api.DataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder;
 import org.apache.wayang.api.JavaPlanBuilder;
 import org.apache.wayang.basic.data.Record;
 import org.apache.wayang.basic.data.Tuple2;