You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@wayang.apache.org by be...@apache.org on 2021/09/22 15:21:48 UTC
[incubator-wayang] 07/15: [WAYANG-31] structure the code on
different way
This is an automated email from the ASF dual-hosted git repository.
bertty pushed a commit to branch WAYANG-28
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit d39ab4f63c1967fc856f64c6aca07eec3fb07807
Author: Bertty Contreras-Rojas <be...@scalytics.io>
AuthorDate: Thu May 13 12:10:46 2021 -0400
[WAYANG-31] structure the code on different way
---
lolo | 121 ++++
.../org/apache/wayang/api/DataQuantaBuilder.scala | 440 -------------
.../org/apache/wayang/api/JavaPlanBuilder.scala | 3 +-
.../scala/org/apache/wayang/api/PlanBuilder.scala | 1 +
.../org/apache/wayang/api/RecordDataQuanta.scala | 1 +
.../wayang/api/RecordDataQuantaBuilder.scala | 3 +-
.../wayang/api/{ => dataquanta}/DataQuanta.scala | 98 +--
.../wayang/api/dataquanta/DataQuantaBuilder.scala | 441 +++++++++++++
.../wayang/api/dataquanta/JoinedDataQuanta.scala | 56 ++
.../wayang/api/dataquanta/KeyedDataQuanta.scala | 53 ++
.../builder}/BasicDataQuantaBuilder.scala | 5 +-
.../builder}/CartesianDataQuantaBuilder.scala | 5 +-
.../builder}/CoGroupDataQuantaBuilder.scala | 5 +-
.../builder}/CountDataQuantaBuilder.scala | 5 +-
.../builder}/CustomOperatorDataQuantaBuilder.scala | 5 +-
.../builder}/DistinctDataQuantaBuilder.scala | 5 +-
.../builder}/DoWhileDataQuantaBuilder.scala | 5 +-
.../builder}/FakeDataQuantaBuilder.scala | 5 +-
.../builder}/FilterDataQuantaBuilder.scala | 5 +-
.../builder}/FlatMapDataQuantaBuilder.scala | 5 +-
.../builder}/GlobalGroupDataQuantaBuilder.scala | 5 +-
.../builder}/GlobalReduceDataQuantaBuilder.scala | 5 +-
.../builder}/GroupByDataQuantaBuilder.scala | 5 +-
.../builder}/IntersectDataQuantaBuilder.scala | 5 +-
.../builder}/JoinDataQuantaBuilder.scala | 5 +-
.../builder}/KeyedDataQuantaBuilder.scala | 5 +-
.../builder}/LoadCollectionDataQuantaBuilder.scala | 5 +-
.../builder}/MapDataQuantaBuilder.scala | 5 +-
.../builder}/MapPartitionsDataQuantaBuilder.scala | 5 +-
.../builder}/ProjectionDataQuantaBuilder.scala | 5 +-
.../builder}/ReduceByDataQuantaBuilder.scala | 5 +-
.../builder}/RepeatDataQuantaBuilder.scala | 5 +-
.../builder}/SampleDataQuantaBuilder.scala | 5 +-
.../builder}/SortDataQuantaBuilder.scala | 5 +-
.../builder}/UnarySourceDataQuantaBuilder.scala | 5 +-
.../builder}/UnionDataQuantaBuilder.scala | 5 +-
.../builder}/ZipWithIdDataQuantaBuilder.scala | 5 +-
.../apache/wayang/api/graph/EdgeDataQuanta.scala | 1 +
.../wayang/api/graph/EdgeDataQuantaBuilder.scala | 5 +-
.../org/apache/wayang/api/graph/package.scala | 1 +
.../main/scala/org/apache/wayang/api/package.scala | 3 +-
.../wayang/api/util/DataQuantaBuilderCache.scala | 4 +-
.../api/util/DataQuantaBuilderDecorator.scala | 3 +-
.../java/org/apache/wayang/api/JavaApiTest.java | 5 +-
.../test/scala/org/apache/wayang/api/ApiTest.scala | 3 +-
.../wayang-hackit/wayang-hackit-api/pom.xml | 33 +
.../java/org/apache/wayang/api/JavaApiTest.java | 711 +++++++++++++++++++++
.../test/scala/org/apache/wayang/api/ApiTest.scala | 575 +++++++++++++++++
.../plugin/hackit/core/tagger/HackitTagger.java | 1 +
.../java/org/apache/wayang/tests/RegressionIT.java | 4 +-
.../java/org/apache/wayang/tests/WayangPlans.java | 2 +-
51 files changed, 2117 insertions(+), 586 deletions(-)
diff --git a/lolo b/lolo
new file mode 100644
index 0000000..11a7c99
--- /dev/null
+++ b/lolo
@@ -0,0 +1,121 @@
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/bin/java -ea -Didea.test.cyclic.buffer.size=1048576 -javaagent
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=61718
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8 -classpath /Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/lib/idea_rt.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-launcher/1.6.1/junit-platform-launcher-1.6.1.jar
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit5-rt.jar
+/Users/bertty/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7142.45/IntelliJ IDEA.app/Contents/plugins/junit/lib/junit-rt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/charsets.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/cldrdata.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/dnsns.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/jaccess.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/jfxrt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/localedata.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/nashorn.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunec.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunjce_provider.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/sunpkcs11.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/ext/zipfs.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jce.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jfxswt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/jsse.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/management-agent.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/resources.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/jre/lib/rt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/ant-javafx.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/dt.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/javafx-mx.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/jconsole.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/packager.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/sa-jdi.jar
+/Users/bertty/Library/Java/JavaVirtualMachines/corretto-1.8.0_252/Contents/Home/lib/tools.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-plugins/wayang-hackit/wayang-hackit-api/target/test-classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-plugins/wayang-hackit/wayang-hackit-api/target/classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-api/wayang-api-scala-java/target/classes
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-commons/wayang-core/target/classes
+/Users/bertty/.m2/repository/org/json/json/20160212/json-20160212.jar
+/Users/bertty/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar
+/Users/bertty/.m2/repository/de/odysseus/juel/juel-api/2.2.7/juel-api-2.2.7.jar
+/Users/bertty/.m2/repository/de/odysseus/juel/juel-impl/2.2.7/juel-impl-2.2.7.jar
+/Users/bertty/.m2/repository/de/hpi/isg/profiledb-instrumentation/0.1.1/profiledb-instrumentation-0.1.1.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar
+/Users/bertty/.m2/repository/org/apache/logging/log4j/log4j-api/2.14.0/log4j-api-2.14.0.jar
+/Users/bertty/.m2/repository/org/apache/logging/log4j/log4j-core/2.14.0/log4j-core-2.14.0.jar
+/Users/bertty/.m2/repository/net/sf/trove4j/trove4j/3.0.3/trove4j-3.0.3.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-commons/wayang-basic/target/classes
+/Users/bertty/.m2/repository/de/hpi/isg/profiledb-store/0.1.1/profiledb-store-0.1.1.jar
+/Users/bertty/IdeaProjects/incubator-wayang/wayang-platforms/wayang-java/target/classes
+/Users/bertty/.m2/repository/org/scala-lang/scala-library/2.11.12/scala-library-2.11.12.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-common/2.7.7/hadoop-common-2.7.7.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-annotations/2.7.7/hadoop-annotations-2.7.7.jar
+/Users/bertty/.m2/repository/com/google/guava/guava/11.0.2/guava-11.0.2.jar
+/Users/bertty/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-math3/3.1.1/commons-math3-3.1.1.jar
+/Users/bertty/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar
+/Users/bertty/.m2/repository/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar
+/Users/bertty/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar
+/Users/bertty/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar
+/Users/bertty/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar
+/Users/bertty/.m2/repository/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar
+/Users/bertty/.m2/repository/org/mortbay/jetty/jetty-sslengine/6.1.26/jetty-sslengine-6.1.26.jar
+/Users/bertty/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar
+/Users/bertty/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar
+/Users/bertty/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar
+/Users/bertty/.m2/repository/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar
+/Users/bertty/.m2/repository/commons-digester/commons-digester/1.8/commons-digester-1.8.jar
+/Users/bertty/.m2/repository/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar
+/Users/bertty/.m2/repository/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar
+/Users/bertty/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar
+/Users/bertty/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar
+/Users/bertty/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
+/Users/bertty/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar
+/Users/bertty/.m2/repository/org/apache/avro/avro/1.7.4/avro-1.7.4.jar
+/Users/bertty/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar
+/Users/bertty/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar
+/Users/bertty/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar
+/Users/bertty/.m2/repository/com/google/code/gson/gson/2.2.4/gson-2.2.4.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-auth/2.7.7/hadoop-auth-2.7.7.jar
+/Users/bertty/.m2/repository/org/apache/httpcomponents/httpclient/4.2.5/httpclient-4.2.5.jar
+/Users/bertty/.m2/repository/org/apache/httpcomponents/httpcore/4.2.4/httpcore-4.2.4.jar
+/Users/bertty/.m2/repository/org/apache/directory/server/apacheds-kerberos-codec/2.0.0-M15/apacheds-kerberos-codec-2.0.0-M15.jar
+/Users/bertty/.m2/repository/org/apache/directory/server/apacheds-i18n/2.0.0-M15/apacheds-i18n-2.0.0-M15.jar
+/Users/bertty/.m2/repository/org/apache/directory/api/api-asn1-api/1.0.0-M20/api-asn1-api-1.0.0-M20.jar
+/Users/bertty/.m2/repository/org/apache/directory/api/api-util/1.0.0-M20/api-util-1.0.0-M20.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-framework/2.7.1/curator-framework-2.7.1.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-client/2.7.1/curator-client-2.7.1.jar
+/Users/bertty/.m2/repository/org/apache/curator/curator-recipes/2.7.1/curator-recipes-2.7.1.jar
+/Users/bertty/.m2/repository/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar
+/Users/bertty/.m2/repository/org/apache/htrace/htrace-core/3.1.0-incubating/htrace-core-3.1.0-incubating.jar
+/Users/bertty/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar
+/Users/bertty/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar
+/Users/bertty/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar
+/Users/bertty/.m2/repository/org/apache/hadoop/hadoop-hdfs/2.7.7/hadoop-hdfs-2.7.7.jar
+/Users/bertty/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar
+/Users/bertty/.m2/repository/io/netty/netty/3.6.2.Final/netty-3.6.2.Final.jar
+/Users/bertty/.m2/repository/io/netty/netty-all/4.0.23.Final/netty-all-4.0.23.Final.jar
+/Users/bertty/.m2/repository/xerces/xercesImpl/2.9.1/xercesImpl-2.9.1.jar
+/Users/bertty/.m2/repository/xml-apis/xml-apis/1.3.04/xml-apis-1.3.04.jar
+/Users/bertty/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter/5.6.1/junit-jupiter-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-api/5.6.1/junit-jupiter-api-5.6.1.jar
+/Users/bertty/.m2/repository/org/opentest4j/opentest4j/1.2.0/opentest4j-1.2.0.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-commons/1.6.1/junit-platform-commons-1.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-params/5.6.1/junit-jupiter-params-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/jupiter/junit-jupiter-engine/5.6.1/junit-jupiter-engine-5.6.1.jar
+/Users/bertty/.m2/repository/org/junit/vintage/junit-vintage-engine/5.6.1/junit-vintage-engine-5.6.1.jar
+/Users/bertty/.m2/repository/org/apiguardian/apiguardian-api/1.1.0/apiguardian-api-1.1.0.jar
+/Users/bertty/.m2/repository/org/junit/platform/junit-platform-engine/1.6.1/junit-platform-engine-1.6.1.jar
+/Users/bertty/.m2/repository/junit/junit/4.12/junit-4.12.jar
+/Users/bertty/.m2/repository/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar
+/Users/bertty/.m2/repository/org/mockito/mockito-core/3.5.10/mockito-core-3.5.10.jar
+/Users/bertty/.m2/repository/net/bytebuddy/byte-buddy/1.10.13/byte-buddy-1.10.13.jar
+/Users/bertty/.m2/repository/net/bytebuddy/byte-buddy-agent/1.10.13/byte-buddy-agent-1.10.13.jar
+/Users/bertty/.m2/repository/org/objenesis/objenesis/3.1/objenesis-3.1.jar
+/Users/bertty/.m2/repository/org/mockito/mockito-junit-jupiter/3.5.10/mockito-junit-jupiter-3.5.10.jar
+/Users/bertty/.m2/repository/org/assertj/assertj-core/3.17.2/assertj-core-3.17.2.jar
+/Users/bertty/.m2/repository/org/antlr/antlr4/4.9.1/antlr4-4.9.1.jar
+/Users/bertty/.m2/repository/org/antlr/antlr4-runtime/4.9.1/antlr4-runtime-4.9.1.jar
+/Users/bertty/.m2/repository/org/antlr/antlr-runtime/3.5.2/antlr-runtime-3.5.2.jar
+/Users/bertty/.m2/repository/org/antlr/ST4/4.3/ST4-4.3.jar
+/Users/bertty/.m2/repository/org/abego/treelayout/org.abego.treelayout.core/1.0.3/org.abego.treelayout.core-1.0.3.jar
+/Users/bertty/.m2/repository/org/glassfish/javax.json/1.0.4/javax.json-1.0.4.jar
+/Users/bertty/.m2/repository/com/ibm/icu/icu4j/61.1/icu4j-61.1.jar com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit5 org.apache.wayang.api.ApiTest,testReadMapCollect
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
deleted file mode 100644
index c6e5dcb..0000000
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.wayang.api
-
-
-import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
-import java.util.{Collection => JavaCollection}
-import de.hpi.isg.profiledb.store.model.Experiment
-import org.apache.wayang.api.dataquantabuilder.{BasicDataQuantaBuilder, CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuan [...]
-import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
-import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
-import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
-import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator}
-import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
-import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
-import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
-import org.apache.wayang.core.optimizer.costs.{LoadEstimator, LoadProfile, LoadProfileEstimator}
-import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, UnarySource, WayangPlan}
-import org.apache.wayang.core.platform.Platform
-import org.apache.wayang.core.types.DataSetType
-import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
-
-import scala.collection.mutable.ListBuffer
-import scala.reflect.ClassTag
-
-/**
- * Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
- * Java API for Wayang that compensates for lacking default and named arguments.
- */
-trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {
-
- /**
- * The type of the [[DataQuanta]] to be built.
- */
- protected[api] def outputTypeTrap: TypeTrap
-
- /**
- * Provide a [[JavaPlanBuilder]] to which this instance is associated.
- */
- protected[api] implicit def javaPlanBuilder: JavaPlanBuilder
-
- /**
- * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
- *
- * @param name the name
- * @return this instance
- */
- def withName(name: String): This
-
- /**
- * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
- *
- * @param experiment the [[Experiment]]
- * @return this instance
- */
- def withExperiment(experiment: Experiment): This
-
- /**
- * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
- * always necessary to set it and that it can be inferred in some situations.
- *
- * @param outputType the output [[DataSetType]]
- * @return this instance
- */
- def withOutputType(outputType: DataSetType[Out]): This
-
- /**
- * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
- * always necessary to set it and that it can be inferred in some situations.
- *
- * @param cls the output [[Class]]
- * @return this instance
- */
- def withOutputClass(cls: Class[Out]): This
-
- /**
- * Register a broadcast with the [[DataQuanta]] to be built
- *
- * @param sender a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
- * @param broadcastName the name of the broadcast
- * @return this instance
- */
- def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This
-
- /**
- * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
- *
- * @param cardinalityEstimator the [[CardinalityEstimator]]
- * @return this instance
- */
- def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This
-
- /**
- * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
- * multiple times to set multiple possible target [[Platform]]s or not at all to impose no restrictions.
- *
- * @param platform the target [[Platform]]
- * @return this instance
- */
- def withTargetPlatform(platform: Platform): This
-
- /**
- * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
- *
- * @param cls the [[Class]]
- * @return this instance
- */
- def withUdfJarOf(cls: Class[_]): This
-
- /**
- * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
- *
- * @param path the path of the JAR file
- * @return this instance
- */
- def withUdfJar(path: String): This
-
- /**
- * Provide a [[ClassTag]] for the constructed [[DataQuanta]].
- *
- * @return the [[ClassTag]]
- */
- protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
-
- /**
- * Feed the built [[DataQuanta]] into a [[MapOperator]].
- *
- * @param udf the UDF for the [[MapOperator]]
- * @return a [[MapDataQuantaBuilder]]
- */
- def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
- *
- * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
- * @return a [[ProjectionDataQuantaBuilder]]
- */
- def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
- *
- * @param udf filter UDF
- * @return a [[FilterDataQuantaBuilder]]
- */
- def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
- *
- * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
- * @return a [[FlatMapDataQuantaBuilder]]
- */
- def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
- *
- * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
- * @return a [[MapPartitionsDataQuantaBuilder]]
- */
- def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
- new MapPartitionsDataQuantaBuilder(this, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
- *
- * @param sampleSize the absolute size of the sample
- * @return a [[SampleDataQuantaBuilder]]
- */
- def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
- override def applyAsInt(operand: Int): Int = sampleSize
- })
-
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
- *
- * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
- * @return a [[SampleDataQuantaBuilder]]
- */
- def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
-
- /**
- * Annotates a key to this instance.
- * @param keyExtractor extracts the key from the data quanta
- * @return a [[KeyedDataQuantaBuilder]]
- */
- def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)
-
- /**
- * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
- *
- * @param udf the UDF for the [[GlobalReduceOperator]]
- * @return a [[GlobalReduceDataQuantaBuilder]]
- */
- def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
- *
- * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
- * @param udf the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
- * @return a [[ReduceByDataQuantaBuilder]]
- */
- def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
- new ReduceByDataQuantaBuilder(this, keyUdf, udf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
- *
- * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
- * @return a [[GroupByDataQuantaBuilder]]
- */
- def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
- new GroupByDataQuantaBuilder(this, keyUdf)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
- *
- * @return a [[GlobalGroupDataQuantaBuilder]]
- */
- def group() = new GlobalGroupDataQuantaBuilder(this)
-
- /**
- * Feed the built [[DataQuanta]] of this and the given instance into a
- * [[org.apache.wayang.basic.operators.UnionAllOperator]].
- *
- * @param that the other [[DataQuantaBuilder]] to union with
- * @return a [[UnionDataQuantaBuilder]]
- */
- def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)
-
- /**
- * Feed the built [[DataQuanta]] of this and the given instance into a
- * [[org.apache.wayang.basic.operators.IntersectOperator]].
- *
- * @param that the other [[DataQuantaBuilder]] to intersect with
- * @return an [[IntersectDataQuantaBuilder]]
- */
- def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)
-
- /**
- * Feed the built [[DataQuanta]] of this and the given instance into a
- * [[org.apache.wayang.basic.operators.JoinOperator]].
- *
- * @param thisKeyUdf the key extraction UDF for this instance
- * @param that the other [[DataQuantaBuilder]] to join with
- * @param thatKeyUdf the key extraction UDF for `that` instance
- * @return a [[JoinDataQuantaBuilder]]
- */
- def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
- that: DataQuantaBuilder[_, ThatOut],
- thatKeyUdf: SerializableFunction[ThatOut, Key]) =
- new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
-
- /**
- * Feed the built [[DataQuanta]] of this and the given instance into a
- * [[org.apache.wayang.basic.operators.CoGroupOperator]].
- *
- * @param thisKeyUdf the key extraction UDF for this instance
- * @param that the other [[DataQuantaBuilder]] to co-group with
- * @param thatKeyUdf the key extraction UDF for `that` instance
- * @return a [[CoGroupDataQuantaBuilder]]
- */
- def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
- that: DataQuantaBuilder[_, ThatOut],
- thatKeyUdf: SerializableFunction[ThatOut, Key]) =
- new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
-
-
- /**
- * Feed the built [[DataQuanta]] into a
- * [[org.apache.wayang.basic.operators.SortOperator]].
- *
- * @param keyUdf the key extraction UDF for this instance
- * @return a [[SortDataQuantaBuilder]]
- */
- def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
- new SortDataQuantaBuilder(this, keyUdf)
-
- /**
- * Feed the built [[DataQuanta]] of this and the given instance into a
- * [[org.apache.wayang.basic.operators.CartesianOperator]].
- *
- * @return a [[CartesianDataQuantaBuilder]]
- */
- def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
- *
- * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
- */
- def zipWithId = new ZipWithIdDataQuantaBuilder(this)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
- *
- * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
- */
- def distinct = new DistinctDataQuantaBuilder(this)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
- *
- * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
- */
- def count = new CountDataQuantaBuilder(this)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]].
- *
- * @return a [[DoWhileDataQuantaBuilder]]
- */
- def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
- bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
- new DoWhileDataQuantaBuilder(this, conditionUdf.asInstanceOf[SerializablePredicate[JavaCollection[Conv]]], bodyBuilder)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
- *
- * @return a [[RepeatDataQuantaBuilder]]
- */
- def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
- new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)
-
- /**
- * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
- * and a single [[OutputSlot]].
- *
- * @param operator the custom [[Operator]]
- * @tparam T the type of the output [[DataQuanta]]
- * @return a [[CustomOperatorDataQuantaBuilder]]
- */
- def customOperator[T](operator: Operator) = {
- assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
- assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
- new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
- }
-
- /**
- * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
- * execution of the constructed [[WayangPlan]].
- *
- * @return the collected data quanta
- */
- def collect(): JavaCollection[Out] = {
- import scala.collection.JavaConversions._
- this.dataQuanta().collect()
- }
-
- /**
- * Feed the built [[DataQuanta]] into a [[Consumer]] that runs locally. This triggers
- * execution of the constructed [[WayangPlan]].
- *
- * @param f the [[Consumer]] that receives the data quanta
- * @return nothing (the result type is `Unit`)
- */
- def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
- * execution of the constructed [[WayangPlan]].
- *
- * @param url the URL of the file to be written
- * @param jobName optional name for the [[WayangPlan]]
- * @return the collected data quanta
- */
- def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
- this.writeTextFile(url, formatterUdf, jobName, null)
-
- /**
- * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
- * execution of the constructed [[WayangPlan]].
- *
- * @param url the URL of the file to be written
- * @return the collected data quanta
- */
- def writeTextFile(url: String,
- formatterUdf: SerializableFunction[Out, String],
- jobName: String,
- udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
- this.javaPlanBuilder.withJobName(jobName)
- this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
- }
-
- /**
- * Enriches the set of operations to [[Record]]-based ones. This instance must deal with data quanta of
- * type [[Record]], though. Because of Java's type erasure, we need to leave it up to you whether this
- * operation is applicable.
- *
- * @return a [[RecordDataQuantaBuilder]]
- */
- def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
- this match {
- case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
- case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
- }
- }
-
- /**
- * Enriches the set of operations to [[Edge]]-based ones. This instance must deal with data quanta of
- * type [[Edge]], though. Because of Java's type erasure, we need to leave it up to you whether this
- * operation is applicable.
- *
- * @return an [[EdgeDataQuantaBuilder]]
- */
- def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
- this match {
- case edges: RecordDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
- case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
- }
- }
-
- /**
- * Get or create the [[DataQuanta]] built by this instance.
- *
- * @return the [[DataQuanta]]
- */
- protected[api] def dataQuanta(): DataQuanta[Out]
-
-}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index 2c88f58..bcb916f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -21,7 +21,8 @@ package org.apache.wayang.api
import java.util.{Collection => JavaCollection}
import de.hpi.isg.profiledb.store.model.Experiment
import org.apache.commons.lang3.Validate
-import org.apache.wayang.api.dataquantabuilder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
+import org.apache.wayang.api.dataquanta.builder.{CustomOperatorDataQuantaBuilder, LoadCollectionDataQuantaBuilder, UnarySourceDataQuantaBuilder}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{TableSource, TextFileSource}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 8600e23..1c59ca9 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -21,6 +21,7 @@ package org.apache.wayang.api
import de.hpi.isg.profiledb.store.model.Experiment
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
+import org.apache.wayang.api.dataquanta.DataQuanta
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{CollectionSource, TableSource, TextFileSource}
import org.apache.wayang.core.api.WayangContext
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
index cc95e39..61c6909 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuanta.scala
@@ -18,6 +18,7 @@
package org.apache.wayang.api
+import org.apache.wayang.api.dataquanta.DataQuanta
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.function.ProjectionDescriptor
import org.apache.wayang.basic.operators.MapOperator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
index 5dc045b..0f1c1e2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/RecordDataQuantaBuilder.scala
@@ -18,7 +18,8 @@
package org.apache.wayang.api
-import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.builder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.DataQuantaBuilderDecorator
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.function.ProjectionDescriptor
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
index 89722cc..a614467 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuanta.scala
@@ -1,29 +1,28 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.wayang.api
-
-import _root_.java.lang.{Iterable => JavaIterable}
-import _root_.java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction}
-import _root_.java.util.{Collection => JavaCollection}
+package org.apache.wayang.api.dataquanta
import de.hpi.isg.profiledb.store.model.Experiment
import org.apache.commons.lang3.Validate
+import org.apache.wayang.api.{PlanBuilder, basicDataUnitType, dataSetType, groupedDataSetType, groupedDataUnitType, toConsumer, toSerializableBinaryOperator, toSerializableFlatteningFunction, toSerializableFunction, toSerializablePartitionFunction, toSerializablePredicate}
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
import org.apache.wayang.basic.function.ProjectionDescriptor
import org.apache.wayang.basic.operators._
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
@@ -34,8 +33,10 @@ import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.util.{Tuple => WayangTuple}
-import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+import java.lang.{Iterable => JavaIterable}
+import java.util.function.{Consumer, IntUnaryOperator, BiFunction => JavaBiFunction, Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.reflect._
@@ -866,65 +867,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
}
-/**
- * This class provides operations on [[DataQuanta]] with additional operations.
- */
-class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[Out],
- val keyExtractor: SerializableFunction[Out, Key]) {
-
- /**
- * Performs a join. The join fields are governed by the [[KeyedDataQuanta]]'s keys.
- *
- * @param that the other [[KeyedDataQuanta]] to join with
- * @return the join product [[DataQuanta]]
- */
- def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
- DataQuanta[WayangTuple2[Out, ThatOut]] =
- dataQuanta.joinJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
-
- /**
- * Performs a co-group. The grouping fields are governed by the [[KeyedDataQuanta]]'s keys.
- *
- * @param that the other [[KeyedDataQuanta]] to co-group with
- * @return the co-grouped [[DataQuanta]]
- */
- def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]):
- DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] =
- dataQuanta.coGroupJava[ThatOut, Key](this.keyExtractor, that.dataQuanta, that.keyExtractor)
-
-}
-
-/**
- * This class amends joined [[DataQuanta]] with additional operations.
- */
-class JoinedDataQuanta[Out0: ClassTag, Out1: ClassTag]
-(val dataQuanta: DataQuanta[WayangTuple2[Out0, Out1]]) {
-
- /**
- * Assembles a new element from a join product tuple.
- *
- * @param udf creates the output data quantum from two joinable data quanta
- * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
- * @return the join product [[DataQuanta]]
- */
- def assemble[NewOut: ClassTag](udf: (Out0, Out1) => NewOut,
- udfLoad: LoadProfileEstimator = null):
- DataQuanta[NewOut] =
- dataQuanta.map(joinTuple => udf.apply(joinTuple.field0, joinTuple.field1), udfLoad)
-
- /**
- * Assembles a new element from a join product tuple.
- *
- * @param assembler creates the output data quantum from two joinable data quanta
- * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
- * @return the join product [[DataQuanta]]
- */
- def assembleJava[NewOut: ClassTag](assembler: JavaBiFunction[Out0, Out1, NewOut],
- udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] =
- dataQuanta.map(join => assembler.apply(join.field0, join.field1), udfLoad)
-
-}
-
object DataQuanta {
def create[T](output: OutputSlot[T])(implicit planBuilder: PlanBuilder): DataQuanta[_] =
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala
new file mode 100644
index 0000000..c0a8e8e
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/DataQuantaBuilder.scala
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.dataquanta.builder.{CartesianDataQuantaBuilder, CoGroupDataQuantaBuilder, CountDataQuantaBuilder, CustomOperatorDataQuantaBuilder, DistinctDataQuantaBuilder, DoWhileDataQuantaBuilder, FilterDataQuantaBuilder, FlatMapDataQuantaBuilder, GlobalGroupDataQuantaBuilder, GlobalReduceDataQuantaBuilder, GroupByDataQuantaBuilder, IntersectDataQuantaBuilder, JoinDataQuantaBuilder, KeyedDataQuantaBuilder, MapDataQuantaBuilder, MapPartitionsDataQuantaBuilder, ProjectionDa [...]
+import org.apache.wayang.api.graph.{Edge, EdgeDataQuantaBuilder, EdgeDataQuantaBuilderDecorator}
+import org.apache.wayang.api.util.{DataQuantaBuilderCache, TypeTrap}
+import org.apache.wayang.api.{JavaPlanBuilder, RecordDataQuantaBuilder, RecordDataQuantaBuilderDecorator}
+import org.apache.wayang.basic.data.Record
+import org.apache.wayang.basic.operators.{GlobalReduceOperator, LocalCallbackSink, MapOperator}
+import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
+import org.apache.wayang.core.platform.Platform
+import org.apache.wayang.core.types.DataSetType
+import org.apache.wayang.core.util.Logging
+import org.apache.wayang.core.util.{Tuple => WayangTuple}
+
+import java.util.function.{Consumer, IntUnaryOperator, Function => JavaFunction}
+import java.util.{Collection => JavaCollection}
+import scala.reflect.ClassTag
+
+/**
+ * Trait/interface for builders of [[DataQuanta]]. The purpose of the builders is to provide a convenient
+ * Java API for Wayang that compensates for lacking default and named arguments.
+ */
+trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging {
+
+ /**
+ * The type of the [[DataQuanta]] to be built.
+ */
+ protected[api] def outputTypeTrap: TypeTrap
+
+ /**
+ * Provide a [[JavaPlanBuilder]] to which this instance is associated.
+ */
+ protected[api] implicit def javaPlanBuilder: JavaPlanBuilder
+
+ /**
+ * Set a name for the [[DataQuanta]] and its associated [[org.apache.wayang.core.plan.wayangplan.Operator]]s.
+ *
+ * @param name the name
+ * @return this instance
+ */
+ def withName(name: String): This
+
+ /**
+ * Set an [[Experiment]] for the currently built [[org.apache.wayang.core.api.Job]].
+ *
+ * @param experiment the [[Experiment]]
+ * @return this instance
+ */
+ def withExperiment(experiment: Experiment): This
+
+ /**
+ * Explicitly set an output [[DataSetType]] for the currently built [[DataQuanta]]. Note that it is not
+ * always necessary to set it and that it can be inferred in some situations.
+ *
+ * @param outputType the output [[DataSetType]]
+ * @return this instance
+ */
+ def withOutputType(outputType: DataSetType[Out]): This
+
+ /**
+ * Explicitly set an output [[Class]] for the currently built [[DataQuanta]]. Note that it is not
+ * always necessary to set it and that it can be inferred in some situations.
+ *
+ * @param cls the output [[Class]]
+ * @return this instance
+ */
+ def withOutputClass(cls: Class[Out]): This
+
+ /**
+ * Register a broadcast with the [[DataQuanta]] to be built
+ *
+ * @param sender a [[DataQuantaBuilder]] constructing the broadcasted [[DataQuanta]]
+ * @param broadcastName the name of the broadcast
+ * @return this instance
+ */
+ def withBroadcast[Sender <: DataQuantaBuilder[_, _]](sender: Sender, broadcastName: String): This
+
+ /**
+ * Set a [[CardinalityEstimator]] for the currently built [[DataQuanta]].
+ *
+ * @param cardinalityEstimator the [[CardinalityEstimator]]
+ * @return this instance
+ */
+ def withCardinalityEstimator(cardinalityEstimator: CardinalityEstimator): This
+
+ /**
+ * Add a target [[Platform]] on which the currently built [[DataQuanta]] should be calculated. Can be invoked
+ * multiple times to set multiple possible target [[Platform]]s or not at all to impose no restrictions.
+ *
+ * @param platform the target [[Platform]] to add
+ * @return this instance
+ */
+ def withTargetPlatform(platform: Platform): This
+
+ /**
+ * Register the JAR file containing the given [[Class]] with the currently built [[org.apache.wayang.core.api.Job]].
+ *
+ * @param cls the [[Class]]
+ * @return this instance
+ */
+ def withUdfJarOf(cls: Class[_]): This
+
+ /**
+ * Register a JAR file with the currently built [[org.apache.wayang.core.api.Job]].
+ *
+ * @param path the path of the JAR file
+ * @return this instance
+ */
+ def withUdfJar(path: String): This
+
+ /**
+ * Provide a [[ClassTag]] for the constructed [[DataQuanta]].
+ *
+ * @return the [[ClassTag]]
+ */
+ protected[api] implicit def classTag: ClassTag[Out] = ClassTag(outputTypeTrap.typeClass)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[MapOperator]].
+ *
+ * @param udf the UDF for the [[MapOperator]]
+ * @return a [[MapDataQuantaBuilder]]
+ */
+ def map[NewOut](udf: SerializableFunction[Out, NewOut]) = new MapDataQuantaBuilder(this, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[MapOperator]] with a [[org.apache.wayang.basic.function.ProjectionDescriptor]].
+ *
+ * @param fieldNames field names for the [[org.apache.wayang.basic.function.ProjectionDescriptor]]
+ * @return a [[ProjectionDataQuantaBuilder]]
+ */
+ def project[NewOut](fieldNames: Array[String]) = new ProjectionDataQuantaBuilder(this, fieldNames)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FilterOperator]].
+ *
+ * @param udf filter UDF
+ * @return a [[FilterDataQuantaBuilder]]
+ */
+ def filter(udf: SerializablePredicate[Out]) = new FilterDataQuantaBuilder(this, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.FlatMapOperator]].
+ *
+ * @param udf the UDF for the [[org.apache.wayang.basic.operators.FlatMapOperator]]
+ * @return a [[FlatMapDataQuantaBuilder]]
+ */
+ def flatMap[NewOut](udf: SerializableFunction[Out, java.lang.Iterable[NewOut]]) = new FlatMapDataQuantaBuilder(this, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MapPartitionsOperator]].
+ *
+ * @param udf the UDF for the [[org.apache.wayang.basic.operators.MapPartitionsOperator]]
+ * @return a [[MapPartitionsDataQuantaBuilder]]
+ */
+ def mapPartitions[NewOut](udf: SerializableFunction[java.lang.Iterable[Out], java.lang.Iterable[NewOut]]) =
+ new MapPartitionsDataQuantaBuilder(this, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
+ *
+ * @param sampleSize the absolute size of the sample
+ * @return a [[SampleDataQuantaBuilder]]
+ */
+ def sample(sampleSize: Int): SampleDataQuantaBuilder[Out] = this.sample(new IntUnaryOperator {
+ override def applyAsInt(operand: Int): Int = sampleSize
+ })
+
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.SampleOperator]].
+ *
+ * @param sampleSizeFunction the absolute size of the sample as a function of the current iteration number
+ * @return a [[SampleDataQuantaBuilder]]
+ */
+ def sample(sampleSizeFunction: IntUnaryOperator) = new SampleDataQuantaBuilder[Out](this, sampleSizeFunction)
+
+ /**
+ * Annotates a key to this instance.
+ *
+ * @param keyExtractor extracts the key from the data quanta
+ * @return a [[KeyedDataQuantaBuilder]]
+ */
+ def keyBy[Key](keyExtractor: SerializableFunction[Out, Key]) = new KeyedDataQuantaBuilder[Out, Key](this, keyExtractor)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[GlobalReduceOperator]].
+ *
+ * @param udf the UDF for the [[GlobalReduceOperator]]
+ * @return a [[GlobalReduceDataQuantaBuilder]]
+ */
+ def reduce(udf: SerializableBinaryOperator[Out]) = new GlobalReduceDataQuantaBuilder(this, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ReduceByOperator]].
+ *
+ * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ * @param udf the UDF for the [[org.apache.wayang.basic.operators.ReduceByOperator]]
+ * @return a [[ReduceByDataQuantaBuilder]]
+ */
+ def reduceByKey[Key](keyUdf: SerializableFunction[Out, Key], udf: SerializableBinaryOperator[Out]) =
+ new ReduceByDataQuantaBuilder(this, keyUdf, udf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]].
+ *
+ * @param keyUdf the key UDF for the [[org.apache.wayang.basic.operators.MaterializedGroupByOperator]]
+ * @return a [[GroupByDataQuantaBuilder]]
+ */
+ def groupByKey[Key](keyUdf: SerializableFunction[Out, Key]) =
+ new GroupByDataQuantaBuilder(this, keyUdf)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]].
+ *
+ * @return a [[GlobalGroupDataQuantaBuilder]]
+ */
+ def group() = new GlobalGroupDataQuantaBuilder(this)
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.UnionAllOperator]].
+ *
+ * @param that the other [[DataQuantaBuilder]] to union with
+ * @return a [[UnionDataQuantaBuilder]]
+ */
+ def union(that: DataQuantaBuilder[_, Out]) = new UnionDataQuantaBuilder(this, that)
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.IntersectOperator]].
+ *
+ * @param that the other [[DataQuantaBuilder]] to intersect with
+ * @return an [[IntersectDataQuantaBuilder]]
+ */
+ def intersect(that: DataQuantaBuilder[_, Out]) = new IntersectDataQuantaBuilder(this, that)
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.JoinOperator]].
+ *
+ * @param thisKeyUdf the key extraction UDF for this instance
+ * @param that the other [[DataQuantaBuilder]] to join with
+ * @param thatKeyUdf the key extraction UDF for `that` instance
+ * @return a [[JoinDataQuantaBuilder]]
+ */
+ def join[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
+ that: DataQuantaBuilder[_, ThatOut],
+ thatKeyUdf: SerializableFunction[ThatOut, Key]) =
+ new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.CoGroupOperator]].
+ *
+ * @param thisKeyUdf the key extraction UDF for this instance
+ * @param that the other [[DataQuantaBuilder]] to join with
+ * @param thatKeyUdf the key extraction UDF for `that` instance
+ * @return a [[CoGroupDataQuantaBuilder]]
+ */
+ def coGroup[ThatOut, Key](thisKeyUdf: SerializableFunction[Out, Key],
+ that: DataQuantaBuilder[_, ThatOut],
+ thatKeyUdf: SerializableFunction[ThatOut, Key]) =
+ new CoGroupDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)
+
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.SortOperator]].
+ *
+ * @param keyUdf the key extraction UDF for this instance
+ * @return a [[SortDataQuantaBuilder]]
+ */
+ def sort[Key](keyUdf: SerializableFunction[Out, Key]) =
+ new SortDataQuantaBuilder(this, keyUdf)
+
+ /**
+ * Feed the built [[DataQuanta]] of this and the given instance into a
+ * [[org.apache.wayang.basic.operators.CartesianOperator]].
+ *
+ * @return a [[CartesianDataQuantaBuilder]]
+ */
+ def cartesian[ThatOut](that: DataQuantaBuilder[_, ThatOut]) = new CartesianDataQuantaBuilder(this, that)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ZipWithIdOperator]].
+ *
+ * @return a [[ZipWithIdDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.ZipWithIdOperator]]'s output
+ */
+ def zipWithId = new ZipWithIdDataQuantaBuilder(this)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DistinctOperator]].
+ *
+ * @return a [[DistinctDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.DistinctOperator]]'s output
+ */
+ def distinct = new DistinctDataQuantaBuilder(this)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.CountOperator]].
+ *
+ * @return a [[CountDataQuantaBuilder]] representing the [[org.apache.wayang.basic.operators.CountOperator]]'s output
+ */
+ def count = new CountDataQuantaBuilder(this)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.DoWhileOperator]]. Both `conditionUdf`
+ * and `bodyBuilder` are handed unchanged to the created builder, together with this instance as its input.
+ * @return a [[DoWhileDataQuantaBuilder]]
+ */
+ def doWhile[Conv](conditionUdf: SerializablePredicate[JavaCollection[Conv]],
+ bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], WayangTuple[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Conv]]]) =
+ new DoWhileDataQuantaBuilder(this, conditionUdf, bodyBuilder)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.RepeatOperator]].
+ *
+ * @return a [[RepeatDataQuantaBuilder]]
+ */
+ def repeat(numRepetitions: Int, bodyBuilder: JavaFunction[DataQuantaBuilder[_, Out], DataQuantaBuilder[_, Out]]) =
+ new RepeatDataQuantaBuilder(this, numRepetitions, bodyBuilder)
+
+ /**
+ * Feed the built [[DataQuanta]] into a custom [[Operator]] with a single [[org.apache.wayang.core.plan.wayangplan.InputSlot]]
+ * and a single [[OutputSlot]].
+ *
+ * @param operator the custom [[Operator]]
+ * @tparam T the type of the output [[DataQuanta]]
+ * @return a [[CustomOperatorDataQuantaBuilder]]
+ */
+ def customOperator[T](operator: Operator) = {
+ assert(operator.getNumInputs == 1, "customOperator(...) only allows for operators with a single input.")
+ assert(operator.getNumOutputs == 1, "customOperator(...) only allows for operators with a single output.")
+ new CustomOperatorDataQuantaBuilder[T](operator, 0, new DataQuantaBuilderCache, this)
+ }
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[LocalCallbackSink]] that collects all data quanta locally. This triggers
+ * execution of the constructed [[WayangPlan]].
+ *
+ * @return the collected data quanta
+ */
+ def collect(): JavaCollection[Out] = {
+ import scala.collection.JavaConversions._
+ this.dataQuanta().collect()
+ }
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[Consumer]] that runs locally. This triggers
+ * execution of the constructed [[WayangPlan]].
+ *
+ * @param f the [[Consumer]] that is handed to `foreachJava` of the built [[DataQuanta]]
+ * @note nothing is returned: the result type is [[Unit]]
+ */
+ def forEach(f: Consumer[Out]): Unit = this.dataQuanta().foreachJava(f)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
+ * execution of the constructed [[WayangPlan]]. Delegates to the four-argument overload without a load estimator.
+ *
+ * @param url the URL of the file to be written
+ * @param formatterUdf turns a data quantum into a [[String]]
+ * @param jobName name for the job, passed on as is
+ */
+ def writeTextFile(url: String, formatterUdf: SerializableFunction[Out, String], jobName: String): Unit =
+ this.writeTextFile(url, formatterUdf, jobName, null)
+
+ /**
+ * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.TextFileSink]]. This triggers
+ * execution of the constructed [[WayangPlan]]. Before that, `jobName` is set on the [[JavaPlanBuilder]].
+ *
+ * @param url the URL of the file to be written
+ * @param formatterUdf turns a data quantum into a [[String]]; `udfLoadProfileEstimator` is passed along with it
+ */
+ def writeTextFile(url: String,
+ formatterUdf: SerializableFunction[Out, String],
+ jobName: String,
+ udfLoadProfileEstimator: LoadProfileEstimator): Unit = {
+ this.javaPlanBuilder.withJobName(jobName)
+ this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
+ }
+
+ /**
+ * Enriches the set of operations to [[Record]]-based ones. This instance must deal with data quanta of type
+ * [[Record]], though; because of Java's type erasure, it is up to you whether this operation is applicable.
+ * An instance that already is a [[RecordDataQuantaBuilder]] is returned as is; any other one is wrapped.
+ *
+ * @return a [[RecordDataQuantaBuilder]]
+ */
+ def asRecords[T <: RecordDataQuantaBuilder[T]]: RecordDataQuantaBuilder[T] = {
+ this match {
+ case records: RecordDataQuantaBuilder[_] => records.asInstanceOf[RecordDataQuantaBuilder[T]]
+ case _ => new RecordDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Record]])
+ }
+ }
+
+ /**
+ * Enriches the set of operations to [[Edge]]-based ones. This instance must deal with data quanta of type
+ * [[Edge]], though; because of Java's type erasure, it is up to you whether this operation is applicable.
+ * An instance that already is an [[EdgeDataQuantaBuilder]] is returned as is; any other one is wrapped.
+ *
+ * @return an [[EdgeDataQuantaBuilder]]
+ */
+ def asEdges[T <: EdgeDataQuantaBuilder[T]]: EdgeDataQuantaBuilder[T] = {
+ this match {
+ case edges: EdgeDataQuantaBuilder[_] => edges.asInstanceOf[EdgeDataQuantaBuilder[T]]
+ case _ => new EdgeDataQuantaBuilderDecorator(this.asInstanceOf[DataQuantaBuilder[_, Edge]])
+ }
+ }
+
+ /**
+ * Get or create the [[DataQuanta]] built by this instance.
+ *
+ * @return the [[DataQuanta]]
+ */
+ protected[api] def dataQuanta(): DataQuanta[Out]
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
new file mode 100644
index 0000000..99768cc
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/JoinedDataQuanta.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+
+import scala.reflect.ClassTag
+
+/**
+ * Wraps the [[DataQuanta]] of join product tuples and adds operations to assemble new elements from them.
+ */
+class JoinedDataQuanta[Out0: ClassTag, Out1: ClassTag]
+(val dataQuanta: DataQuanta[WayangTuple2[Out0, Out1]]) {
+
+ /**
+ * Assembles a new element from a join product tuple.
+ *
+ * @param udf creates the output data quantum from the two fields of a join product tuple
+ * @param udfLoad optional [[LoadProfileEstimator]] for the `udf`
+ * @return the [[DataQuanta]] obtained by mapping the join product with `udf`
+ */
+ def assemble[NewOut: ClassTag](udf: (Out0, Out1) => NewOut,
+ udfLoad: LoadProfileEstimator = null):
+ DataQuanta[NewOut] =
+ dataQuanta.map(joinTuple => udf.apply(joinTuple.field0, joinTuple.field1), udfLoad)
+
+ /**
+ * Assembles a new element from a join product tuple via a Java `BiFunction` (fully qualified: not imported here).
+ *
+ * @param assembler creates the output data quantum from the two fields of a join product tuple
+ * @param udfLoad optional [[LoadProfileEstimator]] for the `assembler`
+ * @return the [[DataQuanta]] obtained by mapping the join product with `assembler`
+ */
+ def assembleJava[NewOut: ClassTag](assembler: java.util.function.BiFunction[Out0, Out1, NewOut],
+ udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut] =
+ dataQuanta.map(join => assembler.apply(join.field0, join.field1), udfLoad)
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
new file mode 100644
index 0000000..fb6e3f1
--- /dev/null
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/KeyedDataQuanta.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.wayang.api.dataquanta
+
+import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
+import org.apache.wayang.basic.data.{Tuple2 => WayangTuple2}
+
+import scala.reflect.ClassTag
+
+/**
+ * Pairs [[DataQuanta]] with a key extractor and offers key-based binary operations on them.
+ */
+class KeyedDataQuanta[Out: ClassTag, Key: ClassTag](val dataQuanta: DataQuanta[Out],
+ val keyExtractor: SerializableFunction[Out, Key]) {
+
+ /**
+ * Joins the wrapped [[DataQuanta]] with those of `that`; both instances' key extractors provide the join keys.
+ *
+ * @param that the [[KeyedDataQuanta]] supplying the other input and its key extractor
+ * @return the [[DataQuanta]] produced by `joinJava`
+ */
+ def join[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): DataQuanta[WayangTuple2[Out, ThatOut]] = {
+ dataQuanta.joinJava[ThatOut, Key](keyExtractor, that.dataQuanta, that.keyExtractor)
+ }
+
+ /**
+ * Co-groups the wrapped [[DataQuanta]] with those of `that`; both instances' key extractors provide the keys.
+ *
+ * @param that the [[KeyedDataQuanta]] supplying the other input and its key extractor
+ * @return the [[DataQuanta]] produced by `coGroupJava`
+ */
+ def coGroup[ThatOut: ClassTag](that: KeyedDataQuanta[ThatOut, Key]): DataQuanta[WayangTuple2[java.lang.Iterable[Out], java.lang.Iterable[ThatOut]]] = {
+ dataQuanta.coGroupJava[ThatOut, Key](keyExtractor, that.dataQuanta, that.keyExtractor)
+ }
+
+}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
index 9aad1a1..78cb3bb 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/BasicDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/BasicDataQuantaBuilder.scala
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
import de.hpi.isg.profiledb.store.model.Experiment
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
similarity index 91%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
index 40d4fae..fad5ff8 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CartesianDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CartesianDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
import org.apache.wayang.basic.data.{Tuple2 => WT2}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CartesianOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
index aa1d88a..79f2b5f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CoGroupDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CoGroupDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
import org.apache.wayang.basic.data.{Tuple2 => WT2}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.costs.LoadEstimator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
similarity index 88%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
index d8f7d18..9d51e7c 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CountDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CountDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.CountOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
similarity index 93%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
index 5fca840..09c88f2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/CustomOperatorDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/CustomOperatorDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.DataQuantaBuilderCache
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.plan.wayangplan.{Operator, OutputSlot, WayangPlan}
/**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
index 9e8e044..f112388 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DistinctDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DistinctDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.DistinctOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
index 34de7be..aabbb3a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/DoWhileDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/DoWhileDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializablePredicate
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
index 6b52c40..a8982d8 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FakeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FakeDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import scala.reflect.ClassTag
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
index 3d33c80..ce546e3 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FilterDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FilterDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.basic.operators.MapOperator
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableFunction, SerializablePredicate}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
index 83b4383..19aadc2 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/FlatMapDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/FlatMapDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
similarity index 88%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
index abc1d9a..794d8c4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalGroupDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalGroupDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
index c3efc95..dac4e4e 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GlobalReduceDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GlobalReduceDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableBinaryOperator
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
index b61c71e..3531271 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/GroupByDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/GroupByDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
index 6db8ffe..71fcf76 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/IntersectDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/IntersectDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.IntersectOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
similarity index 97%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
index ab49ffb..4cfb938 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/JoinDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/JoinDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
import org.apache.wayang.basic.data.{Tuple2 => WT2}
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableFunction}
import org.apache.wayang.core.optimizer.costs.LoadEstimator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
similarity index 92%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
index 9ddc101..7e34472 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/KeyedDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/KeyedDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
/**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
similarity index 91%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
index 748416e..d5ee73f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/LoadCollectionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/LoadCollectionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.WayangCollections
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
index deb15a9..f0927f9 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.basic.operators.MapOperator
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
index e686565..708cfc6 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/MapPartitionsDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/MapPartitionsDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
index 28ede1e..9e3f7ce 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ProjectionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ProjectionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.MapOperator]]s with
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
similarity index 96%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
index ca14576..9dbd9bf 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ReduceByDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ReduceByDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction}
import org.apache.wayang.core.optimizer.costs.{LoadProfile, LoadProfileEstimator}
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
similarity index 92%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
index f7170f7..5246823 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/RepeatDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/RepeatDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import java.util.function.{Function => JavaFunction}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
similarity index 94%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
index 0642fe6..19d0fea 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SampleDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SampleDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.basic.operators.SampleOperator
import java.util.function.IntUnaryOperator
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
similarity index 95%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
index d1af25a..ab6511d 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/SortDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/SortDataQuantaBuilder.scala
@@ -17,10 +17,11 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.TypeTrap
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
import org.apache.wayang.core.function.FunctionDescriptor.SerializableFunction
import org.apache.wayang.core.optimizer.costs.LoadEstimator
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
similarity index 89%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
index d97b08b..9180e94 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnarySourceDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnarySourceDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.core.plan.wayangplan.UnarySource
/**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
similarity index 90%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
index 04fc6ac..aae2f5e 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/UnionDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/UnionDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
/**
* [[DataQuantaBuilder]] implementation for [[org.apache.wayang.basic.operators.UnionAllOperator]]s.
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
similarity index 89%
rename from wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
rename to wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
index ded640c..e5d246a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquantabuilder/ZipWithIdDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/dataquanta/builder/ZipWithIdDataQuantaBuilder.scala
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.wayang.api.dataquantabuilder
+package org.apache.wayang.api.dataquanta.builder
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, dataSetType}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.{JavaPlanBuilder, dataSetType}
import org.apache.wayang.basic.data.{Tuple2 => WT2}
/**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
index 6ac73ac..99a1df6 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuanta.scala
@@ -19,6 +19,7 @@
package org.apache.wayang.api.graph
import org.apache.wayang.api._
+import org.apache.wayang.api.dataquanta.DataQuanta
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{MapOperator, PageRankOperator}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
index 1eaec53..526fb6a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/EdgeDataQuantaBuilder.scala
@@ -18,9 +18,10 @@
package org.apache.wayang.api.graph
-import org.apache.wayang.api.dataquantabuilder.BasicDataQuantaBuilder
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
import org.apache.wayang.api.util.DataQuantaBuilderDecorator
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder, _}
+import org.apache.wayang.api.{JavaPlanBuilder,_ }
+import org.apache.wayang.api.dataquanta.builder.BasicDataQuantaBuilder
import org.apache.wayang.basic.operators.PageRankOperator
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
index e43fe81..a761e6a 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/graph/package.scala
@@ -18,6 +18,7 @@
package org.apache.wayang.api
+import org.apache.wayang.api.dataquanta.DataQuanta
import org.apache.wayang.basic.data.{Tuple2 => T2}
/**
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
index f17f5a5..7dfefd0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/package.scala
@@ -18,9 +18,10 @@
package org.apache.wayang
+import org.apache.wayang.api.dataquanta.{DataQuanta, JoinedDataQuanta}
+
import _root_.java.lang.{Class => JavaClass, Iterable => JavaIterable}
import _root_.java.util.function.{Consumer, ToLongBiFunction, ToLongFunction}
-
import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBinaryOperator, SerializableFunction, SerializablePredicate}
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
index 21decc2..456efb4 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderCache.scala
@@ -18,10 +18,10 @@
package org.apache.wayang.api.util
-import org.apache.wayang.api.DataQuanta
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
/**
- * Caches products of [[org.apache.wayang.api.DataQuantaBuilder]]s that need to be executed at once, e.g., because they
+ * Caches products of [[DataQuantaBuilder]]s that need to be executed at once, e.g., because they
* belong to different [[org.apache.wayang.core.plan.wayangplan.OutputSlot]]s of the same custom [[org.apache.wayang.core.plan.wayangplan.Operator]].
*/
class DataQuantaBuilderCache {
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
index 8ea3dd2..a3b6b58 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/util/DataQuantaBuilderDecorator.scala
@@ -19,7 +19,8 @@
package org.apache.wayang.api.util
import de.hpi.isg.profiledb.store.model.Experiment
-import org.apache.wayang.api.{DataQuanta, DataQuantaBuilder, JavaPlanBuilder}
+import org.apache.wayang.api.dataquanta.{DataQuanta, DataQuantaBuilder}
+import org.apache.wayang.api.JavaPlanBuilder
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.types.DataSetType
diff --git a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
index 71996a0..b21ea79 100644
--- a/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
+++ b/wayang-api/wayang-api-scala-java/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -18,8 +18,9 @@
package org.apache.wayang.api;
-import org.apache.wayang.api.dataquantabuilder.GlobalReduceDataQuantaBuilder;
-import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.GlobalReduceDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.LoadCollectionDataQuantaBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
index cc05bd2..0e703c6 100644
--- a/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
+++ b/wayang-api/wayang-api-scala-java/src/test/scala/org/apache/wayang/api/ApiTest.scala
@@ -18,12 +18,13 @@
package org.apache.wayang.api
+import org.apache.wayang.api.dataquanta.DataQuanta
+
import java.io.File
import java.net.URI
import java.nio.file.{Files, Paths}
import java.sql.{Connection, Statement}
import java.util.function.Consumer
-
import org.junit.{Assert, Test}
import org.apache.wayang.basic.WayangBasics
import org.apache.wayang.core.api.{Configuration, WayangContext}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
index 0839a6b..548f9e6 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/pom.xml
@@ -30,12 +30,45 @@
<artifactId>wayang-hackit-api</artifactId>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-commons</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java_2.11</artifactId>
<version>0.6.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.wayang</groupId>
+ <artifactId>wayang-java</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
new file mode 100644
index 0000000..aaf1f16
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/java/org/apache/wayang/api/JavaApiTest.java
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api;
+
+import org.apache.wayang.core.api.WayangContext;
+import org.apache.wayang.core.util.WayangCollections;
+import org.apache.wayang.java.Java;
+//import org.apache.wayang.spark.Spark;
+//import org.apache.wayang.sqlite3.Sqlite3;
+//import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Test suite for the Java API.
+ */
+public class JavaApiTest {
+
+// private Configuration sqlite3Configuration;
+//
+// @Before
+// public void setUp() throws SQLException, IOException {
+// // Generate test data.
+// this.sqlite3Configuration = new Configuration();
+// File sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db");
+// sqlite3dbFile.deleteOnExit();
+// this.sqlite3Configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath());
+// try (Connection connection = Sqlite3.platform().createDatabaseDescriptor(this.sqlite3Configuration).createJdbcConnection()) {
+// Statement statement = connection.createStatement();
+// statement.addBatch("DROP TABLE IF EXISTS customer;");
+// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
+// statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
+// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
+// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
+// statement.executeBatch();
+// }
+// }
+
+ @Test
+ public void testMapReduce() {
+ // Plan under test: load 0..4, square every element, then fold the squares into one sum.
+ final List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4);
+ final JavaPlanBuilder planBuilder = new JavaPlanBuilder(new WayangContext().with(Java.basicPlugin()));
+
+ final Collection<Integer> result = planBuilder
+ .loadCollection(numbers).withName("load numbers")
+ .map(value -> value * value).withName("square")
+ .reduce((left, right) -> left + right).withName("sum")
+ .collect();
+
+ // The square of 0 adds nothing, so the expectation lists only the squares of 1..4.
+ Assert.assertEquals(WayangCollections.asSet(1 + 4 + 9 + 16), WayangCollections.asSet(result));
+ }
+
+// @Test
+// public void testMapReduceBy() {
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
+//
+// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
+// Collection<Integer> outputCollection = javaPlanBuilder
+// .loadCollection(inputCollection).withName("load numbers")
+// .map(i -> i * i).withName("square")
+// .reduceByKey(i -> i & 1, (a, b) -> a + b).withName("sum")
+// .collect();
+//
+// Assert.assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
+// }
+//
+// @Test
+// public void testBroadcast2() {
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder javaPlanBuilder = new JavaPlanBuilder(wayangContext);
+//
+// List<Integer> inputCollection = Arrays.asList(0, 1, 2, 3, 4);
+// List<Integer> offsetCollection = Collections.singletonList(-2);
+//
+// LoadCollectionDataQuantaBuilder<Integer> offsetDataQuanta = javaPlanBuilder
+// .loadCollection(offsetCollection)
+// .withName("load offset");
+//
+// Collection<Integer> outputCollection = javaPlanBuilder
+// .loadCollection(inputCollection).withName("load numbers")
+// .map(new AddOffset("offset")).withName("add offset").withBroadcast(offsetDataQuanta, "offset")
+// .collect();
+//
+// Assert.assertEquals(WayangCollections.asSet(-2, -1, 0, 1, 2), WayangCollections.asSet(outputCollection));
+// }
+//
+// @Test
+// public void testCustomOperatorShortCut() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+// final List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3);
+//
+// // Build and execute a Wayang plan.
+// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .<Integer>customOperator(new JavaMapOperator<>(
+// DataSetType.createDefault(Integer.class),
+// DataSetType.createDefault(Integer.class),
+// new TransformationDescriptor<>(
+// i -> i + 2,
+// Integer.class, Integer.class
+// )
+// )).withName("Add 2")
+// .collect();
+//
+// // Check the outcome.
+// final List<Integer> expectedOutputValues = WayangArrays.asList(2, 3, 4, 5);
+// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testWordCount() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
+//
+// // Build and execute a Wayang plan.
+// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
+// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
+// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
+// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
+// .collect();
+//
+// // Check the outcome.
+// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
+// new Tuple2<>("big", 3),
+// new Tuple2<>("is", 2),
+// new Tuple2<>("data", 3)
+// );
+// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testWordCountOnSparkAndJava() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//
+// final List<String> inputValues = Arrays.asList("Big data is big.", "Is data big data?");
+//
+// // Build and execute a Wayang plan.
+// final Collection<Tuple2<String, Integer>> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .flatMap(line -> Arrays.asList(line.split("\\s+"))).withName("Split words")
+// .map(token -> token.replaceAll("\\W+", "").toLowerCase()).withName("To lower case")
+// .map(word -> new Tuple2<>(word, 1)).withName("Attach counter")
+// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.field0, t1.field1 + t2.field1)).withName("Sum counters")
+// .collect();
+//
+// // Check the outcome.
+// final Set<Tuple2<String, Integer>> expectedOutputValues = WayangCollections.asSet(
+// new Tuple2<>("big", 3),
+// new Tuple2<>("is", 2),
+// new Tuple2<>("data", 3)
+// );
+// Assert.assertEquals(WayangCollections.asSet(expectedOutputValues), WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testSample() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+//
+// // Create some input values.
+// final List<Integer> inputValues = WayangArrays.asList(WayangArrays.range(100));
+//
+// // Build and execute a Wayang plan.
+// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .sample(10).withName("Sample")
+// .collect();
+//
+// // Check the outcome.
+// Assert.assertEquals(10, outputValues.size());
+// Assert.assertEquals(10, WayangCollections.asSet(outputValues).size());
+//
+// }
+//
+// @Test
+// public void testDoWhile() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+// // Generate test data.
+// final List<Integer> inputValues = WayangArrays.asList(1, 2);
+//
+// // Build and execute a word count WayangPlan.
+//
+// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .doWhile(
+// values -> values.stream().mapToInt(i -> i).sum() > 100,
+// start -> {
+// final GlobalReduceDataQuantaBuilder<Integer> sum =
+// start.reduce((a, b) -> a + b).withName("sum");
+// return new Tuple<>(
+// start.union(sum).withName("Old+new"),
+// sum
+// );
+// }
+// ).withConditionClass(Integer.class).withName("While <= 100")
+// .collect();
+//
+// Set<Integer> expectedValues = WayangCollections.asSet(1, 2, 3, 6, 12, 24, 48, 96, 192);
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// private static class AddOffset implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {
+//
+// private final String broadcastName;
+//
+// private int offset;
+//
+// public AddOffset(String broadcastName) {
+// this.broadcastName = broadcastName;
+// }
+//
+// @Override
+// public void open(ExecutionContext ctx) {
+// this.offset = WayangCollections.getSingle(ctx.<Integer>getBroadcast(this.broadcastName));
+// }
+//
+// @Override
+// public Integer apply(Integer input) {
+// return input + this.offset;
+// }
+// }
+//
+// @Test
+// public void testRepeat() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+//
+// // Generate test data.
+// final List<Integer> inputValues = WayangArrays.asList(1, 2);
+//
+// // Build and execute a word count WayangPlan.
+//
+// final Collection<Integer> outputValues = new JavaPlanBuilder(wayangContext)
+// .loadCollection(inputValues).withName("Load input values")
+// .repeat(3, start -> start
+// .reduce((a, b) -> a * b).withName("Multiply")
+// .flatMap(v -> Arrays.asList(v, v + 1)).withName("Duplicate").withOutputClass(Integer.class)
+// ).withName("Repeat 3x")
+// .collect();
+//
+// Set<Integer> expectedValues = WayangCollections.asSet(42, 43);
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// private static class SelectWords implements PredicateDescriptor.ExtendedSerializablePredicate<String> {
+//
+// private final String broadcastName;
+//
+// private Collection<Character> selectors;
+//
+// public SelectWords(String broadcastName) {
+// this.broadcastName = broadcastName;
+// }
+//
+// @Override
+// public void open(ExecutionContext ctx) {
+// this.selectors = ctx.getBroadcast(this.broadcastName);
+// }
+//
+// @Override
+// public boolean test(String word) {
+// return this.selectors.stream().anyMatch(c -> word.indexOf(c) >= 0);
+// }
+// }
+//
+// @Test
+// public void testBroadcast() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<String> inputValues = Arrays.asList("Hello", "World", "Hi", "Mars");
+// final List<Character> selectors = Arrays.asList('o', 'l');
+//
+// // Execute the job.
+// final DataQuantaBuilder<?, Character> selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors");
+// final Collection<String> outputValues = builder
+// .loadCollection(inputValues).withName("Load input values")
+// .filter(new SelectWords("selectors")).withName("Filter words")
+// .withBroadcast(selectorsDataSet, "selectors")
+// .collect();
+//
+// // Verify the outcome.
+// Set<String> expectedValues = WayangCollections.asSet("Hello", "World");
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testGroupBy() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
+//
+// // Execute the job.
+// final Collection<Double> outputValues = builder
+// .loadCollection(inputValues).withName("Load input values")
+// .groupByKey(i -> i % 2).withName("group odd and even")
+// .map(group -> {
+// List<Integer> sortedGroup = StreamSupport.stream(group.spliterator(), false)
+// .sorted()
+// .collect(Collectors.toList());
+// int sizeDivTwo = sortedGroup.size() / 2;
+// return sortedGroup.size() % 2 == 0 ?
+// (sortedGroup.get(sizeDivTwo - 1) + sortedGroup.get(sizeDivTwo)) / 2d :
+// (double) sortedGroup.get(sizeDivTwo);
+// })
+// .collect();
+//
+// // Verify the outcome.
+// Set<Double> expectedValues = WayangCollections.asSet(5d, 6d);
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testJoin() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+// new Tuple2<>("Water", 0),
+// new Tuple2<>("Tonic", 5),
+// new Tuple2<>("Juice", 10)
+// );
+// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+// new Tuple2<>("Apple juice", "Juice"),
+// new Tuple2<>("Tap water", "Water"),
+// new Tuple2<>("Orange juice", "Juice")
+// );
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1
+// .join(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
+// .map(joinTuple -> new Tuple2<>(joinTuple.getField1().getField0(), joinTuple.getField0().getField1()))
+// .collect();
+//
+// // Verify the outcome.
+// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
+// new Tuple2<>("Apple juice", 10),
+// new Tuple2<>("Orange juice", 10),
+// new Tuple2<>("Tap water", 0)
+// );
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testJoinAndAssemble() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+// new Tuple2<>("Water", 0),
+// new Tuple2<>("Tonic", 5),
+// new Tuple2<>("Juice", 10)
+// );
+// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+// new Tuple2<>("Apple juice", "Juice"),
+// new Tuple2<>("Tap water", "Water"),
+// new Tuple2<>("Orange juice", "Juice")
+// );
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+// final Collection<Tuple2<String, Integer>> outputValues = dataQuanta1.keyBy(Tuple2::getField0)
+// .join(dataQuanta2.keyBy(Tuple2::getField1))
+// .assemble((val1, val2) -> new Tuple2<>(val2.getField0(), val1.getField1()))
+// .collect();
+//
+// // Verify the outcome.
+// Set<Tuple2<String, Integer>> expectedValues = WayangCollections.asSet(
+// new Tuple2<>("Apple juice", 10),
+// new Tuple2<>("Orange juice", 10),
+// new Tuple2<>("Tap water", 0)
+// );
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testCoGroup() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+// new Tuple2<>("Water", 0),
+// new Tuple2<>("Cola", 5),
+// new Tuple2<>("Juice", 10)
+// );
+// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+// new Tuple2<>("Apple juice", "Juice"),
+// new Tuple2<>("Tap water", "Water"),
+// new Tuple2<>("Orange juice", "Juice")
+// );
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues = dataQuanta1
+// .coGroup(Tuple2::getField0, dataQuanta2, Tuple2::getField1)
+// .map(joinTuple -> new Tuple2<>(
+// WayangCollections.asSet(joinTuple.getField0()),
+// WayangCollections.asSet(joinTuple.getField1())
+// ))
+// .collect();
+//
+// // Verify the outcome.
+// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
+// new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Water", 0)),
+// WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
+// ),
+// new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Cola", 5)),
+// WayangCollections.asSet()
+// ), new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Juice", 10)),
+// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
+// )
+// );
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testCoGroupViaKeyBy() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin()).with(Spark.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Tuple2<String, Integer>> inputValues1 = Arrays.asList(
+// new Tuple2<>("Water", 0),
+// new Tuple2<>("Cola", 5),
+// new Tuple2<>("Juice", 10)
+// );
+// final List<Tuple2<String, String>> inputValues2 = Arrays.asList(
+// new Tuple2<>("Apple juice", "Juice"),
+// new Tuple2<>("Tap water", "Water"),
+// new Tuple2<>("Orange juice", "Juice")
+// );
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, Integer>> dataQuanta1 = builder.loadCollection(inputValues1);
+// final LoadCollectionDataQuantaBuilder<Tuple2<String, String>> dataQuanta2 = builder.loadCollection(inputValues2);
+// final Collection<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> outputValues =
+// dataQuanta1.keyBy(Tuple2::getField0)
+// .coGroup(dataQuanta2.keyBy(Tuple2::getField1))
+// .map(joinTuple -> new Tuple2<>(
+// WayangCollections.asSet(joinTuple.getField0()),
+// WayangCollections.asSet(joinTuple.getField1())
+// ))
+// .collect();
+//
+// // Verify the outcome.
+// Set<Tuple2<Set<Tuple2<String, Integer>>, Set<Tuple2<String, String>>>> expectedValues = WayangCollections.asSet(
+// new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Water", 0)),
+// WayangCollections.asSet(new Tuple2<>("Tap water", "Water"))
+// ),
+// new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Cola", 5)),
+// WayangCollections.asSet()
+// ), new Tuple2<>(
+// WayangCollections.asSet(new Tuple2<>("Juice", 10)),
+// WayangCollections.asSet(new Tuple2<>("Apple juice", "Juice"), new Tuple2<>("Orange juice", "Juice"))
+// )
+// );
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testIntersect() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Integer> inputValues1 = Arrays.asList(1, 2, 3, 4, 5, 7, 8, 9, 10);
+// final List<Integer> inputValues2 = Arrays.asList(0, 2, 3, 3, 4, 5, 7, 8, 9, 11);
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
+// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta2 = builder.loadCollection(inputValues2);
+// final Collection<Integer> outputValues = dataQuanta1.intersect(dataQuanta2).collect();
+//
+// // Verify the outcome.
+// Set<Integer> expectedValues = WayangCollections.asSet(2, 3, 4, 5, 7, 8, 9);
+// Assert.assertEquals(expectedValues, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testSort() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// final List<Integer> inputValues1 = Arrays.asList(3, 4, 5, 2, 1);
+//
+// // Execute the job.
+// final LoadCollectionDataQuantaBuilder<Integer> dataQuanta1 = builder.loadCollection(inputValues1);
+// final Collection<Integer> outputValues = dataQuanta1.sort(r -> r).collect();
+//
+// // Verify the outcome.
+// List<Integer> expectedValues = Arrays.asList(1, 2, 3, 4, 5);
+// Assert.assertEquals(expectedValues, WayangCollections.asList(outputValues));
+// }
+//
+//
+// @Test
+// public void testPageRank() {
+// // Set up WayangContext.
+// WayangContext wayangContext = new WayangContext()
+// .with(Java.basicPlugin())
+// .with(Java.graphPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Create a test graph.
+// Collection<Tuple2<Long, Long>> edges = Arrays.asList(
+// new Tuple2<>(0L, 1L),
+// new Tuple2<>(0L, 2L),
+// new Tuple2<>(0L, 3L),
+// new Tuple2<>(1L, 0L),
+// new Tuple2<>(2L, 1L),
+// new Tuple2<>(3L, 2L),
+// new Tuple2<>(3L, 1L)
+// );
+//
+// // Execute the job.
+// Collection<Tuple2<Long, Float>> pageRanks = builder.loadCollection(edges).asEdges()
+// .pageRank(20)
+// .collect();
+// List<Tuple2<Long, Float>> sortedPageRanks = new ArrayList<>(pageRanks);
+// sortedPageRanks.sort((pr1, pr2) -> Float.compare(pr2.field1, pr1.field1));
+//
+// System.out.println(sortedPageRanks);
+// Assert.assertEquals(1L, sortedPageRanks.get(0).field0.longValue());
+// Assert.assertEquals(0L, sortedPageRanks.get(1).field0.longValue());
+// Assert.assertEquals(2L, sortedPageRanks.get(2).field0.longValue());
+// Assert.assertEquals(3L, sortedPageRanks.get(3).field0.longValue());
+// }
+//
+// @Test
+// public void testMapPartitions() {
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// List<Integer> inputValues = WayangArrays.asList(0, 1, 2, 3, 4, 6, 8);
+//
+// // Execute the job.
+// Collection<Tuple2<String, Integer>> outputValues = builder.loadCollection(inputValues)
+// .mapPartitions(partition -> {
+// int numEvens = 0, numOdds = 0;
+// for (Integer value : partition) {
+// if ((value & 1) == 0) numEvens++;
+// else numOdds++;
+// }
+// return Arrays.asList(
+// new Tuple2<>("odd", numOdds),
+// new Tuple2<>("even", numEvens)
+// );
+// })
+// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
+// .collect();
+//
+// // Check the output.
+// Set<Tuple2<String, Integer>> expectedOutput = WayangCollections.asSet(
+// new Tuple2<>("even", 5), new Tuple2<>("odd", 2)
+// );
+// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testZipWithId() {
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// List<Integer> inputValues = new ArrayList<>(42 * 100);
+// for (int i = 0; i < 100; i++) {
+// for (int j = 0; j < 42; j++) {
+// inputValues.add(i);
+// }
+// }
+//
+// // Execute the job.
+// Collection<Tuple2<Integer, Integer>> outputValues = builder.loadCollection(inputValues)
+// .zipWithId()
+// .groupByKey(Tuple2::getField1)
+// .map(group -> {
+// int distinctIds = (int) StreamSupport.stream(group.spliterator(), false)
+// .map(Tuple2::getField0)
+// .distinct()
+// .count();
+// return new Tuple2<>(distinctIds, 1);
+// })
+// .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
+// .collect();
+//
+// // Check the output.
+// Set<Tuple2<Integer, Integer>> expectedOutput = Collections.singleton(new Tuple2<>(42, 100));
+// Assert.assertEquals(expectedOutput, WayangCollections.asSet(outputValues));
+// }
+//
+// @Test
+// public void testWriteTextFile() throws IOException, URISyntaxException {
+// WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);
+//
+// // Generate test data.
+// List<Double> inputValues = Arrays.asList(0d, 1 / 3d, 2 / 3d, 1d, 4 / 3d, 5 / 3d);
+//
+// // Execute the job.
+// File tempDir = LocalFileSystem.findTempDir();
+// String targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"));
+//
+// builder
+// .loadCollection(inputValues)
+// .writeTextFile(targetUrl, d -> String.format("%.2f", d), "testWriteTextFile()");
+//
+// // Check the output.
+// Set<String> actualLines = Files.lines(Paths.get(new URI(targetUrl))).collect(Collectors.toSet());
+// Set<String> expectedLines = inputValues.stream().map(d -> String.format("%.2f", d)).collect(Collectors.toSet());
+// Assert.assertEquals(expectedLines, actualLines);
+// }
+//
+// @Test
+// public void testSqlOnJava() throws IOException, SQLException {
+// // Execute job.
+// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
+// .with(Java.basicPlugin())
+// .with(Sqlite3.plugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnJava()");
+// final Collection<String> outputValues = builder
+// .readTable(new Sqlite3TableSource("customer", "name", "age"))
+// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18").withTargetPlatform(Java.platform())
+// .asRecords().projectRecords(new String[]{"name"})
+// .map(record -> (String) record.getField(0))
+// .collect();
+//
+// // Test the outcome.
+// Assert.assertEquals(
+// WayangCollections.asSet("John", "Evelyn"),
+// WayangCollections.asSet(outputValues)
+// );
+// }
+//
+// @Test
+// public void testSqlOnSqlite3() throws IOException, SQLException {
+// // Execute job.
+// final WayangContext wayangCtx = new WayangContext(this.sqlite3Configuration)
+// .with(Java.basicPlugin())
+// .with(Sqlite3.plugin());
+// JavaPlanBuilder builder = new JavaPlanBuilder(wayangCtx, "testSqlOnSqlite3()");
+// final Collection<String> outputValues = builder
+// .readTable(new Sqlite3TableSource("customer", "name", "age"))
+// .filter(r -> (Integer) r.getField(1) >= 18).withSqlUdf("age >= 18")
+// .asRecords().projectRecords(new String[]{"name"}).withTargetPlatform(Sqlite3.platform())
+// .map(record -> (String) record.getField(0))
+// .collect();
+//
+// // Test the outcome.
+// Assert.assertEquals(
+// WayangCollections.asSet("John", "Evelyn"),
+// WayangCollections.asSet(outputValues)
+// );
+// }
+
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
new file mode 100644
index 0000000..ed1a7ac
--- /dev/null
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-api/src/test/scala/org/apache/wayang/api/ApiTest.scala
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.api
+
+import org.apache.wayang.basic.WayangBasics
+import org.apache.wayang.core.api.{Configuration, WayangContext}
+import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializablePredicate
+import org.apache.wayang.core.function.{ExecutionContext, TransformationDescriptor}
+import org.apache.wayang.core.util.fs.LocalFileSystem
+import org.apache.wayang.java.Java
+import org.junit.{Assert, Test}
+
+import java.io.File
+import java.net.URI
+import java.nio.file.{Files, Paths}
+import java.sql.{Connection, Statement}
+import java.util.function.Consumer
+
+/**
+ * Tests the Wayang API.
+ */
+class ApiTest {
+
+ @Test
+ def testReadMapCollect(): Unit = {
+ // Set up a WayangContext with the Java basic plugin only (the Spark plugin line below is commented out).
+ val wayangContext = new WayangContext()
+ .withPlugin(Java.basicPlugin)
+// .withPlugin(Spark.basicPlugin)
+ // Generate some test data: the integers 1 to 10 as an array.
+ val inputValues = (for (i <- 1 to 10) yield i).toArray
+
+ // Build and execute a Wayang plan that loads the input values and adds 2 to each of them.
+ val outputValues = wayangContext
+ .loadCollection(inputValues).withName("Load input values")
+ .map(_ + 2).withName("Add 2")
+ .collect()
+
+ // Check the outcome against the same "+ 2" mapping applied directly to the input array.
+ val expectedOutputValues = inputValues.map(_ + 2)
+ Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+ }
+
+// @Test
+// def testCustomOperator(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = (for (i <- 1 to 10) yield i).toArray
+//
+// // Build and execute a Wayang plan.
+// val inputDataSet = wayang.loadCollection(inputValues).withName("Load input values")
+//
+// // Add the custom operator.
+// val IndexedSeq(addedValues) = wayang.customOperator(new JavaMapOperator(
+// dataSetType[Int],
+// dataSetType[Int],
+// new TransformationDescriptor(
+// toSerializableFunction[Int, Int](_ + 2),
+// basicDataUnitType[Int], basicDataUnitType[Int]
+// )
+// ), inputDataSet)
+// addedValues.withName("Add 2")
+//
+// // Collect the result.
+// val outputValues = addedValues.asInstanceOf[DataQuanta[Int]].collect()
+//
+// // Check the outcome.
+// val expectedOutputValues = inputValues.map(_ + 2)
+// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+// }
+//
+// @Test
+// def testCustomOperatorShortCut(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = (for (i <- 1 to 10) yield i).toArray
+//
+// // Build and execute a Wayang plan.
+// val outputValues = wayang
+// .loadCollection(inputValues).withName("Load input values")
+// .customOperator[Int](new JavaMapOperator(
+// dataSetType[Int],
+// dataSetType[Int],
+// new TransformationDescriptor(
+// toSerializableFunction[Int, Int](_ + 2),
+// basicDataUnitType[Int], basicDataUnitType[Int]
+// )
+// )).withName("Add 2")
+// .collect()
+//
+// // Check the outcome.
+// val expectedOutputValues = inputValues.map(_ + 2)
+// Assert.assertArrayEquals(expectedOutputValues, outputValues.toArray)
+// }
+//
+// @Test
+// def testWordCount(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = Array("Big data is big.", "Is data big data?")
+//
+// // Build and execute a word count WayangPlan.
+// val wordCounts = wayang
+// .loadCollection(inputValues).withName("Load input values")
+// .flatMap(_.split("\\s+")).withName("Split words")
+// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase")
+// .map((_, 1)).withName("Attach counter")
+// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters")
+// .collect().toSet
+//
+// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
+//
+// Assert.assertEquals(expectedWordCounts, wordCounts)
+// }
+//
+// @Test
+// def testWordCountOnSparkAndJava(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = Array("Big data is big.", "Is data big data?")
+//
+// // Build and execute a word count WayangPlan.
+// val wordCounts = wayang
+// .loadCollection(inputValues).withName("Load input values").withTargetPlatforms(Java.platform)
+// .flatMap(_.split("\\s+")).withName("Split words").withTargetPlatforms(Java.platform)
+// .map(_.replaceAll("\\W+", "").toLowerCase).withName("To lowercase").withTargetPlatforms(Spark.platform)
+// .map((_, 1)).withName("Attach counter").withTargetPlatforms(Spark.platform)
+// .reduceByKey(_._1, (a, b) => (a._1, a._2 + b._2)).withName("Sum counters").withTargetPlatforms(Spark.platform)
+// .collect().toSet
+//
+// val expectedWordCounts = Set(("big", 3), ("is", 2), ("data", 3))
+//
+// Assert.assertEquals(expectedWordCounts, wordCounts)
+// }
+//
+// @Test
+// def testSample(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = for (i <- 0 until 100) yield i
+//
+// // Build and execute the WayangPlan.
+// val sample = wayang
+// .loadCollection(inputValues)
+// .sample(10)
+// .collect()
+//
+// // Check the result.
+// Assert.assertEquals(10, sample.size)
+// Assert.assertEquals(10, sample.toSet.size)
+// }
+//
+// @Test
+// def testDoWhile(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = Array(1, 2)
+//
+// // Build and execute a WayangPlan with a do-while loop.
+//
+// val values = wayang
+// .loadCollection(inputValues).withName("Load input values")
+// .doWhile[Int](vals => vals.max > 100, {
+// start =>
+// val sum = start.reduce(_ + _).withName("Sum")
+// (start.union(sum).withName("Old+new"), sum)
+// }).withName("While <= 100")
+// .collect().toSet
+//
+// val expectedValues = Set(1, 2, 3, 6, 12, 24, 48, 96, 192)
+// Assert.assertEquals(expectedValues, values)
+// }
+//
+// @Test
+// def testRepeat(): Unit = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// // Generate some test data.
+// val inputValues = Array(1, 2)
+//
+// // Build and execute a WayangPlan with a repeat loop.
+//
+// val values = wayang
+// .loadCollection(inputValues).withName("Load input values").withName(inputValues.mkString(","))
+// .repeat(3,
+// _.reduce(_ * _).withName("Multiply")
+// .flatMap(v => Seq(v, v + 1)).withName("Duplicate")
+// ).withName("Repeat 3x")
+// .collect().toSet
+//
+// // initial: 1,2 -> 1st: 2,3 -> 2nd: 6,7 => 3rd: 42,43
+// val expectedValues = Set(42, 43)
+// Assert.assertEquals(expectedValues, values)
+// }
+//
+// @Test
+// def testBroadcast() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+// val builder = new PlanBuilder(wayang)
+//
+// // Generate some test data.
+// val inputStrings = Array("Hello", "World", "Hi", "Mars")
+// val selectors = Array('o', 'l')
+//
+// val selectorsDataSet = builder.loadCollection(selectors).withName("Load selectors")
+//
+// // Build and execute a WayangPlan that filters using a broadcast.
+// val values = builder
+// .loadCollection(inputStrings).withName("Load input values")
+// .filterJava(new ExtendedSerializablePredicate[String] {
+//
+// var selectors: Iterable[Char] = _
+//
+// override def open(ctx: ExecutionContext): Unit = {
+// import scala.collection.JavaConversions._
+// selectors = collectionAsScalaIterable(ctx.getBroadcast[Char]("selectors"))
+// }
+//
+// override def test(t: String): Boolean = selectors.forall(selector => t.contains(selector))
+//
+// }).withName("Filter words")
+// .withBroadcast(selectorsDataSet, "selectors")
+// .collect().toSet
+//
+// val expectedValues = Set("Hello", "World")
+// Assert.assertEquals(expectedValues, values)
+// }
+//
+// @Test
+// def testGroupBy() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+//
+// val result = wayang
+// .loadCollection(inputValues)
+// .groupByKey(_ % 2).withName("group odd and even")
+// .map {
+// group =>
+// import scala.collection.JavaConversions._
+// val buffer = group.toBuffer
+// buffer.sortBy(identity)
+// if (buffer.size % 2 == 0) (buffer(buffer.size / 2 - 1) + buffer(buffer.size / 2)) / 2
+// else buffer(buffer.size / 2)
+// }.withName("median")
+// .collect()
+//
+// val expectedValues = Set(5, 6)
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+// @Test
+// def testGroup() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+//
+// val result = wayang
+// .loadCollection(inputValues)
+// .group()
+// .map {
+// group =>
+// import scala.collection.JavaConversions._
+// val buffer = group.toBuffer
+// buffer.sortBy(int => int)
+// if (buffer.size % 2 == 0) (buffer(buffer.size / 2) + buffer(buffer.size / 2 + 1)) / 2
+// else buffer(buffer.size / 2)
+// }
+// .collect()
+//
+// val expectedValues = Set(5)
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+// @Test
+// def testJoin() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
+// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+// val builder = new PlanBuilder(wayang)
+// val dataQuanta1 = builder.loadCollection(inputValues1)
+// val dataQuanta2 = builder.loadCollection(inputValues2)
+// val result = dataQuanta1
+// .join[(String, String), String](_._1, dataQuanta2, _._2)
+// .map(joinTuple => (joinTuple.field1._1, joinTuple.field0._2))
+// .collect()
+//
+// val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+// @Test
+// def testJoinAndAssemble() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues1 = Array(("Water", 0), ("Tonic", 5), ("Juice", 10))
+// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+// val builder = new PlanBuilder(wayang)
+// val dataQuanta1 = builder.loadCollection(inputValues1)
+// val dataQuanta2 = builder.loadCollection(inputValues2)
+// val result = dataQuanta1.keyBy(_._1).join(dataQuanta2.keyBy(_._2))
+// .assemble((dq1, dq2) => (dq2._1, dq1._2))
+// .collect()
+//
+// val expectedValues = Set(("Apple juice", 10), ("Tap water", 0), ("Orange juice", 10))
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+//
+// @Test
+// def testCoGroup() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues1 = Array(("Water", 0), ("Cola", 5), ("Juice", 10))
+// val inputValues2 = Array(("Apple juice", "Juice"), ("Tap water", "Water"), ("Orange juice", "Juice"))
+//
+// val builder = new PlanBuilder(wayang)
+// val dataQuanta1 = builder.loadCollection(inputValues1)
+// val dataQuanta2 = builder.loadCollection(inputValues2)
+// val result = dataQuanta1
+// .coGroup[(String, String), String](_._1, dataQuanta2, _._2)
+// .collect()
+//
+// import scala.collection.JavaConversions._
+// val actualValues = result.map(coGroup => (coGroup.field0.toSet, coGroup.field1.toSet)).toSet
+// val expectedValues = Set(
+// (Set(("Water", 0)), Set(("Tap water", "Water"))),
+// (Set(("Cola", 5)), Set()),
+// (Set(("Juice", 10)), Set(("Apple juice", "Juice"), ("Orange juice", "Juice")))
+// )
+// Assert.assertEquals(expectedValues, actualValues)
+// }
+//
+// @Test
+// def testIntersect() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues1 = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)
+// val inputValues2 = Array(0, 2, 3, 3, 4, 5, 7, 8, 9, 11)
+//
+// val builder = new PlanBuilder(wayang)
+// val dataQuanta1 = builder.loadCollection(inputValues1)
+// val dataQuanta2 = builder.loadCollection(inputValues2)
+// val result = dataQuanta1
+// .intersect(dataQuanta2)
+// .collect()
+//
+// val expectedValues = Set(2, 3, 4, 5, 7, 8, 9)
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+//
+// @Test
+// def testSort() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues1 = Array(3, 4, 5, 2, 1)
+//
+// val builder = new PlanBuilder(wayang)
+// val dataQuanta1 = builder.loadCollection(inputValues1)
+// val result = dataQuanta1
+// .sort(r=>r)
+// .collect()
+//
+// val expectedValues = Array(1, 2, 3, 4, 5)
+// Assert.assertArrayEquals(expectedValues, result.toArray)
+// }
+//
+//
+// @Test
+// def testPageRank() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext()
+// .withPlugin(Java.graphPlugin)
+// .withPlugin(WayangBasics.graphPlugin)
+// .withPlugin(Java.basicPlugin)
+// import org.apache.wayang.api.graph._
+//
+// val edges = Seq((0, 1), (0, 2), (0, 3), (1, 0), (2, 1), (3, 2), (3, 1)).map(t => Edge(t._1, t._2))
+//
+// val pageRanks = wayang
+// .loadCollection(edges).withName("Load edges")
+// .pageRank(20).withName("PageRank")
+// .collect()
+// .map(t => t.field0.longValue -> t.field1)
+// .toMap
+//
+// print(pageRanks)
+// // Let's not check absolute numbers but only the relative ordering.
+// Assert.assertTrue(pageRanks(1) > pageRanks(0))
+// Assert.assertTrue(pageRanks(0) > pageRanks(2))
+// Assert.assertTrue(pageRanks(2) > pageRanks(3))
+// }
+//
+// @Test
+// def testMapPartitions() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext()
+// .withPlugin(Java.basicPlugin())
+// .withPlugin(Spark.basicPlugin)
+//
+// val typeCounts = wayang
+// .loadCollection(Seq(0, 1, 2, 3, 4, 6, 8))
+// .mapPartitions { ints =>
+// var (numOdds, numEvens) = (0, 0)
+// ints.foreach(i => if ((i & 1) == 0) numEvens += 1 else numOdds += 1)
+// Seq(("odd", numOdds), ("even", numEvens))
+// }
+// .reduceByKey(_._1, { case ((kind1, count1), (kind2, count2)) => (kind1, count1 + count2) })
+// .collect()
+//
+// Assert.assertEquals(Set(("odd", 2), ("even", 5)), typeCounts.toSet)
+// }
+//
+// @Test
+// def testZipWithId() = {
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin).withPlugin(Spark.basicPlugin)
+//
+// val inputValues = for (i <- 0 until 100; j <- 0 until 42) yield i
+//
+// val result = wayang
+// .loadCollection(inputValues)
+// .zipWithId
+// .groupByKey(_.field1)
+// .map { group =>
+// import scala.collection.JavaConversions._
+// (group.map(_.field0).toSet.size, 1)
+// }
+// .reduceByKey(_._1, (t1, t2) => (t1._1, t1._2 + t2._2))
+// .collect()
+//
+// val expectedValues = Set((42, 100))
+// Assert.assertEquals(expectedValues, result.toSet)
+// }
+//
+// @Test
+// def testWriteTextFile() = {
+// val tempDir = LocalFileSystem.findTempDir
+// val targetUrl = LocalFileSystem.toURL(new File(tempDir, "testWriteTextFile.txt"))
+//
+// // Set up WayangContext.
+// val wayang = new WayangContext().withPlugin(Java.basicPlugin)
+//
+// val inputValues = for (i <- 0 to 5) yield i * 0.333333333333
+//
+// val result = wayang
+// .loadCollection(inputValues)
+// .writeTextFile(targetUrl, formatterUdf = d => f"${d % .2f}")
+//
+// val lines = scala.collection.mutable.Set[String]()
+// Files.lines(Paths.get(new URI(targetUrl))).forEach(new Consumer[String] {
+// override def accept(line: String): Unit = lines += line
+// })
+//
+// val expectedLines = inputValues.map(v => f"${v % .2f}").toSet
+// Assert.assertEquals(expectedLines, lines)
+// }
+//
+// @Test
+// def testSqlOnJava() = {
+// // Initialize some test data.
+// val configuration = new Configuration
+// val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+// sqlite3dbFile.deleteOnExit()
+// configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+//
+// try {
+// val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+// try {
+// val statement: Statement = connection.createStatement
+// statement.addBatch("DROP TABLE IF EXISTS customer;")
+// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+// statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+// statement.executeBatch()
+// } finally {
+// if (connection != null) connection.close()
+// }
+// }
+//
+// // Set up WayangContext.
+// val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+//
+// val result = wayang
+// .readTable(new Sqlite3TableSource("customer", "name", "age"))
+// .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18").withTargetPlatforms(Java.platform)
+// .projectRecords(Seq("name"))
+// .map(_.getField(0).asInstanceOf[String])
+// .collect()
+// .toSet
+//
+// val expectedValues = Set("John", "Evelyn")
+// Assert.assertEquals(expectedValues, result)
+// }
+//
+// @Test
+// def testSqlOnSqlite3() = {
+// // Initialize some test data.
+// val configuration = new Configuration
+// val sqlite3dbFile = File.createTempFile("wayang-sqlite3", "db")
+// sqlite3dbFile.deleteOnExit()
+// configuration.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:" + sqlite3dbFile.getAbsolutePath)
+//
+// try {
+// val connection: Connection = Sqlite3.platform.createDatabaseDescriptor(configuration).createJdbcConnection
+// try {
+// val statement: Statement = connection.createStatement
+// statement.addBatch("DROP TABLE IF EXISTS customer;")
+// statement.addBatch("CREATE TABLE customer (name TEXT, age INT);")
+// statement.addBatch("INSERT INTO customer VALUES ('John', 20)")
+// statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)")
+// statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)")
+// statement.executeBatch
+// } finally {
+// if (connection != null) connection.close()
+// }
+// }
+//
+// // Set up WayangContext.
+// val wayang = new WayangContext(configuration).withPlugin(Java.basicPlugin).withPlugin(Sqlite3.plugin)
+//
+// val result = wayang
+// .readTable(new Sqlite3TableSource("customer", "name", "age"))
+// .filter(r => r.getField(1).asInstanceOf[Integer] >= 18, sqlUdf = "age >= 18")
+// .projectRecords(Seq("name")).withTargetPlatforms(Sqlite3.platform)
+// .map(_.getField(0).asInstanceOf[String])
+// .collect()
+// .toSet
+//
+// val expectedValues = Set("John", "Evelyn")
+// Assert.assertEquals(expectedValues, result)
+// }
+}
diff --git a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
index 48002de..bb171ef 100644
--- a/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
+++ b/wayang-plugins/wayang-hackit/wayang-hackit-core/src/main/java/org/apache/wayang/plugin/hackit/core/tagger/HackitTagger.java
@@ -32,6 +32,7 @@ import java.util.List;
* tagging step in Hackit, this logic have and pre and post processing and they are acting like
* template that follow same behaivor in every tagger
*/
+//TODO add the option to add a custom function
public class HackitTagger implements Serializable {
/**
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
index 3b55a51..92384f2 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/RegressionIT.java
@@ -21,8 +21,8 @@ package org.apache.wayang.tests;
import org.junit.Assert;
import org.junit.Test;
import org.apache.wayang.api.JavaPlanBuilder;
-import org.apache.wayang.api.dataquantabuilder.LoadCollectionDataQuantaBuilder;
-import org.apache.wayang.api.dataquantabuilder.MapDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.LoadCollectionDataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.builder.MapDataQuantaBuilder;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.java.Java;
diff --git a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
index 3062afb..88fa42f 100644
--- a/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
+++ b/wayang-tests-integration/src/test/java/org/apache/wayang/tests/WayangPlans.java
@@ -18,7 +18,7 @@
package org.apache.wayang.tests;
-import org.apache.wayang.api.DataQuantaBuilder;
+import org.apache.wayang.api.dataquanta.DataQuantaBuilder;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.data.Tuple2;